Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs @ 6111

Last change on this file since 6111 was 6111, checked in by cneumuel, 13 years ago

#1233

  • improved the way jobs are downloaded by ExperimentManager and HiveEngine
File size: 20.5 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6using HeuristicLab.Clients.Hive;
7using HeuristicLab.Common;
8using HeuristicLab.Core;
9using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
10using HeuristicLab.PluginInfrastructure;
11
12namespace HeuristicLab.HiveEngine {
13  /// <summary>
14  /// Represents an engine that executes operations which can be executed in parallel on the hive
15  /// </summary>
16  [StorableClass]
17  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
18  public class HiveEngine : Engine {
    // Throttle: at most 4 simultaneous service connections (shared by upload and download paths).
    private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
    // Throttle: at most 4 serialized jobs held in memory at once during upload/download.
    private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
    // Token handed in by Engine.Run; observed by the execution loop and all upload/download tasks.
    private CancellationToken cancellationToken;

    [Storable]
    private IOperator currentOperator;

    // Semicolon-separated resource (group) names the jobs may run on; resolved in GetResourceIds().
    [Storable]
    public string ResourceNames { get; set; }

    // Priority assigned to every job uploaded by this engine (see UploadJob).
    [Storable]
    private int priority;
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    // Accumulated execution time of all hive jobs; updated while polling in ExecuteOnHive,
    // reset to zero in OnPrepared. Setter raises ExecutionTimeOnHiveChanged on change.
    [Storable]
    private TimeSpan executionTimeOnHive;
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    // Forwarded to HiveExperiment.UseLocalPlugins and PluginUtil.GetPluginDependencies.
    [Storable]
    private bool useLocalPlugins;
    public bool UseLocalPlugins {
      get { return useLocalPlugins; }
      set { useLocalPlugins = value; }
    }

    // One RefreshableHiveExperiment is added per ExecuteOnHive call; also used to number runs.
    [Storable]
    private ItemCollection<RefreshableHiveExperiment> hiveExperiments;
    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
      get { return hiveExperiments; }
      set { hiveExperiments = value; }
    }

    // Plugins available on the server, fetched once at the start of Run(object); not persisted.
    private List<Plugin> onlinePlugins;
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    // Plugins already pushed to the server during this run; shared with PluginUtil. Not persisted.
    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }
73
74    #region constructors and cloning
75    public HiveEngine() {
76      ResourceNames = "HEAL";
77      HiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
78      Priority = 0;
79    }
80
81    [StorableConstructor]
82    protected HiveEngine(bool deserializing) : base(deserializing) { }
83    protected HiveEngine(HiveEngine original, Cloner cloner)
84      : base(original, cloner) {
85      this.ResourceNames = original.ResourceNames;
86      this.currentOperator = cloner.Clone(original.currentOperator);
87      this.priority = original.priority;
88      this.executionTimeOnHive = original.executionTimeOnHive;
89      this.useLocalPlugins = original.useLocalPlugins;
90    }
91    public override IDeepCloneable Clone(Cloner cloner) {
92      return new HiveEngine(this, cloner);
93    }
94    #endregion
95
96    #region Events
97    protected override void OnPrepared() {
98      base.OnPrepared();
99      this.ExecutionTimeOnHive = TimeSpan.Zero;
100    }
101
102    public event EventHandler ExecutionTimeOnHiveChanged;
103    protected virtual void OnExecutionTimeOnHiveChanged() {
104      var handler = ExecutionTimeOnHiveChanged;
105      if (handler != null) handler(this, EventArgs.Empty);
106    }
107    #endregion
108
    // Engine entry point: stash the token for the worker paths, then drive the execution stack.
    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }
