#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;

namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive.
  /// </summary>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need to enable `Parallel` for at least one operator in your operator graph to have all child operations parallelized. Also, those child operations must not have side effects on a higher scope.")]
  public class HiveEngine : Engine {
    private static object locker = new object();
    private static object logLocker = new object();
    private CancellationToken cancellationToken;
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;

    [Storable]
    public string ResourceNames { get; set; }

    [Storable]
    private int priority;
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // [Storable] -> HiveExperiment can't be storable, so RefreshableHiveExperiment can't be stored
    private ItemCollection<RefreshableHiveExperiment> hiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
      get { return hiveExperiments; }
      set { hiveExperiments = value; }
    }

    private List<Plugin> onlinePlugins;
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    public HiveEngine() {
      ResourceNames = "HEAL";
      Priority = 0;
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }

    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.ResourceNames;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // this.hiveExperiments = cloner.Clone(original.hiveExperiments); do not clone hiveExperiments - otherwise they would be sent with every job
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion
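    // Usage sketch (illustrative only, not part of the original file): a HiveEngine
    // replaces the default sequential engine of an EngineAlgorithm. The algorithm
    // type and resource name below are assumptions.
    //
    //   var algorithm = new GeneticAlgorithm();   // any EngineAlgorithm
    //   var engine = new HiveEngine();
    //   engine.ResourceNames = "HEAL";            // semicolon-separated hive resource names
    //   engine.Priority = 0;
    //   algorithm.Engine = engine;
    //   algorithm.Start();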
    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;
      IOperation next;
      OperationCollection coll;
      IAtomicOperation operation;

      if (firstRun) {
        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();
        next = executionStack.Pop();
        //bool isOpCollection = next is OperationCollection;
        //int collCount = isOpCollection ? ((OperationCollection)next).Count : 0;
        //string opName = !isOpCollection ? ((IAtomicOperation)next).Operator.Name : "OpCollection";

        if (next is OperationCollection) {
          coll = (OperationCollection)next;
          //bool isPMOEvaluator = coll.Count > 0 && coll.First() is HeuristicLab.Core.ExecutionContext && ((HeuristicLab.Core.ExecutionContext)coll.First()).Operator.GetType().Name == "PMOEvaluator";
          //bool isAlgorithmEvaluator = coll.Count > 0 && coll.First() is HeuristicLab.Core.ExecutionContext && ((HeuristicLab.Core.ExecutionContext)coll.First()).Operator.GetType().Name == "AlgorithmEvaluator";
          //if (coll.Parallel && isPMOEvaluator) {
          //  Task[] tasks = new Task[coll.Count];
          //  Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count];
          //  for (int i = 0; i < coll.Count; i++) {
          //    stacks[i] = new Stack<IOperation>();
          //    stacks[i].Push(coll[i]);
          //    tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken);
          //  }
          //  try {
          //    Task.WaitAll(tasks);
          //  }
          //  catch (AggregateException ex) {
          //    OperationCollection remaining = new OperationCollection() { Parallel = true };
          //    for (int i = 0; i < stacks.Length; i++) {
          //      if (stacks[i].Count == 1)
          //        remaining.Add(stacks[i].Pop());
          //      if (stacks[i].Count > 1) {
          //        OperationCollection ops = new OperationCollection();
          //        while (stacks[i].Count > 0)
          //          ops.Add(stacks[i].Pop());
          //        remaining.Add(ops);
          //      }
          //    }
          //    if (remaining.Count > 0) executionStack.Push(remaining);
          //    throw ex;
          //  }
          //} else if (coll.Parallel) {
          if (coll.Parallel) {
            try {
              // clone the parent scope here and reuse it for each operation; otherwise for each job
              // the whole scope-tree would first need to be copied and then cleaned, which causes
              // a lot of work for the garbage collector
              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
              parentScopeClone.SubScopes.Clear();
              parentScopeClone.ClearParentScopes();

              EngineJob[] jobs = new EngineJob[coll.Count];
              for (int i = 0; i < coll.Count; i++) {
                jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
              }

              var experiment = CreateHiveExperiment();
              IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken);

              for (int i = 0; i < coll.Count; i++) {
                if (coll[i] is IAtomicOperation) {
                  ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
                } else if (coll[i] is OperationCollection) {
                  // todo ??
                }
              }
            }
            catch {
              executionStack.Push(coll);
              throw;
            }
          } else {
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
        } else if (next is IAtomicOperation) {
          operation = (IAtomicOperation)next;
          try {
            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
          }
          catch (Exception ex) {
            executionStack.Push(operation);
            if (ex is OperationCanceledException) throw; // rethrow without resetting the stack trace
            else throw new OperatorExecutionException(operation.Operator, ex);
          }
          if (next != null) executionStack.Push(next);

          if (operation.Operator.Breakpoint) {
            LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
            Pause();
          }
        }
      }
    }
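    // Sketch (an assumption, not guaranteed to match the original operators): an operator opts
    // into hive-parallel execution by returning an OperationCollection with Parallel = true,
    // creating one child operation per sub-scope; ChildOperator is a hypothetical parameter.
    //
    //   public override IOperation Apply() {
    //     var next = new OperationCollection() { Parallel = true };
    //     foreach (IScope scope in ExecutionContext.Scope.SubScopes)
    //       next.Add(ExecutionContext.CreateChildOperation(ChildOperator, scope));
    //     return next;
    //   }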
    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        if (ec == null)
          return null;

        foreach (var p in ec.Parameters) {
          if (p.Name == "Random" && p is IValueParameter)
            return ((IValueParameter)p).Value as IRandom;
        }
        return FindRandomParameter(ec.Parent);
      }
      catch { return null; }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid (a possible check is sketched below)
    }
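    // A possible implementation of the parent-scope check noted in the TODO above, assuming
    // that equal parent scope names indicate matching scopes (sketch only):
    //
    //   if (source.Parent != null && target.Parent != null
    //       && source.Parent.Name != target.Parent.Name)
    //     throw new ArgumentException("Source scope does not match the target scope.");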
    /// <summary>
    /// This method blocks until all jobs are finished.
    /// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation.
    /// </summary>
    private IScope[] ExecuteOnHive(RefreshableHiveExperiment refreshableHiveExperiment, EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      IScope[] scopes = new Scope[jobs.Length];
      object locker = new object();
      var hiveExperiment = refreshableHiveExperiment.HiveExperiment;

      try {
        // create upload-tasks
        for (int i = 0; i < jobs.Length; i++) {
          var engineHiveJob = new EngineHiveJob(jobs[i], parentScopeClone);
          engineHiveJob.Job.Priority = this.Priority;
          hiveExperiment.HiveJobs.Add(engineHiveJob);

          // shuffle random variable to avoid the same random sequence in each operation;
          // todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }
        HiveClient.StartExperiment((e) => { LogException(e); }, refreshableHiveExperiment);

        // poll until the experiment is finished and all jobs are downloaded
        while (!refreshableHiveExperiment.AllJobsFinished()) {
          Thread.Sleep(2000);
          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }
        LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));

        var failedJobs = hiveExperiment.HiveJobs.Where(x => x.Job.State == JobState.Failed);
        if (failedJobs.Any()) {
          throw new HiveEngineException("Job failed: " + failedJobs.First().Job.StateLog.Last().Exception);
        }

        // get scopes
        int j = 0;
        foreach (var hiveJob in hiveExperiment.HiveJobs) {
          var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
          scopes[j++] = scope;
        }
        return scopes;
      }
      catch (OperationCanceledException) {
        throw;
      }
      catch (Exception e) {
        LogException(e);
        throw;
      }
      finally {
        DisposeHiveExperiment(refreshableHiveExperiment);
      }
    }

    private RefreshableHiveExperiment CreateHiveExperiment() {
      lock (locker) {
        var hiveExperiment = new HiveExperiment();
        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
        hiveExperiment.DateCreated = DateTime.Now;
        hiveExperiment.ResourceNames = this.ResourceNames;
        hiveExperiment.IsPrivileged = this.IsPrivileged;
        var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
        refreshableHiveExperiment.IsControllable = false;
        hiveExperiments.Add(refreshableHiveExperiment);
        return refreshableHiveExperiment;
      }
    }

    private void DisposeHiveExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
      refreshableHiveExperiment.RefreshAutomatically = false;
      DeleteHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id);
      ClearData(refreshableHiveExperiment);
    }

    private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) {
      var jobs = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs();
      foreach (var job in jobs) {
        job.ClearData();
      }
    }

    private void DeleteHiveExperiment(Guid hiveExperimentId) {
      HiveClient.TryAndRepeat(() => {
        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
      }, 5, "Could not delete jobs");
    }

    private List<Guid> GetResourceIds() {
      return ServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceNames.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }

    /// <summary>
    /// Threadsafe message logging
    /// </summary>
    private void LogMessage(string message) {
      lock (logLocker) {
        Log.LogMessage(message);
      }
    }

    /// <summary>
    /// Threadsafe exception logging
    /// </summary>
    private void LogException(Exception exception) {
      lock (logLocker) {
        Log.LogException(exception);
      }
    }

    // testfunction:
    //private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
    //  IScope[] scopes = new Scope[jobs.Length];
    //  for (int i = 0; i < jobs.Length; i++) {
    //    var serialized = PersistenceUtil.Serialize(jobs[i]);
    //    var deserialized = PersistenceUtil.Deserialize<EngineJob>(serialized);
    //    deserialized.Start();
    //    while (deserialized.ExecutionState != ExecutionState.Stopped) {
    //      Thread.Sleep(100);
    //    }
    //    var serialized2 = PersistenceUtil.Serialize(deserialized);
    //    var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2);
    //    var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope;
    //    scopes[i] = newScope;
    //  }
    //  return scopes;
    //}
  }
}