using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.PluginInfrastructure;
namespace HeuristicLab.HiveEngine {
/// <summary>
/// Represents an engine that executes operations which can be executed in parallel on the hive.
/// </summary>
[StorableClass]
[Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
public class HiveEngine : Engine {
// Throttles simultaneous hive-service connections during upload/download.
private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
// Limits how many serialized jobs may be held in memory at once.
private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
// Token supplied by Run(CancellationToken); polled by the upload/download/status loops.
private CancellationToken cancellationToken;

// NOTE(review): persisted and cloned but never assigned anywhere in this file —
// verify it is still needed.
[Storable]
private IOperator currentOperator;

// Semicolon-separated hive resource (group) names on which jobs may run;
// resolved to ids in GetResourceIds().
[Storable]
public string ResourceIds { get; set; }

// Priority assigned to every uploaded hive job (see UploadJob).
[Storable]
private int priority;
public int Priority {
  get { return priority; }
  set { priority = value; }
}
// Accumulated execution time consumed by jobs on the hive (kept separately from the
// engine's local execution time); updated from the polling loop in ExecuteOnHive.
[Storable]
private TimeSpan executionTimeOnHive;
public TimeSpan ExecutionTimeOnHive {
  get { return executionTimeOnHive; }
  set {
    // only raise the change event when the value actually differs
    if (value != executionTimeOnHive) {
      executionTimeOnHive = value;
      OnExecutionTimeOnHiveChanged();
    }
  }
}
/// <summary>
/// Plugins known to the hive server, fetched at the start of a run (local-only plugins excluded).
/// NOTE(review): the generic type argument was missing (bare <c>List</c>, which does not
/// compile); restored as <c>List&lt;Plugin&gt;</c> to match the hive service's plugin DTO —
/// confirm against <c>IHiveService.GetPlugins()</c>.
/// </summary>
private List<Plugin> onlinePlugins;
public List<Plugin> OnlinePlugins {
  get { return onlinePlugins; }
  set { onlinePlugins = value; }
}
/// <summary>
/// Plugins that have already been uploaded during this run, so they are not uploaded again.
/// NOTE(review): the generic type argument was missing (bare <c>List</c>, which does not
/// compile); restored as <c>List&lt;Plugin&gt;</c> — confirm against
/// <c>PluginUtil.GetPluginDependencies</c>.
/// </summary>
private List<Plugin> alreadyUploadedPlugins;
public List<Plugin> AlreadyUploadedPlugins {
  get { return alreadyUploadedPlugins; }
  set { alreadyUploadedPlugins = value; }
}
#region constructors and cloning
/// <summary>
/// Creates a new hive engine targeting the default resource group "HEAL".
/// </summary>
public HiveEngine() {
  ResourceIds = "HEAL";
}
// Used exclusively by the persistence framework during deserialization.
[StorableConstructor]
protected HiveEngine(bool deserializing) : base(deserializing) { }
// Copy constructor used by Clone(): copies the persisted state (resource names, current
// operator, priority and accumulated hive execution time). Runtime-only state (plugin
// lists, semaphores, cancellation token) is not copied — presumably intentional since it
// is transient; verify if cloning during a run is ever expected.
protected HiveEngine(HiveEngine original, Cloner cloner)
  : base(original, cloner) {
  this.ResourceIds = original.ResourceIds;
  this.currentOperator = cloner.Clone(original.currentOperator);
  this.priority = original.priority;
  this.executionTimeOnHive = original.executionTimeOnHive;
}
// Deep clone via the copy constructor (standard HeuristicLab cloning pattern).
public override IDeepCloneable Clone(Cloner cloner) {
  return new HiveEngine(this, cloner);
}
#endregion
#region Events
// Resets the accumulated hive execution time whenever the engine is (re-)prepared.
protected override void OnPrepared() {
  base.OnPrepared();
  this.ExecutionTimeOnHive = TimeSpan.Zero;
}
/// <summary>Raised whenever <see cref="ExecutionTimeOnHive"/> changes.</summary>
public event EventHandler ExecutionTimeOnHiveChanged;
// Copies the handler to a local before invoking so a concurrent unsubscribe between the
// null check and the call cannot cause a NullReferenceException.
protected virtual void OnExecutionTimeOnHiveChanged() {
  var handler = ExecutionTimeOnHiveChanged;
  if (handler != null) handler(this, EventArgs.Empty);
}
#endregion
// Engine entry point: stashes the cancellation token for use by the worker methods,
// then processes the execution stack (blocks until the stack is empty or an
// exception/cancellation unwinds it).
protected override void Run(CancellationToken cancellationToken) {
  this.cancellationToken = cancellationToken;
  Run(ExecutionStack);
}
/// <summary>
/// Main execution loop: pops operations off the execution stack; operation collections
/// marked Parallel are executed as hive jobs, everything else is executed locally.
/// NOTE(review): generic type arguments (Stack&lt;IOperation&gt;,
/// EventHandler&lt;UnobservedTaskExceptionEventArgs&gt;, List&lt;Plugin&gt;) were missing in
/// the original source (likely stripped during extraction) and have been restored — verify
/// against the base Engine's ExecutionStack declaration.
/// </summary>
/// <param name="state">The engine's execution stack, passed as object (see Run(CancellationToken)).</param>
private void Run(object state) {
  // NOTE(review): 'executionStack' aliases this.ExecutionStack (the only caller passes it),
  // so the mixed use of the local and the property below is harmless but confusing.
  Stack<IOperation> executionStack = (Stack<IOperation>)state;
  IOperation next;
  OperationCollection coll;
  IAtomicOperation operation;
  // Observe unobserved task exceptions so a faulted upload/download task cannot crash the
  // process. NOTE(review): subscribed on every run and never unsubscribed — potential leak.
  TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);

  this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
  this.AlreadyUploadedPlugins = new List<Plugin>();

  while (ExecutionStack.Count > 0) {
    cancellationToken.ThrowIfCancellationRequested();
    next = ExecutionStack.Pop();
    if (next is OperationCollection) {
      coll = (OperationCollection)next;
      if (coll.Parallel) {
        // clone the parent scope here and reuse it for each operation. otherwise for each job
        // the whole scope-tree first needs to be copied and then cleaned, which causes a lot
        // of work for the Garbage Collector
        IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
        parentScopeClone.SubScopes.Clear();
        parentScopeClone.ClearParentScopes();

        EngineJob[] jobs = new EngineJob[coll.Count];
        for (int i = 0; i < coll.Count; i++) {
          jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
        }

        IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
        //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);

        for (int i = 0; i < coll.Count; i++) {
          if (coll[i] is IAtomicOperation) {
            ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
          } else if (coll[i] is OperationCollection) {
            // todo: nested operation collections inside a parallel collection are not
            // handled yet (results for them are silently dropped)
          }
        }
      } else {
        // sequential collection: push members in reverse so they execute in declared order
        for (int i = coll.Count - 1; i >= 0; i--)
          if (coll[i] != null) executionStack.Push(coll[i]);
      }
    } else if (next is IAtomicOperation) {
      operation = (IAtomicOperation)next;
      try {
        next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
      }
      catch (Exception ex) {
        // put the failed operation back so execution can resume after user intervention
        ExecutionStack.Push(operation);
        if (ex is OperationCanceledException) throw;  // was 'throw ex' — preserve the stack trace
        else throw new OperatorExecutionException(operation.Operator, ex);
      }
      if (next != null) ExecutionStack.Push(next);

      if (operation.Operator.Breakpoint) {
        LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
        Pause();
      }
    }
  }
}
// Marks unobserved task exceptions as observed so a faulted, never-awaited upload/download
// task cannot take down the whole process when it is finalized.
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
  e.SetObserved(); // avoid crash of process
}
/// <summary>
/// Walks the execution-context chain upwards looking for a value parameter named "Random"
/// and returns its value as IRandom. Returns null when no such parameter exists, when it
/// is not an IValueParameter / its value is not an IRandom, or when any lookup throws.
/// </summary>
private IRandom FindRandomParameter(IExecutionContext ec) {
  try {
    // iterative equivalent of the original recursion over ec.Parent
    for (IExecutionContext context = ec; context != null; context = context.Parent) {
      foreach (var param in context.Parameters) {
        if (param.Name == "Random" && param is IValueParameter)
          return ((IValueParameter)param).Value as IRandom;
      }
    }
    return null;
  }
  catch { return null; }
}
// Copies the scope contents of a finished operation back into the corresponding local
// operation's scope.
// NOTE(review): not referenced anywhere in this file — possibly dead code; verify callers.
private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
  ExchangeScope(source.Scope, target.Scope);
}
/// <summary>
/// Replaces the target scope's variables and sub-scopes with those of the source scope.
/// </summary>
private static void ExchangeScope(IScope source, IScope target) {
  var targetVariables = target.Variables;
  targetVariables.Clear();
  targetVariables.AddRange(source.Variables);

  var targetSubScopes = target.SubScopes;
  targetSubScopes.Clear();
  targetSubScopes.AddRange(source.SubScopes);
  // TODO: validate if parent scopes match - otherwise source is invalid
}
/// <summary>
/// Debug alternative to ExecuteOnHive: runs each job locally and sequentially, blocking
/// (by polling) until each one stops, and returns the resulting scopes.
/// </summary>
private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
  var resultScopes = new Scope[jobs.Length];
  for (int index = 0; index < jobs.Length; index++) {
    var localJob = (EngineJob)jobs[index].Clone();
    localJob.Start();
    // busy-wait until the engine embedded in the job reports Stopped
    while (localJob.ExecutionState != ExecutionState.Stopped)
      Thread.Sleep(100);
    resultScopes[index] = ((IAtomicOperation)localJob.InitialOperation).Scope;
  }
  return resultScopes;
}
/// <summary>
/// Uploads the given jobs to the hive, polls their status until all are finished, downloads
/// the results and returns the resulting scopes. This method blocks until all jobs are finished.
/// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
/// NOTE(review): generic type arguments were missing throughout this method (likely stripped
/// during extraction) and have been restored — verify against the hive client API.
/// </summary>
private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
  LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
  IScope[] scopes = new Scope[jobs.Length];
  object locker = new object();
  // maps hive job id -> index into 'jobs'/'scopes'
  IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();

  try {
    List<Guid> remainingJobIds = new List<Guid>();
    List<LightweightJob> lightweightJobs;
    int finishedCount = 0;
    int uploadCount = 0;

    // create upload-tasks
    var uploadTasks = new List<Task<Job>>();
    for (int i = 0; i < jobs.Length; i++) {
      var job = jobs[i];
      // shuffle random variable to avoid the same random sequence in each operation;
      // todo: does not yet work (it cannot find the random variable)
      IRandom random = FindRandomParameter(job.InitialOperation as IExecutionContext);
      if (random != null)
        random.Reset(random.Next());
      uploadTasks.Add(Task.Factory.StartNew((keyValuePairObj) => {
        return UploadJob(keyValuePairObj, parentScopeClone, cancellationToken, GetResourceIds());
      }, new KeyValuePair<int, EngineJob>(i, job), cancellationToken));
    }

    Task processUploadedJobsTask = new Task(() => {
      // process finished upload-tasks: record the server-assigned job id for each
      int uploadTasksCount = uploadTasks.Count;
      for (int i = 0; i < uploadTasksCount; i++) {
        cancellationToken.ThrowIfCancellationRequested();
        var uploadTasksArray = uploadTasks.ToArray();
        var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
        if (task.Status == TaskStatus.Faulted) {
          LogException(task.Exception);
          throw task.Exception;
        }
        int key = ((KeyValuePair<int, EngineJob>)task.AsyncState).Key;
        Job job = task.Result;
        lock (locker) {
          uploadCount++;
          jobIndices.Add(job.Id, key);
          remainingJobIds.Add(job.Id);
        }
        jobs[key] = null; // relax memory
        // was: format string "Uploaded job #{0}" silently ignored the job-id argument
        LogMessage(string.Format("Uploaded job #{0} (id: {1})", key + 1, job.Id));
        uploadTasks.Remove(task);
      }
    }, cancellationToken, TaskCreationOptions.PreferFairness);
    processUploadedJobsTask.Start();

    // poll job-statuses and create download-tasks for those which are finished
    var downloadTasks = new List<Task<EngineJob>>();
    var executionTimes = new List<TimeSpan>();
    var executionTimeOnHiveBefore = executionTimeOnHive;
    while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
      cancellationToken.ThrowIfCancellationRequested();
      Thread.Sleep(10000);
      try {
        lightweightJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobs(remainingJobIds));
        var jobsFinished = lightweightJobs.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
        finishedCount += jobsFinished.Count();
        if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
        ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + lightweightJobs.Select(x => x.ExecutionTime.HasValue ? x.ExecutionTime.Value : TimeSpan.Zero).Sum();
        foreach (var result in jobsFinished) {
          if (result.State == JobState.Finished) {
            downloadTasks.Add(Task.Factory.StartNew((jobIdObj) => {
              return DownloadJob(jobIndices, jobIdObj, cancellationToken);
            }, result.Id, cancellationToken));
          } else if (result.State == JobState.Aborted) {
            LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
          } else if (result.State == JobState.Failed) {
            LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.CurrentStateLog != null ? result.CurrentStateLog.Exception : string.Empty));
          }
          remainingJobIds.Remove(result.Id);
          executionTimes.Add(result.ExecutionTime.HasValue ? result.ExecutionTime.Value : TimeSpan.Zero);
        }
      }
      catch (Exception e) {
        // polling errors are logged and retried on the next iteration (best effort)
        LogException(e);
      }
    }

    // process finished download-tasks
    int downloadTasksCount = downloadTasks.Count;
    for (int i = 0; i < downloadTasksCount; i++) {
      cancellationToken.ThrowIfCancellationRequested();
      var downloadTasksArray = downloadTasks.ToArray();
      var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
      var jobId = (Guid)task.AsyncState;
      if (task.Status == TaskStatus.Faulted) {
        LogException(task.Exception);
        throw task.Exception;
      }
      scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.InitialOperation).Scope;
      downloadTasks.Remove(task);
    }
    LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
    DeleteJobs(jobIndices);

    return scopes;
  }
  catch (OperationCanceledException) {
    lock (locker) {
      if (jobIndices != null) DeleteJobs(jobIndices);
    }
    throw; // was 'throw e' — preserve the original stack trace
  }
  catch (Exception e) {
    lock (locker) {
      if (jobIndices != null) DeleteJobs(jobIndices);
    }
    LogException(e);
    throw; // was 'throw e' — preserve the original stack trace
  }
}
/// <summary>
/// Deletes all uploaded jobs from the hive (retrying up to 5 times) and clears the index map.
/// NOTE(review): the parameter's generic type arguments were missing (bare IDictionary) and
/// have been restored as IDictionary&lt;Guid, int&gt; to match the callers in ExecuteOnHive.
/// </summary>
/// <param name="jobIndices">Map of hive job id to local job index; cleared once deletion succeeds.</param>
private void DeleteJobs(IDictionary<Guid, int> jobIndices) {
  if (jobIndices.Count > 0) {
    TryAndRepeat(() => {
      LogMessage(string.Format("Deleting {0} jobs on hive.", jobIndices.Count));
      ServiceLocator.Instance.CallHiveService(service => {
        foreach (Guid jobId in jobIndices.Keys) {
          service.DeleteJob(jobId);
        }
        jobIndices.Clear();
      });
    }, 5, string.Format("Could not delete {0} jobs", jobIndices.Count));
  }
}
// Guards the scope re-parenting + serialization section in UploadJob; static because
// parentScopeClone is shared between all concurrently uploading jobs of one parallel batch.
// (made readonly — it is never reassigned)
private static readonly object locker = new object();