113
114    private void Run(object state) {
115      Stack<IOperation> executionStack = (Stack<IOperation>)state;
116      IOperation next;
117      OperationCollection coll;
118      IAtomicOperation operation;
119      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
120
121      this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
122      this.AlreadyUploadedPlugins = new List<Plugin>();
123
124      while (ExecutionStack.Count > 0) {
125        cancellationToken.ThrowIfCancellationRequested();
126
127        next = ExecutionStack.Pop();
128        if (next is OperationCollection) {
129          coll = (OperationCollection)next;
130          if (coll.Parallel) {
131            // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
132            IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
133            parentScopeClone.SubScopes.Clear();
134            parentScopeClone.ClearParentScopes();
135
136            EngineJob[] jobs = new EngineJob[coll.Count];
137            for (int i = 0; i < coll.Count; i++) {
138              jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
139            }
140
141            IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
142           
143            for (int i = 0; i < coll.Count; i++) {
144              if (coll[i] is IAtomicOperation) {
145                ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
146              } else if (coll[i] is OperationCollection) {
147                // todo ??
148              }
149            }
150          } else {
151            for (int i = coll.Count - 1; i >= 0; i--)
152              if (coll[i] != null) executionStack.Push(coll[i]);
153          }
154        } else if (next is IAtomicOperation) {
155          operation = (IAtomicOperation)next;
156          try {
157            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
158          }
159          catch (Exception ex) {
160            ExecutionStack.Push(operation);
161            if (ex is OperationCanceledException) throw ex;
162            else throw new OperatorExecutionException(operation.Operator, ex);
163          }
164          if (next != null) ExecutionStack.Push(next);
165
166          if (operation.Operator.Breakpoint) {
167            LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
168            Pause();
169          }
170        }
171      }
172    }
173
    // Marks exceptions of abandoned hive tasks as observed so the finalizer-thread
    // escalation policy (pre-.NET 4.5) cannot tear down the whole process.
    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }
