Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 6955

Last change on this file since 6955 was 6863, checked in by ascheibe, 13 years ago

#1233 adapted code to new ThreadSafeLog

File size: 13.1 KB
RevLine 
[6212]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[5958]23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
31
namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive.
  /// Sequential parts of the operator graph are executed locally; every <see cref="OperationCollection"/>
  /// marked as parallel is packaged into one hive job (one task per operation) and its result scopes are
  /// merged back into the local scope tree once all tasks have finished.
  /// </summary>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
  public class HiveEngine : Engine {
    // Guards the jobs collection in CreateJob. The lock object is static, so job creation is
    // serialized across all HiveEngine instances of the process.
    private static readonly object locker = new object();
    private CancellationToken cancellationToken;
    // Not storable; the field initializer runs for every constructor (including the storable and the
    // cloning constructor), so the one-time initialization in Run is repeated for such instances.
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;

    /// <summary>Semicolon separated names of the hive resources the jobs are scheduled on.</summary>
    [Storable]
    public string ResourceNames { get; set; }

    [Storable]
    private int priority;
    /// <summary>Priority assigned to every task created by this engine.</summary>
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    /// <summary>Accumulated execution time reported for the tasks of the current parallel section.</summary>
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    /// <summary>Whether the created hive jobs are flagged as privileged.</summary>
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // Tasks cannot be stored, so the RefreshableJobs cannot be stored either. Previous runs are only
    // informative, so this does not matter (only the execution time on hive will be wrong because of that -> TODO).
    private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
    /// <summary>All hive jobs created by this engine instance (one per parallel section).</summary>
    public ItemCollection<RefreshableJob> Jobs {
      get { return jobs; }
      set { jobs = value; }
    }

    private List<Plugin> onlinePlugins;
    /// <summary>Plugins (with a hash) already known to the hive service; fetched on the first run.</summary>
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    /// <summary>Plugins uploaded by this engine; reset on the first run.</summary>
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    /// <summary>Result of the hive service's IsAllowedPrivileged query, taken when the engine is created.</summary>
    public bool IsAllowedPrivileged { get; set; }

    #region constructors and cloning
    public HiveEngine() {
      this.ResourceNames = "HEAL";
      this.Priority = 0;
      this.log = new ThreadSafeLog();
      this.IsAllowedPrivileged = ServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.ResourceNames;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // do not clone jobs - otherwise they would be sent with every task
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    /// <summary>
    /// Processes the execution stack until it is empty. Sequential operation collections are unfolded onto
    /// the stack, parallel ones are executed on the hive, atomic operations are executed locally.
    /// On failure the current operation/collection is pushed back so that the engine can be resumed.
    /// </summary>
    /// <param name="state">The execution stack (a <see cref="Stack{IOperation}"/>).</param>
    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;
      IOperation next;
      OperationCollection coll;
      IAtomicOperation operation;

      if (firstRun) {
        // The event is static: subscribe a static handler and remove a previous subscription first, so that
        // engines are not kept alive by the event and repeated runs do not stack up duplicate delegates.
        TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
        TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        next = executionStack.Pop();
        if (next is OperationCollection) {
          coll = (OperationCollection)next;

          if (coll.Parallel) {
            // Nothing to distribute (and coll.First() below would throw); behaves like the sequential branch.
            if (coll.Count == 0) continue;

            try {
              // clone the parent scope here and reuse it for each operation. otherwise for each task the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
              parentScopeClone.SubScopes.Clear();
              parentScopeClone.ClearParentScopes();

              EngineTask[] tasks = new EngineTask[coll.Count];
              for (int i = 0; i < coll.Count; i++) {
                tasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
              }

              var job = CreateJob();
              IScope[] scopes = ExecuteOnHive(job, tasks, parentScopeClone, cancellationToken);

              for (int i = 0; i < coll.Count; i++) {
                if (coll[i] is IAtomicOperation) {
                  ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
                } else if (coll[i] is OperationCollection) {
                  // todo ??
                }
              }
            }
            catch {
              // keep the collection on the stack so a resumed run retries it
              executionStack.Push(coll);
              throw;
            }
          } else {
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
        } else if (next is IAtomicOperation) {
          operation = (IAtomicOperation)next;
          try {
            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
          }
          catch (Exception ex) {
            executionStack.Push(operation);
            // rethrow with "throw;" to keep the original stack trace
            if (ex is OperationCanceledException) throw;
            throw new OperatorExecutionException(operation.Operator, ex);
          }
          if (next != null) executionStack.Push(next);

          if (operation.Operator.Breakpoint) {
            string operatorName = string.IsNullOrEmpty(operation.Operator.Name) ? operation.Operator.ItemName : operation.Operator.Name;
            log.LogMessage(string.Format("Breakpoint: {0}", operatorName));
            Pause();
          }
        }
      }
    }

    // Static on purpose: it is attached to a static event and needs no instance state.
    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    /// <summary>
    /// Searches the execution context and its parents for a value parameter named "Random" holding an
    /// <see cref="IRandom"/>. Returns null if none is found or if any lookup throws.
    /// </summary>
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        if (ec == null)
          return null;

        foreach (var p in ec.Parameters) {
          var valueParameter = p as IValueParameter;
          if (p.Name == "Random" && valueParameter != null)
            return valueParameter.Value as IRandom;
        }
        return FindRandomParameter(ec.Parent);
      }
      catch { return null; }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    /// <summary>Replaces variables and sub-scopes of <paramref name="target"/> with those of <paramref name="source"/>.</summary>
    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    /// <summary>
    /// Uploads one hive task per engine task, blocks until all tasks are finished and returns the result
    /// scope of each task. The hive job is deleted and its local data cleared afterwards in any case.
    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    /// </summary>
    /// <param name="refreshableJob">The (not yet uploaded) job the tasks are added to.</param>
    /// <param name="jobs">The engine tasks to execute, one per parallel operation.</param>
    /// <param name="parentScopeClone">Stripped clone of the common parent scope shared by all tasks.</param>
    /// <param name="cancellationToken">Token that aborts the polling loop.</param>
    /// <returns>The result scopes, in the order the hive tasks are enumerated by the job.</returns>
    /// <exception cref="HiveEngineException">If any task ends in state Failed.</exception>
    private IScope[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      // IScope[] (not a covariant Scope[]), so storing any IScope implementation cannot throw ArrayTypeMismatchException
      IScope[] scopes = new IScope[jobs.Length];

      try {
        // create upload-tasks
        for (int i = 0; i < jobs.Length; i++) {
          var engineHiveTask = new EngineHiveTask(jobs[i], parentScopeClone);
          engineHiveTask.Task.Priority = this.Priority;
          refreshableJob.HiveTasks.Add(engineHiveTask);

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          // NOTE(review): the reset happens after the EngineHiveTask was created - confirm the task is serialized later, otherwise the reset has no effect on it.
          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }
        HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);

        // do polling until experiment is finished and all jobs are downloaded;
        // waiting on the token's handle (instead of Thread.Sleep) lets a cancellation end the pause immediately
        while (!refreshableJob.AllJobsFinished()) {
          cancellationToken.WaitHandle.WaitOne(2000);
          // NOTE(review): sums the locally created EngineTasks - verify their ExecutionTime is updated while they run on the hive.
          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(jobs.Sum(x => x.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }
        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableJob.ToString(), refreshableJob.ExecutionTime));

        var failedTasks = refreshableJob.HiveTasks.Where(x => x.Task.State == TaskState.Failed);
        if (failedTasks.Any()) {
          throw new HiveEngineException("Task failed: " + failedTasks.First().Task.StateLog.Last().Exception);
        }

        // get scopes
        // NOTE(review): assumes HiveTasks enumerates in the order the tasks were added above (matches coll order in Run).
        int j = 0;
        foreach (var hiveTask in refreshableJob.HiveTasks) {
          var scope = ((IAtomicOperation)((EngineTask)hiveTask.ItemTask).InitialOperation).Scope;
          scopes[j++] = scope;
        }
        return scopes;
      }
      catch (OperationCanceledException) {
        // cancellation is expected; rethrow without logging it as an error
        throw;
      }
      catch (Exception e) {
        log.LogException(e);
        throw;
      }
      finally {
        DisposeJob(refreshableJob);
      }
    }

    /// <summary>Creates a new, not yet uploaded hive job for one parallel section and registers it in <see cref="Jobs"/>.</summary>
    private RefreshableJob CreateJob() {
      lock (locker) {
        var job = new Job();
        job.Name = "HiveEngine Run " + jobs.Count;
        job.DateCreated = DateTime.Now;
        job.ResourceNames = this.ResourceNames;
        job.IsPrivileged = this.IsPrivileged;
        var refreshableJob = new RefreshableJob(job);
        refreshableJob.IsDownloadable = false; // download happens automatically so disable button
        jobs.Add(refreshableJob);
        return refreshableJob;
      }
    }

    /// <summary>Stops refreshing, deletes the job on the hive and clears the local task data.</summary>
    private void DisposeJob(RefreshableJob refreshableJob) {
      refreshableJob.RefreshAutomatically = false;
      // Presumably a job that never reached the server still has Guid.Empty as id (verify). Deleting it
      // cannot succeed, and a failing delete inside the caller's finally would mask the original exception.
      if (refreshableJob.Job.Id != Guid.Empty)
        DeleteHiveExperiment(refreshableJob.Job.Id);
      ClearData(refreshableJob);
    }

    /// <summary>Calls ClearData on every hive task of the job.</summary>
    private void ClearData(RefreshableJob refreshableJob) {
      var hiveTasks = refreshableJob.GetAllHiveTasks();
      foreach (var hiveTask in hiveTasks) {
        hiveTask.ClearData();
      }
    }

    /// <summary>Deletes the job on the hive, retrying up to 5 times via HiveClient.TryAndRepeat.</summary>
    private void DeleteHiveExperiment(Guid jobId) {
      HiveClient.TryAndRepeat(() => {
        ServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
      }, 5, "Could not delete jobs");
    }

    /// <summary>
    /// Resolves the semicolon separated <see cref="ResourceNames"/> to resource ids.
    /// </summary>
    /// <exception cref="ResourceNotFoundException">If a resource name is unknown to the hive service.</exception>
    private List<Guid> GetResourceIds() {
      return ServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceNames.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.