/// <summary>
/// Serializes one engine job and uploads it to the hive, retrying the upload until it
/// succeeds or cancellation is requested. Blocks on the connection/memory semaphores to
/// throttle concurrent work.
/// NOTE(review): generic type arguments (List&lt;Guid&gt;, KeyValuePair&lt;int, EngineJob&gt;,
/// IEnumerable&lt;Type&gt;, List&lt;IPluginDescription&gt;) were missing in the original
/// source and have been restored — verify against PersistenceUtil/PluginUtil signatures.
/// </summary>
/// <param name="keyValuePairObj">Task state: a KeyValuePair of (job index, job).</param>
/// <param name="parentScopeClone">Shared pruned clone of the parent scope attached before serialization.</param>
/// <param name="cancellationToken">Observed between upload retries.</param>
/// <param name="resourceIds">Ids of the hive resources on which the job may run.</param>
/// <returns>The created hive Job carrying its server-assigned id.</returns>
private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds) {
  var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj;
  Job job = new Job();
  try {
    maxSerializedJobsInMemory.WaitOne();
    JobData jobData = new JobData();
    IEnumerable<Type> usedTypes;
    // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
    lock (locker) {
      ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone;
      keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone();
      if (keyValuePair.Value.InitialOperation is IAtomicOperation)
        ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes();
      jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes);
    }
    var neededPlugins = new List<IPluginDescription>();
    PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes);

    job.CoresNeeded = 1;
    job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, false));
    job.Priority = priority;

    try {
      maxConcurrentConnections.WaitOne();
      while (job.Id == Guid.Empty) { // repeat until success
        cancellationToken.ThrowIfCancellationRequested();
        try {
          job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds));
        }
        catch (Exception e) {
          LogException(e);
          LogMessage("Repeating upload");
        }
      }
    }
    finally {
      maxConcurrentConnections.Release();
    }
  }
  finally {
    maxSerializedJobsInMemory.Release();
  }
  return job;
}
/// <summary>
/// Resolves the semicolon-separated resource names in <see cref="ResourceIds"/> to hive ids.
/// (Return type restored to List&lt;Guid&gt; — the generic argument was missing.)
/// </summary>
/// <returns>One resource id per configured resource name.</returns>
/// <exception cref="ResourceNotFoundException">Thrown when a name cannot be resolved.</exception>
private List<Guid> GetResourceIds() {
  return ServiceLocator.Instance.CallHiveService(service => {
    var resourceNames = ResourceIds.Split(';');
    var resourceIds = new List<Guid>();
    foreach (var resourceName in resourceNames) {
      Guid resourceId = service.GetResourceId(resourceName);
      if (resourceId == Guid.Empty) {
        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
      }
      resourceIds.Add(resourceId);
    }
    return resourceIds;
  });
}
/// <summary>
/// Downloads and deserializes a finished job from the hive, retrying until the download
/// succeeds or cancellation is requested. Blocks on the connection/memory semaphores to
/// throttle concurrent work.
/// NOTE(review): generic type arguments (IDictionary&lt;Guid, int&gt;,
/// Deserialize&lt;EngineJob&gt;) were missing in the original source and have been restored.
/// </summary>
private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
  Guid jobId = (Guid)jobIdObj;
  JobData jobData = null;
  EngineJob engineJob = null;
  try {
    maxSerializedJobsInMemory.WaitOne();
    maxConcurrentConnections.WaitOne();
    while (jobData == null) { // repeat until success
      cancellationToken.ThrowIfCancellationRequested();
      try {
        jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
      }
      catch (Exception e) {
        LogException(e);
        LogMessage("Repeating download");
      }
    }
    engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);
    jobData = null; // drop the serialized blob early to relax memory
    // was: format string "Downloaded job #{0}" silently ignored the job-id argument
    LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobIndices[jobId] + 1, jobId));
  }
  finally {
    maxConcurrentConnections.Release();
    maxSerializedJobsInMemory.Release();
  }
  return engineJob;
}
/// <summary>
/// Threadsafe message logging.
/// NOTE(review): locks on the Log object itself; if Log is reachable by other code a
/// private lock object would be safer — confirm how Log is exposed by the base class.
/// </summary>
private void LogMessage(string message) {
  lock (Log) {
    Log.LogMessage(message);
  }
}
/// <summary>
/// Threadsafe exception logging (same locking discipline as LogMessage).
/// </summary>
private void LogException(Exception exception) {
  lock (Log) {
    Log.LogException(exception);
  }
}
/// <summary>
/// Executes the action. If it throws an exception it is repeated until the repetition-count
/// is exhausted. If repetitions is negative (e.g. -1), it is repeated infinitely.
/// </summary>
/// <param name="action">The action to execute.</param>
/// <param name="repetitions">Total number of attempts; negative means retry forever.</param>
/// <param name="errorMessage">Message for the exception thrown when all attempts fail.</param>
/// <exception cref="HiveEngineException">Wraps the last failure when all attempts failed.</exception>
private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
  // Fixes: (a) a negative repetition count previously threw on the FIRST failure
  // (repetitions-- then 'repetitions <= 0') instead of retrying infinitely as documented;
  // (b) iterative instead of recursive, avoiding a needlessly deep stack for large counts.
  // Positive counts keep the original semantics: 'repetitions' total attempts.
  while (true) {
    try { action(); return; }
    catch (Exception e) {
      if (repetitions > 0) repetitions--;   // negative counts are never decremented
      if (repetitions == 0)
        throw new HiveEngineException(errorMessage, e);
    }
  }
}
}
/// <summary>
/// Scope-tree pruning helpers: walking upward from a scope, every ancestor keeps only the
/// single child on the path, so unrelated sibling scopes are dropped (e.g. before a scope
/// is serialized into a hive job).
/// </summary>
public static class ScopeExtensions {
  public static void ClearParentScopes(this IScope scope) {
    scope.ClearParentScopes(null);
  }

  public static void ClearParentScopes(this IScope scope, IScope childScope) {
    // Iterative form of the original recursion: at each ancestor, prune its sub-scopes
    // down to the single child we arrived from (no pruning at the starting scope itself
    // when childScope is null).
    IScope current = scope;
    IScope keptChild = childScope;
    while (current != null) {
      if (keptChild != null) {
        current.SubScopes.Clear();
        current.SubScopes.Add(keptChild);
      }
      keptChild = current;
      current = current.Parent;
    }
  }
}
/// <summary>
/// Extension methods for aggregating TimeSpan sequences.
/// </summary>
public static class EnumerableExtensions {
  /// <summary>
  /// Sums a sequence of TimeSpans; an empty sequence yields TimeSpan.Zero.
  /// (The parameter's generic type argument was missing — bare IEnumerable — and has been
  /// restored to IEnumerable&lt;TimeSpan&gt; to match the call sites.)
  /// Note: aggregates via total milliseconds, so sub-millisecond tick precision is lost.
  /// </summary>
  public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
    return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum());
  }
}
}