#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;

namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive
  /// </summary>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need to enable `Parallel` for at least one operator in your operator graph to have all child operations parallelized. Also those child operations must not have side effects on a higher scope.")]
  public class HiveEngine : Engine {
    private static object locker = new object();
    private CancellationToken cancellationToken;
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;
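    // Hive-specific configuration: ResourceNames is a semicolon-separated list of hive resource names,
    // Priority is assigned to every uploaded job, and IsPrivileged is passed on to each created hive experiment.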
    [Storable]
    public string ResourceNames { get; set; }

    [Storable]
    private int priority;
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // HiveExperiment can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only
    // informative it does not matter (only the execution time on the hive will be wrong because of that -> Todo)
    private ItemCollection<RefreshableHiveExperiment> hiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
      get { return hiveExperiments; }
      set { hiveExperiments = value; }
    }

    private List<Plugin> onlinePlugins;
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    public HiveEngine() {
      ResourceNames = "HEAL";
      Priority = 0;
      this.log = new ThreadSafeLog(this.log);
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }

    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.ResourceNames;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // do not clone hiveExperiments - otherwise they would be sent with every job
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }
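    /// <summary>
    /// Processes the execution stack: operation collections marked as Parallel are wrapped into EngineJobs
    /// and executed on the hive, all other operations are executed locally.
    /// </summary>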
    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;
      IOperation next;
      OperationCollection coll;
      IAtomicOperation operation;

      if (firstRun) {
        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        next = executionStack.Pop();
        if (next is OperationCollection) {
          coll = (OperationCollection)next;
          if (coll.Parallel) {
            try {
              // clone the parent scope here and reuse it for each operation; otherwise for each job the whole
              // scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
              parentScopeClone.SubScopes.Clear();
              parentScopeClone.ClearParentScopes();

              EngineJob[] jobs = new EngineJob[coll.Count];
              for (int i = 0; i < coll.Count; i++) {
                jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
              }

              var experiment = CreateHiveExperiment();
              IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken);

              for (int i = 0; i < coll.Count; i++) {
                if (coll[i] is IAtomicOperation) {
                  ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
                } else if (coll[i] is OperationCollection) {
                  // todo ??
                }
              }
            }
            catch {
              executionStack.Push(coll);
              throw;
            }
          } else {
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
        } else if (next is IAtomicOperation) {
          operation = (IAtomicOperation)next;
          try {
            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
          }
          catch (Exception ex) {
            executionStack.Push(operation);
            if (ex is OperationCanceledException) throw ex;
            else throw new OperatorExecutionException(operation.Operator, ex);
          }
          if (next != null) executionStack.Push(next);

          if (operation.Operator.Breakpoint) {
            log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
            Pause();
          }
        }
      }
    }

    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }
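    /// <summary>
    /// Recursively searches the execution context hierarchy for a value parameter named "Random"
    /// and returns its value, or null if no such parameter is found.
    /// </summary>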
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        if (ec == null)
          return null;

        foreach (var p in ec.Parameters) {
          if (p.Name == "Random" && p is IValueParameter)
            return ((IValueParameter)p).Value as IRandom;
        }
        return FindRandomParameter(ec.Parent);
      }
      catch { return null; }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    /// <summary>
    /// This method blocks until all jobs are finished
    /// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation
    /// </summary>
    /// <param name="jobs"></param>
    /// <returns></returns>
    private IScope[] ExecuteOnHive(RefreshableHiveExperiment refreshableHiveExperiment, EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      IScope[] scopes = new Scope[jobs.Length];
      object locker = new object();
      var hiveExperiment = refreshableHiveExperiment.HiveExperiment;

      try {
        // create upload-tasks
        for (int i = 0; i < jobs.Length; i++) {
          var engineHiveJob = new EngineHiveJob(jobs[i], parentScopeClone);
          engineHiveJob.Job.Priority = this.Priority;
          hiveExperiment.HiveJobs.Add(engineHiveJob);

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }
        HiveClient.StartExperiment((e) => { log.LogException(e); }, refreshableHiveExperiment);

        // do polling until experiment is finished and all jobs are downloaded
        while (!refreshableHiveExperiment.AllJobsFinished()) {
          Thread.Sleep(2000);
          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }
        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));

        var failedJobs = hiveExperiment.HiveJobs.Where(x => x.Job.State == JobState.Failed);
        if (failedJobs.Count() > 0) {
          throw new HiveEngineException("Job failed: " + failedJobs.First().Job.StateLog.Last().Exception);
        }

        // get scopes
        int j = 0;
        foreach (var hiveJob in hiveExperiment.HiveJobs) {
          var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
          scopes[j++] = scope;
        }
        return scopes;
      }
      catch (OperationCanceledException e) {
        throw e;
      }
      catch (Exception e) {
        log.LogException(e);
        throw e;
      }
      finally {
        DisposeHiveExperiment(refreshableHiveExperiment);
      }
    }

    private RefreshableHiveExperiment CreateHiveExperiment() {
      lock (locker) {
        var hiveExperiment = new HiveExperiment();
        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
        hiveExperiment.DateCreated = DateTime.Now;
        hiveExperiment.ResourceNames = this.ResourceNames;
        hiveExperiment.IsPrivileged = this.IsPrivileged;
        var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
        refreshableHiveExperiment.IsControllable = false;
        hiveExperiments.Add(refreshableHiveExperiment);
        return refreshableHiveExperiment;
      }
    }

    private void DisposeHiveExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
      refreshableHiveExperiment.RefreshAutomatically = false;
      DeleteHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id);
      ClearData(refreshableHiveExperiment);
    }

    private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) {
      var jobs = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs();
      foreach (var job in jobs) {
        job.ClearData();
      }
    }

    private void DeleteHiveExperiment(Guid hiveExperimentId) {
      HiveClient.TryAndRepeat(() => {
        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
      }, 5, string.Format("Could not delete jobs"));
    }

    private List<Guid> GetResourceIds() {
      return ServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceNames.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
  }
}