#region License Information
/* HeuristicLab
* Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
namespace HeuristicLab.HiveEngine {
/// <summary>
/// Represents an engine that executes operations which can be executed in parallel on the hive
/// </summary>
[StorableClass]
[Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
public class HiveEngine : Engine {
// Lock object guarding all writes to the engine log; static, so it is shared by every HiveEngine
// instance. readonly so the lock target can never be swapped out while another thread holds it.
private static readonly object logLocker = new object();
// Token of the current run: stored by Run(CancellationToken) and checked by the engine loop.
private CancellationToken cancellationToken;
// Persisted operator reference. Within this file it is only copied by the cloning constructor;
// the engine loop neither reads nor writes it.
[Storable]
private IOperator currentOperator;
// Names of the hive resources to run on. The default constructor sets "HEAL"; the value is handed
// to every HiveExperiment created by ExecuteOnHive, and GetResourceIds splits it on ';'.
[Storable]
public string ResourceNames { get; set; }
// Job priority. Initialised to 0 by the default constructor; not read anywhere else in this file.
[Storable]
private int priority;
public int Priority {
get { return priority; }
set { priority = value; }
}
// Accumulated execution time reported by the hive. ExecuteOnHive refreshes it on every polling
// cycle (sum over all hive experiments) and OnPrepared resets it to zero.
[Storable]
private TimeSpan executionTimeOnHive;
public TimeSpan ExecutionTimeOnHive {
get { return executionTimeOnHive; }
set {
// only raise the change event when the value actually changes
if (value != executionTimeOnHive) {
executionTimeOnHive = value;
OnExecutionTimeOnHiveChanged();
}
}
}
// Copied onto every HiveExperiment created by ExecuteOnHive.
[Storable]
private bool useLocalPlugins;
public bool UseLocalPlugins {
get { return useLocalPlugins; }
set { useLocalPlugins = value; }
}
// One RefreshableHiveExperiment is appended per ExecuteOnHive call. Neither persisted nor cloned.
// NOTE(review): the generic type argument of ItemCollection appears to be missing in this file - confirm against the repository.
// [Storable] -> HiveExperiment can't be storable, so RefreshableHiveExperiment can't be stored
private ItemCollection hiveExperiments = new ItemCollection();
public ItemCollection HiveExperiments {
get { return hiveExperiments; }
set { hiveExperiments = value; }
}
// Plugins reported by the hive service with IsLocal == false; re-fetched at the start of each run.
// NOTE(review): the element type argument of List appears to be missing in this file - confirm.
private List onlinePlugins;
public List OnlinePlugins {
get { return onlinePlugins; }
set { onlinePlugins = value; }
}
// Reset to an empty list at the start of each run; not filled anywhere in this file.
// NOTE(review): the element type argument of List appears to be missing in this file - confirm.
private List alreadyUploadedPlugins;
public List AlreadyUploadedPlugins {
get { return alreadyUploadedPlugins; }
set { alreadyUploadedPlugins = value; }
}
#region constructors and cloning
/// <summary>
/// Creates an engine that targets the "HEAL" resource with priority 0.
/// </summary>
public HiveEngine() {
  this.Priority = 0;
  this.ResourceNames = "HEAL";
}
// Constructor marked for the persistence framework ([StorableConstructor]); it only forwards the
// flag to the base class and initialises nothing itself.
[StorableConstructor]
protected HiveEngine(bool deserializing) : base(deserializing) { }
/// <summary>
/// Cloning constructor used by <see cref="Clone"/>. Copies the settings of
/// <paramref name="original"/> and deep-clones its current operator through
/// <paramref name="cloner"/>.
/// </summary>
protected HiveEngine(HiveEngine original, Cloner cloner)
  : base(original, cloner) {
  // plain value copies
  ResourceNames = original.ResourceNames;
  priority = original.priority;
  useLocalPlugins = original.useLocalPlugins;
  executionTimeOnHive = original.executionTimeOnHive;

  // object reference: goes through the cloner
  currentOperator = cloner.Clone(original.currentOperator);

  // hiveExperiments are deliberately NOT cloned - otherwise they would be sent with every job
}
/// <summary>
/// Creates a copy of this engine via the cloning constructor.
/// </summary>
public override IDeepCloneable Clone(Cloner cloner) {
  HiveEngine copy = new HiveEngine(this, cloner);
  return copy;
}
#endregion
#region Events
/// <summary>
/// After the base class has handled the prepare step, resets the accumulated hive execution time.
/// </summary>
protected override void OnPrepared() {
  base.OnPrepared();
  // goes through the property so that ExecutionTimeOnHiveChanged fires if the value was non-zero
  ExecutionTimeOnHive = TimeSpan.Zero;
}
/// <summary>
/// Raised whenever <see cref="ExecutionTimeOnHive"/> is set to a different value.
/// </summary>
public event EventHandler ExecutionTimeOnHiveChanged;
/// <summary>
/// Raises <see cref="ExecutionTimeOnHiveChanged"/> with this engine as sender.
/// </summary>
protected virtual void OnExecutionTimeOnHiveChanged() {
  // copy to a local first so a concurrent unsubscribe cannot null the delegate between check and call
  EventHandler subscribers = ExecutionTimeOnHiveChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
#endregion
// Engine entry point: remembers the token in a field (the engine loop checks it from there) and
// then runs the loop on this engine's ExecutionStack.
protected override void Run(CancellationToken cancellationToken) {
// must be stored before the loop starts, because the loop polls the field
this.cancellationToken = cancellationToken;
Run(ExecutionStack);
}
/// <summary>
/// Engine loop: pops operations from the execution stack and processes them until the stack is
/// empty, cancellation is requested, or an operator fails. Operation collections flagged as
/// Parallel are sent to the hive (one job per child operation); non-parallel collections are
/// unfolded onto the stack; atomic operations are executed locally.
/// </summary>
/// <param name="state">
/// The execution stack to work on (a Stack&lt;IOperation&gt;). The only caller in this class
/// passes <c>ExecutionStack</c>.
/// </param>
/// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
/// <exception cref="OperatorExecutionException">
/// A locally executed operator threw; the failed operation is pushed back onto the stack first.
/// </exception>
private void Run(object state) {
  // Element type follows from the IOperation values popped from and pushed onto this stack below.
  Stack<IOperation> executionStack = (Stack<IOperation>)state;

  // The event is static and this method runs once per engine start: remove a previous
  // subscription of this instance before adding it again, so handlers do not pile up.
  TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
  TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;

  this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
  // NOTE(review): the element type argument of List is missing here in the file as received;
  // restore it to match the declaration of AlreadyUploadedPlugins.
  this.AlreadyUploadedPlugins = new List();

  while (executionStack.Count > 0) {
    cancellationToken.ThrowIfCancellationRequested();

    IOperation next = executionStack.Pop();
    OperationCollection coll = next as OperationCollection;
    IAtomicOperation operation = next as IAtomicOperation;

    if (coll != null) {
      if (coll.Parallel) {
        // an empty parallel collection has nothing to distribute (and coll.First() would throw)
        if (coll.Count > 0) RunParallelOnHive(coll);
      } else {
        // push in reverse order so that the first child ends up on top of the stack
        for (int i = coll.Count - 1; i >= 0; i--)
          if (coll[i] != null) executionStack.Push(coll[i]);
      }
    } else if (operation != null) {
      try {
        next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
      }
      catch (OperationCanceledException) {
        // put the operation back so that it is on the stack again when the engine is continued
        executionStack.Push(operation);
        throw; // plain rethrow keeps the original stack trace
      }
      catch (Exception ex) {
        executionStack.Push(operation);
        throw new OperatorExecutionException(operation.Operator, ex);
      }
      if (next != null) executionStack.Push(next);
      if (operation.Operator.Breakpoint) {
        LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
        Pause();
      }
    }
  }
}

/// <summary>
/// Executes every child of a non-empty parallel operation collection as a hive job and copies the
/// resulting scopes back into the scopes of the original atomic operations.
/// </summary>
private void RunParallelOnHive(OperationCollection coll) {
  // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
  IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
  parentScopeClone.SubScopes.Clear();
  parentScopeClone.ClearParentScopes();

  EngineJob[] jobs = new EngineJob[coll.Count];
  for (int i = 0; i < coll.Count; i++) {
    jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
  }

  IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
  //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);

  for (int i = 0; i < coll.Count; i++) {
    IAtomicOperation atomic = coll[i] as IAtomicOperation;
    if (atomic != null) {
      ExchangeScope(scopes[i], atomic.Scope);
    }
    // todo ??: results of nested OperationCollections are not re-integrated
  }
}
// Handler for TaskScheduler.UnobservedTaskException (subscribed in the engine loop): marks the
// exception as observed and does nothing else with it - it is neither logged nor rethrown here.
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
e.SetObserved(); // avoid crash of process
}
/// <summary>
/// Walks from <paramref name="ec"/> up through its parent contexts and looks for a value parameter
/// named "Random". The first match ends the search: its value is returned as IRandom (null if the
/// value is not an IRandom). Best effort - returns null if nothing is found or anything throws.
/// </summary>
private IRandom FindRandomParameter(IExecutionContext ec) {
  try {
    for (IExecutionContext context = ec; context != null; context = context.Parent) {
      foreach (var parameter in context.Parameters) {
        if (parameter.Name == "Random" && parameter is IValueParameter) {
          IValueParameter valueParameter = (IValueParameter)parameter;
          return valueParameter.Value as IRandom;
        }
      }
    }
    return null;
  }
  catch { return null; }
}
/// <summary>
/// Copies the scope contents of <paramref name="source"/> into the scope of
/// <paramref name="target"/> (see ExchangeScope). Not called anywhere in this file.
/// </summary>
private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
  IScope from = source.Scope;
  IScope to = target.Scope;
  ExchangeScope(from, to);
}
/// <summary>
/// Replaces the contents of <paramref name="target"/> with those of <paramref name="source"/>:
/// target's variables and sub-scopes are cleared and then refilled from source. Despite the name
/// this is a one-way copy, not a swap.
/// </summary>
/// <param name="source">Scope to take variables and sub-scopes from.</param>
/// <param name="target">Scope whose variables and sub-scopes are overwritten.</param>
private static void ExchangeScope(IScope source, IScope target) {
  // Same instance: clearing target first would empty the very collections we are about to copy
  // from and silently lose all variables and sub-scopes.
  if (ReferenceEquals(source, target)) return;

  target.Variables.Clear();
  target.Variables.AddRange(source.Variables);
  target.SubScopes.Clear();
  target.SubScopes.AddRange(source.SubScopes);
  // TODO: validate if parent scopes match - otherwise source is invalid
}
/// <summary>
/// Uploads the given jobs as one hive experiment and blocks until all of them are finished
/// (polling every 500 ms), then returns the resulting scope of every job. The experiment is
/// deleted from the hive afterwards, also when the call is cancelled or fails.
/// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
/// </summary>
/// <param name="jobs">One job per operation that should be executed on the hive.</param>
/// <param name="parentScopeClone">Clone of the common parent scope; passed to every EngineHiveJob.</param>
/// <param name="cancellationToken">Checked once per polling cycle.</param>
/// <returns>
/// The scope of each job's initial operation, in the enumeration order of the experiment's hive jobs.
/// </returns>
/// <exception cref="OperationCanceledException">Cancellation was requested while waiting.</exception>
private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
  LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
  // IScope[] instead of Scope[]: arrays are covariant, so storing an IScope that is not a Scope
  // into a Scope[] would throw ArrayTypeMismatchException at runtime.
  IScope[] scopes = new IScope[jobs.Length];
  var hiveExperiment = new HiveExperiment();

  try {
    // create hive experiment
    hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
    hiveExperiment.DateCreated = DateTime.Now;
    hiveExperiment.UseLocalPlugins = this.UseLocalPlugins;
    hiveExperiment.ResourceNames = this.ResourceNames;

    var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
    refreshableHiveExperiment.IsControllable = false;
    hiveExperiments.Add(refreshableHiveExperiment);

    for (int i = 0; i < jobs.Length; i++) {
      hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone));

      // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
      IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
      if (random != null)
        random.Reset(random.Next());
    }

    ExperimentManagerClient.StartExperiment((e) => {
      LogException(e);
    }, refreshableHiveExperiment);

    // do polling until experiment is finished and all jobs are downloaded
    while (!refreshableHiveExperiment.AllJobsFinished()) {
      Thread.Sleep(500);
      this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
      cancellationToken.ThrowIfCancellationRequested();
    }
    LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));

    // get scopes
    int j = 0;
    foreach (var hiveJob in hiveExperiment.HiveJobs) {
      var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
      scopes[j++] = scope;
    }

    refreshableHiveExperiment.RefreshAutomatically = false;
    DeleteHiveExperiment(hiveExperiment.Id);
    ClearData(refreshableHiveExperiment);
    return scopes;
  }
  catch (OperationCanceledException) {
    DeleteHiveExperiment(hiveExperiment.Id);
    throw; // plain rethrow keeps the original stack trace
  }
  catch (Exception e) {
    // log first, so the failure is recorded even if deleting the experiment fails as well
    LogException(e);
    DeleteHiveExperiment(hiveExperiment.Id);
    throw;
  }
}
/// <summary>
/// Calls ClearData() on every hive job that GetAllHiveJobs() returns for the wrapped experiment.
/// </summary>
private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) {
  var experiment = refreshableHiveExperiment.HiveExperiment;
  foreach (var hiveJob in experiment.GetAllHiveJobs())
    hiveJob.ClearData();
}
/// <summary>
/// Asks the hive service to delete the experiment with the given id. The call is wrapped in
/// ExperimentManagerClient.TryAndRepeat with a repetition count of 5.
/// </summary>
private void DeleteHiveExperiment(Guid hiveExperimentId) {
  const int repetitions = 5;
  const string errorMessage = "Could not delete jobs";

  ExperimentManagerClient.TryAndRepeat(
    () => {
      ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
    },
    repetitions,
    errorMessage);
}
/// <summary>
/// Splits <see cref="ResourceNames"/> on ';' and asks the hive service for the id of every name,
/// in order. Not called anywhere in this file.
/// NOTE(review): the return type's generic argument appears to be missing in this file; the body
/// yields Guid values.
/// </summary>
/// <exception cref="ResourceNotFoundException">The service returned Guid.Empty for a name.</exception>
private List GetResourceIds() {
  return ServiceLocator.Instance.CallHiveService(service => {
    return ResourceNames.Split(';').Select(resourceName => {
      Guid resourceId = service.GetResourceId(resourceName);
      if (resourceId == Guid.Empty) {
        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
      }
      return resourceId;
    }).ToList();
  });
}
/// <summary>
/// Threadsafe message logging: writes <paramref name="message"/> to the engine log while holding
/// the static log lock, so calls made through this method are serialized.
/// </summary>
private void LogMessage(string message) {
lock (logLocker) {
Log.LogMessage(message);
}
}
/// <summary>
/// Threadsafe exception logging: writes <paramref name="exception"/> to the engine log while
/// holding the static log lock, so calls made through this method are serialized.
/// </summary>
private void LogException(Exception exception) {
lock (logLocker) {
Log.LogException(exception);
}
}
// testfunction:
//private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
// IScope[] scopes = new Scope[jobs.Length];
// for (int i = 0; i < jobs.Length; i++) {
// var serialized = PersistenceUtil.Serialize(jobs[i]);
// var deserialized = PersistenceUtil.Deserialize(serialized);
// deserialized.Start();
// while (deserialized.ExecutionState != ExecutionState.Stopped) {
// Thread.Sleep(100);
// }
// var serialized2 = PersistenceUtil.Serialize(deserialized);
// var deserialized2 = PersistenceUtil.Deserialize(serialized2);
// var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope;
// scopes[i] = newScope;
// }
// return scopes;
//}
}
}