#region License Information
/* HeuristicLab
* Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
namespace HeuristicLab.HiveEngine {
/// <summary>
/// Represents an engine that executes operations which can be executed in parallel on the hive.
/// </summary>
[StorableClass]
[Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
public class HiveEngine : Engine {
private static readonly object locker = new object();
private CancellationToken cancellationToken;
private bool firstRun = true;
[Storable]
private IOperator currentOperator;
[Storable]
public string ResourceNames { get; set; }
[Storable]
private int priority;
public int Priority {
get { return priority; }
set { priority = value; }
}
[Storable]
private TimeSpan executionTimeOnHive;
public TimeSpan ExecutionTimeOnHive {
get { return executionTimeOnHive; }
set {
if (value != executionTimeOnHive) {
executionTimeOnHive = value;
OnExecutionTimeOnHiveChanged();
}
}
}
[Storable]
private bool isPrivileged;
public bool IsPrivileged {
get { return isPrivileged; }
set { isPrivileged = value; }
}
// Task can't be storable, so RefreshableHiveExperiment can't be stored either. Since previous runs are only informative this hardly matters (only the execution time on the hive will be wrong because of it -> TODO)
private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
public ItemCollection<RefreshableJob> Jobs {
get { return jobs; }
set { jobs = value; }
}
private List<Plugin> onlinePlugins;
public List<Plugin> OnlinePlugins {
get { return onlinePlugins; }
set { onlinePlugins = value; }
}
private List<Plugin> alreadyUploadedPlugins;
public List<Plugin> AlreadyUploadedPlugins {
get { return alreadyUploadedPlugins; }
set { alreadyUploadedPlugins = value; }
}
public bool IsAllowedPrivileged { get; set; }
#region constructors and cloning
public HiveEngine() {
this.ResourceNames = "HEAL";
this.Priority = 0;
this.log = new ThreadSafeLog();
this.IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
}
[StorableConstructor]
protected HiveEngine(bool deserializing) : base(deserializing) { }
protected HiveEngine(HiveEngine original, Cloner cloner)
: base(original, cloner) {
this.ResourceNames = original.ResourceNames;
this.currentOperator = cloner.Clone(original.currentOperator);
this.priority = original.priority;
this.executionTimeOnHive = original.executionTimeOnHive;
this.IsPrivileged = original.IsPrivileged;
// do not clone jobs - otherwise they would be sent with every task
}
public override IDeepCloneable Clone(Cloner cloner) {
return new HiveEngine(this, cloner);
}
#endregion
#region Events
protected override void OnPrepared() {
base.OnPrepared();
this.ExecutionTimeOnHive = TimeSpan.Zero;
}
public event EventHandler ExecutionTimeOnHiveChanged;
protected virtual void OnExecutionTimeOnHiveChanged() {
var handler = ExecutionTimeOnHiveChanged;
if (handler != null) handler(this, EventArgs.Empty);
}
#endregion
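// Entry point invoked by the base Engine; remembers the cancellation token and starts processing the execution stack.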
protected override void Run(CancellationToken cancellationToken) {
this.cancellationToken = cancellationToken;
Run(ExecutionStack);
}
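// Processes the execution stack: OperationCollections marked as Parallel are wrapped into EngineTasks and executed on the hive (see ExecuteOnHive); all other operations are executed locally.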
private void Run(object state) {
Stack<IOperation> executionStack = (Stack<IOperation>)state;
IOperation next;
OperationCollection coll;
IAtomicOperation operation;
if (firstRun) {
TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
this.AlreadyUploadedPlugins = new List<Plugin>();
firstRun = false;
}
while (executionStack.Count > 0) {
cancellationToken.ThrowIfCancellationRequested();
next = executionStack.Pop();
if (next is OperationCollection) {
coll = (OperationCollection)next;
if (coll.Parallel) {
try {
// Clone the parent scope here and reuse it for each operation; otherwise the whole scope-tree would have to be copied and then cleaned for every task, which causes a lot of work for the garbage collector.
IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
parentScopeClone.SubScopes.Clear();
parentScopeClone.ClearParentScopes();
EngineTask[] jobs = new EngineTask[coll.Count];
for (int i = 0; i < coll.Count; i++) {
jobs[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
}
var experiment = CreateJob();
IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken);
for (int i = 0; i < coll.Count; i++) {
if (coll[i] is IAtomicOperation) {
ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
} else if (coll[i] is OperationCollection) {
// todo ??
}
}
}
catch {
executionStack.Push(coll); throw;
}
} else {
for (int i = coll.Count - 1; i >= 0; i--)
if (coll[i] != null) executionStack.Push(coll[i]);
}
} else if (next is IAtomicOperation) {
operation = (IAtomicOperation)next;
try {
next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
}
catch (Exception ex) {
executionStack.Push(operation);
if (ex is OperationCanceledException) throw;
else throw new OperatorExecutionException(operation.Operator, ex);
}
if (next != null) executionStack.Push(next);
if (operation.Operator.Breakpoint) {
log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
Pause();
}
}
}
}
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
e.SetObserved(); // avoid crash of process
}
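// Walks up the execution context hierarchy and returns the value of the first value parameter named "Random", or null if none is found.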
private IRandom FindRandomParameter(IExecutionContext ec) {
try {
if (ec == null)
return null;
foreach (var p in ec.Parameters) {
if (p.Name == "Random" && p is IValueParameter)
return ((IValueParameter)p).Value as IRandom;
}
return FindRandomParameter(ec.Parent);
}
catch { return null; }
}
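// Copies the scope content of the executed operation back into the scope of the original operation.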
private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
ExchangeScope(source.Scope, target.Scope);
}
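// Replaces the variables and sub-scopes of the target scope with those of the source scope.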
private static void ExchangeScope(IScope source, IScope target) {
target.Variables.Clear();
target.Variables.AddRange(source.Variables);
target.SubScopes.Clear();
target.SubScopes.AddRange(source.SubScopes);
// TODO: validate if parent scopes match - otherwise source is invalid
}
/// <summary>
/// This method blocks until all jobs are finished.
/// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation.
/// </summary>
private IScope[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
IScope[] scopes = new Scope[jobs.Length];
object locker = new object();
var hiveExperiment = refreshableJob.Job;
try {
// create upload-tasks
for (int i = 0; i < jobs.Length; i++) {
var engineHiveJob = new EngineHiveTask(jobs[i], parentScopeClone);
engineHiveJob.Task.Priority = this.Priority;
refreshableJob.HiveTasks.Add(engineHiveJob);
// shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
if (random != null)
random.Reset(random.Next());
}
HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);
// do polling until experiment is finished and all jobs are downloaded
while (!refreshableJob.AllJobsFinished()) {
Thread.Sleep(2000);
this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(jobs.Sum(x => x.ExecutionTime.TotalMilliseconds));
cancellationToken.ThrowIfCancellationRequested();
}
log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableJob.ToString(), refreshableJob.ExecutionTime));
var failedJobs = refreshableJob.HiveTasks.Where(x => x.Task.State == TaskState.Failed);
if (failedJobs.Any()) {
throw new HiveEngineException("Task failed: " + failedJobs.First().Task.StateLog.Last().Exception);
}
// get scopes
int j = 0;
foreach (var hiveJob in refreshableJob.HiveTasks) {
var scope = ((IAtomicOperation)((EngineTask)hiveJob.ItemTask).InitialOperation).Scope;
scopes[j++] = scope;
}
return scopes;
}
catch (OperationCanceledException) {
throw;
}
catch (Exception e) {
log.LogException(e);
throw;
} finally {
DisposeJob(refreshableJob);
}
}
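// Creates a new hive job for the current run (using the configured resources and privileges) and registers it in the Jobs collection.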
private RefreshableJob CreateJob() {
lock (locker) {
var hiveExperiment = new Job();
hiveExperiment.Name = "HiveEngine Run " + jobs.Count;
hiveExperiment.DateCreated = DateTime.Now;
hiveExperiment.ResourceNames = this.ResourceNames;
hiveExperiment.IsPrivileged = this.IsPrivileged;
var refreshableHiveExperiment = new RefreshableJob(hiveExperiment);
refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
jobs.Add(refreshableHiveExperiment);
return refreshableHiveExperiment;
}
}
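// Stops automatic refreshing, deletes the job on the hive and clears the downloaded task data.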
private void DisposeJob(RefreshableJob refreshableJob) {
refreshableJob.RefreshAutomatically = false;
DeleteHiveExperiment(refreshableJob.Job.Id);
ClearData(refreshableJob);
}
private void ClearData(RefreshableJob refreshableJob) {
var jobs = refreshableJob.GetAllHiveTasks();
foreach (var job in jobs) {
job.ClearData();
}
}
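// Deletes the job on the hive service; retries up to five times before giving up.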
private void DeleteHiveExperiment(Guid jobId) {
HiveClient.TryAndRepeat(() => {
HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
}, 5, "Could not delete job");
}
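// Resolves the semicolon-separated ResourceNames to their ids via the hive service; throws a ResourceNotFoundException if a name cannot be resolved.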
private List<Guid> GetResourceIds() {
return HiveServiceLocator.Instance.CallHiveService(service => {
var resourceNames = ResourceNames.Split(';');
var resourceIds = new List<Guid>();
foreach (var resourceName in resourceNames) {
Guid resourceId = service.GetResourceId(resourceName);
if (resourceId == Guid.Empty) {
throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
}
resourceIds.Add(resourceId);
}
return resourceIds;
});
}
}
}