177
178    private IRandom FindRandomParameter(IExecutionContext ec) {
179      try {
180        if (ec == null)
181          return null;
182
183        foreach (var p in ec.Parameters) {
184          if (p.Name == "Random" && p is IValueParameter)
185            return ((IValueParameter)p).Value as IRandom;
186        }
187        return FindRandomParameter(ec.Parent);
188      }
189      catch { return null; }
190    }
191
192    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
193      ExchangeScope(source.Scope, target.Scope);
194    }
195
196    private static void ExchangeScope(IScope source, IScope target) {
197      target.Variables.Clear();
198      target.Variables.AddRange(source.Variables);
199      target.SubScopes.Clear();
200      target.SubScopes.AddRange(source.SubScopes);
201      // TODO: validate if parent scopes match - otherwise source is invalid
202    }
203
204    // testfunction:
205    //private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
206    //  IScope[] scopes = new Scope[jobs.Length];
207    //  for (int i = 0; i < jobs.Length; i++) {
208    //    var job = (EngineJob)jobs[i].Clone();
209    //    job.Start();
210    //    while (job.ExecutionState != ExecutionState.Stopped) {
211    //      Thread.Sleep(100);
212    //    }
213    //    scopes[i] = ((IAtomicOperation)job.InitialOperation).Scope;
214    //  }
215    //  return scopes;
216    //}
217
    /// <summary>
    /// Uploads all jobs as a new hive experiment, waits for them to finish, downloads the
    /// results and returns the resulting scopes indexed like <paramref name="jobs"/>.
    /// This method blocks until all jobs are finished
    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    /// </summary>
    /// <param name="jobs">operations to run remotely; entries are nulled after upload to free memory</param>
    /// <param name="parentScopeClone">cleaned clone of the shared parent scope, re-attached per job in UploadJob</param>
    /// <param name="cancellationToken">observed between polling rounds and in the task loops</param>
    private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      IScope[] scopes = new Scope[jobs.Length];
      object locker = new object();
      // maps hive job id -> index into jobs/scopes
      IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();
      var hiveExperiment = new HiveExperiment();

      try {
        List<Guid> remainingJobIds = new List<Guid>();
        List<LightweightJob> lightweightJobs;

        int finishedCount = 0;
        int uploadCount = 0;

        // create hive experiment
        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
        hiveExperiment.UseLocalPlugins = this.UseLocalPlugins;
        hiveExperiment.ResourceNames = this.ResourceNames;
        hiveExperiment.Id = ServiceLocator.Instance.CallHiveService(s => s.AddHiveExperiment(hiveExperiment));
        var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
        hiveExperiments.Add(refreshableHiveExperiment);

        // create upload-tasks (one per job; each carries its index via AsyncState)
        var uploadTasks = new List<Task<Job>>();
        for (int i = 0; i < jobs.Length; i++) {
          var job = jobs[i];

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(job.InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());

          uploadTasks.Add(Task.Factory.StartNew<Job>((keyValuePairObj) => {
            return UploadJob(keyValuePairObj, parentScopeClone, cancellationToken, GetResourceIds(), hiveExperiment.Id);
          }, new KeyValuePair<int, EngineJob>(i, job), cancellationToken));
        }

        Task processUploadedJobsTask = new Task(() => {
          // process finished upload-tasks: record each uploaded job's id/index as it completes
          int uploadTasksCount = uploadTasks.Count;
          for (int i = 0; i < uploadTasksCount; i++) {
            cancellationToken.ThrowIfCancellationRequested();

            var uploadTasksArray = uploadTasks.ToArray();
            var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
            if (task.Status == TaskStatus.Faulted) {
              LogException(task.Exception);
              throw task.Exception;
            }

            int key = ((KeyValuePair<int, EngineJob>)task.AsyncState).Key;
            Job job = task.Result;
            lock (locker) {
              uploadCount++;
              jobIndices.Add(job.Id, key);
              remainingJobIds.Add(job.Id);
            }
            jobs[key] = null; // relax memory
            // NOTE(review): format string only renders {0}; the job.Id argument is silently dropped
            LogMessage(string.Format("Uploaded job #{0}", key + 1, job.Id));
            uploadTasks.Remove(task);
          }
        }, cancellationToken, TaskCreationOptions.PreferFairness);
        processUploadedJobsTask.Start();

        refreshableHiveExperiment.RefreshAutomatically = true;

        // NOTE(review): busy-wait; blocks here until the refreshable experiment reports all
        // jobs finished, before the polling loop below even starts
        while (!refreshableHiveExperiment.AllJobsFinished()) {
          Thread.Sleep(1000);
          // update time
          // handle cancellation
        }



        // poll job-statuses and create tasks for those which are finished
        var downloadTasks = new List<Task<EngineJob>>();
        var executionTimes = new List<TimeSpan>();
        var executionTimeOnHiveBefore = executionTimeOnHive;
        while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
          cancellationToken.ThrowIfCancellationRequested();

          Thread.Sleep(10000);
          try {
            lightweightJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobs(remainingJobIds));

            var jobsFinished = lightweightJobs.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
            finishedCount += jobsFinished.Count();
            if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
            ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + lightweightJobs.Select(x => x.ExecutionTime.HasValue ? x.ExecutionTime.Value : TimeSpan.Zero).Sum();

            foreach (var result in jobsFinished) {
              if (result.State == JobState.Finished) {
                // only successfully finished jobs are downloaded; aborted/failed are just logged
                downloadTasks.Add(Task.Factory.StartNew<EngineJob>((jobIdObj) => {
                  return DownloadJob(jobIndices, jobIdObj, cancellationToken);
                }, result.Id, cancellationToken));
              } else if (result.State == JobState.Aborted) {
                LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
              } else if (result.State == JobState.Failed) {
                LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.CurrentStateLog != null ? result.CurrentStateLog.Exception : string.Empty));
              }
              remainingJobIds.Remove(result.Id);
              executionTimes.Add(result.ExecutionTime.HasValue ? result.ExecutionTime.Value : TimeSpan.Zero);
            }
          }
          catch (Exception e) {
            // polling errors are logged and retried on the next round
            LogException(e);
          }
        }

        // process finished download-tasks; place each downloaded scope at its original index
        int downloadTasksCount = downloadTasks.Count;
        for (int i = 0; i < downloadTasksCount; i++) {
          cancellationToken.ThrowIfCancellationRequested();

          var downloadTasksArray = downloadTasks.ToArray();
          var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
          var jobId = (Guid)task.AsyncState;
          if (task.Status == TaskStatus.Faulted) {
            LogException(task.Exception);
            throw task.Exception;
          }
          scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.InitialOperation).Scope;
          downloadTasks.Remove(task);
        }

        LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
        DeleteHiveExperiment(hiveExperiment.Id);

        return scopes;
      }
      catch (OperationCanceledException e) {
        lock (locker) {
          // NOTE(review): jobIndices is assigned above and never null; this check is vacuous
          if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id);
        }
        // NOTE(review): "throw e;" resets the stack trace; "throw;" would preserve it
        throw e;
      }
      catch (Exception e) {
        lock (locker) {
          if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id);
        }
        LogException(e);
        throw e;
      }
    }
