using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.PluginInfrastructure;

namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive
  /// </summary>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need to enable `Parallel` for at least one operator in your operator graph to have all child operations parallelized. Also, those child operations must not have side effects on a higher scope.")]
  public class HiveEngine : Engine {
    private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
    private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
    private CancellationToken cancellationToken;

    [Storable]
    private IOperator currentOperator;

    [Storable]
    public string ResourceIds { get; set; }

    [Storable]
    private int priority;
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    private List<Plugin> onlinePlugins;
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    public HiveEngine() {
      ResourceIds = "HEAL";
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceIds = original.ResourceIds;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;
      IOperation next;
      OperationCollection coll;
      IAtomicOperation operation;
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);

      this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
      this.AlreadyUploadedPlugins = new List<Plugin>();

      while (ExecutionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        next = ExecutionStack.Pop();
        if (next is OperationCollection) {
          coll = (OperationCollection)next;
          if (coll.Parallel) {
            // clone the parent scope here and reuse it for each operation; otherwise the whole
            // scope-tree would first need to be copied and then cleaned for every job, which
            // causes a lot of work for the garbage collector
            IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
            parentScopeClone.SubScopes.Clear();
            parentScopeClone.ClearParentScopes();

            EngineJob[] jobs = new EngineJob[coll.Count];
            for (int i = 0; i < coll.Count; i++) {
              jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
            }

            IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
            //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);

            for (int i = 0; i < coll.Count; i++) {
              if (coll[i] is IAtomicOperation) {
                ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
              } else if (coll[i] is OperationCollection) {
                // todo ??
              }
            }
          } else {
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
        } else if (next is IAtomicOperation) {
          operation = (IAtomicOperation)next;
          try {
            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
          }
          catch (Exception ex) {
            ExecutionStack.Push(operation);
            if (ex is OperationCanceledException) throw;
            else throw new OperatorExecutionException(operation.Operator, ex);
          }
          if (next != null) ExecutionStack.Push(next);

          if (operation.Operator.Breakpoint) {
            LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
            Pause();
          }
        }
      }
    }
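
    // The parallel branch above assumes that some operator in the graph returned an
    // OperationCollection with Parallel set to true. A minimal sketch of how an operator
    // might produce such a collection (hypothetical successorOperator/scope names, not
    // part of this class):
    //
    //   var next = new OperationCollection() { Parallel = true };
    //   foreach (IScope child in scope.SubScopes)
    //     next.Add(ExecutionContext.CreateOperation(successorOperator, child));
    //   return next;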

    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        if (ec == null)
          return null;
        foreach (var p in ec.Parameters) {
          if (p.Name == "Random" && p is IValueParameter)
            return ((IValueParameter)p).Value as IRandom;
        }
        return FindRandomParameter(ec.Parent);
      }
      catch { return null; }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      IScope[] scopes = new Scope[jobs.Length];
      for (int i = 0; i < jobs.Length; i++) {
        var job = (EngineJob)jobs[i].Clone();
        job.Start();
        while (job.ExecutionState != ExecutionState.Stopped) {
          Thread.Sleep(100);
        }
        scopes[i] = ((IAtomicOperation)job.InitialOperation).Scope;
      }
      return scopes;
    }
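
    // ExecuteOnHive (below) repeatedly drains its task lists with Task.WaitAny. The
    // pattern in isolation (minimal sketch; List<Task<T>> tasks and Process are
    // placeholders, not members of this class):
    //
    //   while (tasks.Count > 0) {
    //     var array = tasks.ToArray();
    //     var finished = array[Task.WaitAny(array)]; // blocks until any task completes
    //     Process(finished.Result);
    //     tasks.Remove(finished);
    //   }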

    /// <summary>
    /// This method blocks until all jobs are finished.
    /// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation
    /// </summary>
    private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      IScope[] scopes = new Scope[jobs.Length];
      object locker = new object();
      IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();

      try {
        List<Guid> remainingJobIds = new List<Guid>();
        List<LightweightJob> lightweightJobs;
        int finishedCount = 0;
        int uploadCount = 0;

        // create upload-tasks
        var uploadTasks = new List<Task<Job>>();
        for (int i = 0; i < jobs.Length; i++) {
          var job = jobs[i];

          // shuffle random variable to avoid the same random sequence in each operation;
          // todo: does not work yet (it cannot find the random variable)
          IRandom random = FindRandomParameter(job.InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());

          uploadTasks.Add(Task.Factory.StartNew<Job>((keyValuePairObj) => {
            return UploadJob(keyValuePairObj, parentScopeClone, cancellationToken, GetResourceIds());
          }, new KeyValuePair<int, EngineJob>(i, job), cancellationToken));
        }

        Task processUploadedJobsTask = new Task(() => {
          // process finished upload-tasks
          int uploadTasksCount = uploadTasks.Count;
          for (int i = 0; i < uploadTasksCount; i++) {
            cancellationToken.ThrowIfCancellationRequested();

            var uploadTasksArray = uploadTasks.ToArray();
            var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
            if (task.Status == TaskStatus.Faulted) {
              LogException(task.Exception);
              throw task.Exception;
            }

            int key = ((KeyValuePair<int, EngineJob>)task.AsyncState).Key;
            Job job = task.Result;
            lock (locker) {
              uploadCount++;
              jobIndices.Add(job.Id, key);
              remainingJobIds.Add(job.Id);
            }
            jobs[key] = null; // relax memory
            LogMessage(string.Format("Uploaded job #{0} (id: {1})", key + 1, job.Id));
            uploadTasks.Remove(task);
          }
        }, cancellationToken, TaskCreationOptions.PreferFairness);
        processUploadedJobsTask.Start();

        // poll job-statuses and create download-tasks for those which are finished
        var downloadTasks = new List<Task<EngineJob>>();
        var executionTimes = new List<TimeSpan>();
        var executionTimeOnHiveBefore = executionTimeOnHive;
        while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
          cancellationToken.ThrowIfCancellationRequested();
          Thread.Sleep(10000);
          try {
            lightweightJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobs(remainingJobIds));

            var jobsFinished = lightweightJobs.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
            finishedCount += jobsFinished.Count();
            if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
            ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + lightweightJobs.Select(x => x.ExecutionTime.HasValue ? x.ExecutionTime.Value : TimeSpan.Zero).Sum();

            foreach (var result in jobsFinished) {
              if (result.State == JobState.Finished) {
                downloadTasks.Add(Task.Factory.StartNew<EngineJob>((jobIdObj) => {
                  return DownloadJob(jobIndices, jobIdObj, cancellationToken);
                }, result.Id, cancellationToken));
              } else if (result.State == JobState.Aborted) {
                LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
              } else if (result.State == JobState.Failed) {
                LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.CurrentStateLog != null ? result.CurrentStateLog.Exception : string.Empty));
              }
              remainingJobIds.Remove(result.Id);
              executionTimes.Add(result.ExecutionTime.HasValue ? result.ExecutionTime.Value : TimeSpan.Zero);
            }
          }
          catch (Exception e) {
            LogException(e);
          }
        }

        // process finished download-tasks
        int downloadTasksCount = downloadTasks.Count;
        for (int i = 0; i < downloadTasksCount; i++) {
          cancellationToken.ThrowIfCancellationRequested();

          var downloadTasksArray = downloadTasks.ToArray();
          var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
          var jobId = (Guid)task.AsyncState;
          if (task.Status == TaskStatus.Faulted) {
            LogException(task.Exception);
            throw task.Exception;
          }
          scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.InitialOperation).Scope;
          downloadTasks.Remove(task);
        }

        LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
        DeleteJobs(jobIndices);

        return scopes;
      }
      catch (OperationCanceledException) {
        lock (locker) {
          if (jobIndices != null) DeleteJobs(jobIndices);
        }
        throw;
      }
      catch (Exception e) {
        lock (locker) {
          if (jobIndices != null) DeleteJobs(jobIndices);
        }
        LogException(e);
        throw;
      }
    }
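
    // Note: executionTimes.Sum() above resolves to the TimeSpan extension defined in
    // EnumerableExtensions at the bottom of this file, e.g. (sketch, assumed values):
    //
    //   var times = new[] { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2) };
    //   TimeSpan total = times.Sum(); // 00:00:03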

    private void DeleteJobs(IDictionary<Guid, int> jobIndices) {
      if (jobIndices.Count > 0) {
        TryAndRepeat(() => {
          LogMessage(string.Format("Deleting {0} jobs on hive.", jobIndices.Count));
          ServiceLocator.Instance.CallHiveService(service => {
            foreach (Guid jobId in jobIndices.Keys) {
              service.DeleteJob(jobId);
            }
            jobIndices.Clear();
          });
        }, 5, string.Format("Could not delete {0} jobs", jobIndices.Count));
      }
    }

    private static object locker = new object();
    private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds) {
      var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj;
      Job job = new Job();

      try {
        maxSerializedJobsInMemory.WaitOne();
        JobData jobData = new JobData();
        IEnumerable<Type> usedTypes;

        // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
        lock (locker) {
          ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone;
          keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone();
          if (keyValuePair.Value.InitialOperation is IAtomicOperation)
            ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes();
          jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes);
        }
        var neededPlugins = new List<IPluginDescription>();
        PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes);

        job.CoresNeeded = 1;
        job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, false));
        job.Priority = priority;

        try {
          maxConcurrentConnections.WaitOne();
          while (job.Id == Guid.Empty) { // repeat until success
            cancellationToken.ThrowIfCancellationRequested();
            try {
              job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds));
            }
            catch (Exception e) {
              LogException(e);
              LogMessage("Repeating upload");
            }
          }
        }
        finally {
          maxConcurrentConnections.Release();
        }
      }
      finally {
        maxSerializedJobsInMemory.Release();
      }
      return job;
    }

    private List<Guid> GetResourceIds() {
      return ServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceIds.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
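
    // UploadJob (above) and DownloadJob (below) share the same repeat-until-success
    // idiom around a single hive call, additionally throttled by the two semaphores
    // declared at the top of the class. The idiom in isolation (sketch; Call is a
    // placeholder for the actual service invocation):
    //
    //   TResult result = null;
    //   while (result == null) {
    //     cancellationToken.ThrowIfCancellationRequested();
    //     try { result = Call(); }
    //     catch (Exception e) { LogException(e); }
    //   }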

    private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
      Guid jobId = (Guid)jobIdObj;
      JobData jobData = null;
      EngineJob engineJob = null;
      try {
        maxSerializedJobsInMemory.WaitOne();
        maxConcurrentConnections.WaitOne();
        while (jobData == null) { // repeat until success
          cancellationToken.ThrowIfCancellationRequested();
          try {
            jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
          }
          catch (Exception e) {
            LogException(e);
            LogMessage("Repeating download");
          }
        }
        engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);
        jobData = null; // relax memory
        LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobIndices[jobId] + 1, jobId));
      }
      finally {
        maxConcurrentConnections.Release();
        maxSerializedJobsInMemory.Release();
      }
      return engineJob;
    }

    /// <summary>
    /// Threadsafe message logging
    /// </summary>
    private void LogMessage(string message) {
      lock (Log) {
        Log.LogMessage(message);
      }
    }

    /// <summary>
    /// Threadsafe exception logging
    /// </summary>
    private void LogException(Exception exception) {
      lock (Log) {
        Log.LogException(exception);
      }
    }

    /// <summary>
    /// Executes the action. If it throws an exception it is repeated until the repetition-count is reached.
    /// If repetitions is -1, it is repeated indefinitely.
    /// </summary>
    private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
      while (true) {
        try { action(); return; }
        catch (Exception e) {
          repetitions--;
          // a repetitions value of -1 decrements past zero and therefore retries (practically) indefinitely
          if (repetitions == 0)
            throw new HiveEngineException(errorMessage, e);
        }
      }
    }
  }

  public static class ScopeExtensions {
    public static void ClearParentScopes(this IScope scope) {
      scope.ClearParentScopes(null);
    }

    public static void ClearParentScopes(this IScope scope, IScope childScope) {
      if (childScope != null) {
        scope.SubScopes.Clear();
        scope.SubScopes.Add(childScope);
      }
      if (scope.Parent != null)
        scope.Parent.ClearParentScopes(scope);
    }
  }

  public static class EnumerableExtensions {
    public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
      return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum());
    }
  }
}
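
// Usage sketch for the extension methods above (assumed scope/time values):
//
//   scope.ClearParentScopes(); // walks up to the root and removes all sibling sub-scopes,
//                              // keeping only the chain from the root down to 'scope'
//   TimeSpan total = new[] { TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(2) }.Sum(); // 00:03:00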