367
368    private void DeleteHiveExperiment(Guid hiveExperimentId) {
369      TryAndRepeat(() => {
370        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
371      }, 5, string.Format("Could not delete jobs"));
372    }
373
    // Guards the shared parentScopeClone while a job is re-wired and serialized (all upload
    // tasks temporarily attach their operation to the same clone before serializing).
    private static object locker = new object();

    /// <summary>
    /// Clones, serializes and uploads one engine job, retrying the server call until it
    /// succeeds or cancellation is requested. Throttled by maxSerializedJobsInMemory
    /// (serialization) and maxConcurrentConnections (service calls).
    /// </summary>
    /// <param name="keyValuePairObj">boxed KeyValuePair&lt;int, EngineJob&gt;: job index and job</param>
    /// <returns>the uploaded Job with its server-assigned Id</returns>
    private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds, Guid hiveExperimentId) {
      var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj;
      Job job = new Job();

      try {
        maxSerializedJobsInMemory.WaitOne();
        JobData jobData = new JobData();
        IEnumerable<Type> usedTypes;

        // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
        lock (locker) {
          ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone;
          keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone();
          if (keyValuePair.Value.InitialOperation is IAtomicOperation)
            ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes();
          jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes);
        }
        var neededPlugins = new List<IPluginDescription>();

        // NOTE(review): hard-coded to true, so the CollectDeclaringPlugins branch is dead code
        bool useAllLocalPlugins = true;
        if (useAllLocalPlugins) {
          // use all plugins
          neededPlugins.AddRange(ApplicationManager.Manager.Plugins);
        } else {
          // use only
          PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes);
        }

        job.CoresNeeded = 1;
        job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, useLocalPlugins));
        job.Priority = priority;
        job.HiveExperimentId = hiveExperimentId;

        try {
          maxConcurrentConnections.WaitOne();
          while (job.Id == Guid.Empty) { // repeat until success
            cancellationToken.ThrowIfCancellationRequested();
            try {
              job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds));
            }
            catch (Exception e) {
              // log and retry; the loop exits only once the server returned a job id
              LogException(e);
              LogMessage("Repeating upload");
            }
          }
        }
        finally {
          maxConcurrentConnections.Release();
        }
      }
      finally {
        maxSerializedJobsInMemory.Release();
      }
      return job;
    }
430
431    private List<Guid> GetResourceIds() {
432      return ServiceLocator.Instance.CallHiveService(service => {
433        var resourceNames = ResourceNames.Split(';');
434        var resourceIds = new List<Guid>();
435        foreach (var resourceName in resourceNames) {
436          Guid resourceId = service.GetResourceId(resourceName);
437          if (resourceId == Guid.Empty) {
438            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
439          }
440          resourceIds.Add(resourceId);
441        }
442        return resourceIds;
443      });
444    }
445
446    private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
447      Guid jobId = (Guid)jobIdObj;
448      JobData jobData = null;
449      EngineJob engineJob = null;
450      try {
451        maxSerializedJobsInMemory.WaitOne();
452        maxConcurrentConnections.WaitOne();
453        while (jobData == null) { // repeat until success
454          cancellationToken.ThrowIfCancellationRequested();
455          try {
456            jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
457          }
458          catch (Exception e) {
459            LogException(e);
460            LogMessage("Repeating download");
461          }
462        }
463        engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);
464        jobData = null;
465        LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
466      }
467      finally {
468        maxConcurrentConnections.Release();
469        maxSerializedJobsInMemory.Release();
470      }
471      return engineJob;
472    }
473
    /// <summary>
    /// Threadsafe message logging
    /// </summary>
    // NOTE(review): locks on the Log object itself; adequate here since both helpers use the
    // same monitor, but an external caller locking Log could contend with logging.
    private void LogMessage(string message) {
      lock (Log) {
        Log.LogMessage(message);
      }
    }

    /// <summary>
    /// Threadsafe exception logging
    /// </summary>
    private void LogException(Exception exception) {
      lock (Log) {
        Log.LogException(exception);
      }
    }
491
492    /// <summary>
493    /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
494    /// If repetitions is -1, it is repeated infinitely.
495    /// </summary>
496    private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
497      try { action(); }
498      catch (Exception e) {
499        repetitions--;
500        if (repetitions <= 0)
501          throw new HiveEngineException(errorMessage, e);
502        TryAndRepeat(action, repetitions, errorMessage);
503      }
504    }
505  }
506
507  public static class ScopeExtensions {
508    public static void ClearParentScopes(this IScope scope) {
509      scope.ClearParentScopes(null);
510    }
511
512    public static void ClearParentScopes(this IScope scope, IScope childScope) {
513      if (childScope != null) {
514        scope.SubScopes.Clear();
515        scope.SubScopes.Add(childScope);
516      }
517      if (scope.Parent != null)
518        scope.Parent.ClearParentScopes(scope);
519    }
520  }
521
522  public static class EnumerableExtensions {
523    public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
524      return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum());
525    }
526  }
527}
Note: See TracBrowser for help on using the repository browser.