
source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs @ 6000

Last change on this file since 6000 was 6000, checked in by cneumuel, 13 years ago

#1233

  • added GetPlugin service method
  • fixed minor issues with double plugins in database
  • worked on HiveEngine
  • fixed wrong role name for Hive User
  • fixed bug in group assignment of slaves
File size: 19.1 KB
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.PluginInfrastructure;

namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be run in parallel on the hive.
  /// </summary>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need to enable `Parallel` for at least one operator in your operator graph to have all child operations parallelized. Also, those child operations must not have side effects on a higher scope.")]
  public class HiveEngine : Engine {
    private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
    private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
    private CancellationToken cancellationToken;

    [Storable]
    private IOperator currentOperator;

    [Storable]
    public string ResourceIds { get; set; }

    [Storable]
    private int priority;
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    private List<Plugin> onlinePlugins;
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    public HiveEngine() {
      ResourceIds = "HEAL";
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceIds = original.ResourceIds;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }
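    /// <summary>
    /// Processes the execution stack. Operation collections marked as Parallel are wrapped into EngineJobs
    /// (one SequentialEngine per operation) and executed on the hive via ExecuteOnHive; the resulting scopes
    /// are merged back into the original scopes. All other operations are executed locally.
    /// </summary>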
    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;
      IOperation next;
      OperationCollection coll;
      IAtomicOperation operation;
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);

      this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
      this.AlreadyUploadedPlugins = new List<Plugin>();

      while (ExecutionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        next = ExecutionStack.Pop();
        if (next is OperationCollection) {
          coll = (OperationCollection)next;
          if (coll.Parallel) {
            // Clone the parent scope here and reuse it for each operation. Otherwise the whole scope-tree would
            // first have to be copied and then cleaned for each job, which causes a lot of work for the garbage collector.
            IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
            parentScopeClone.SubScopes.Clear();
            parentScopeClone.ClearParentScopes();

            EngineJob[] jobs = new EngineJob[coll.Count];
            for (int i = 0; i < coll.Count; i++) {
              jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
            }

            IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
            //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);

            for (int i = 0; i < coll.Count; i++) {
              if (coll[i] is IAtomicOperation) {
                ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
              } else if (coll[i] is OperationCollection) {
                // todo ??
              }
            }
          } else {
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
        } else if (next is IAtomicOperation) {
          operation = (IAtomicOperation)next;
          try {
            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
          }
          catch (Exception ex) {
            ExecutionStack.Push(operation);
            if (ex is OperationCanceledException) throw; // rethrow, preserving the stack trace
            else throw new OperatorExecutionException(operation.Operator, ex);
          }
          if (next != null) ExecutionStack.Push(next);

          if (operation.Operator.Breakpoint) {
            LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
            Pause();
          }
        }
      }
    }

    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }
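    /// <summary>
    /// Walks up the execution context hierarchy and returns the value of the first value parameter named "Random",
    /// or null if none is found. Used to reseed the random number generator before each job is uploaded.
    /// </summary>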
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        if (ec == null)
          return null;

        foreach (var p in ec.Parameters) {
          if (p.Name == "Random" && p is IValueParameter)
            return ((IValueParameter)p).Value as IRandom;
        }
        return FindRandomParameter(ec.Parent);
      }
      catch { return null; }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }
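    /// <summary>
    /// Executes each job sequentially in the local process and returns the resulting scopes. Kept as a local
    /// alternative to ExecuteOnHive (see the commented-out call in Run), presumably for debugging.
    /// </summary>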
    private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      IScope[] scopes = new Scope[jobs.Length];

      for (int i = 0; i < jobs.Length; i++) {
        var job = (EngineJob)jobs[i].Clone();
        job.Start();
        while (job.ExecutionState != ExecutionState.Stopped) {
          Thread.Sleep(100);
        }
        scopes[i] = ((IAtomicOperation)job.InitialOperation).Scope;
      }

      return scopes;
    }

    /// <summary>
    /// This method blocks until all jobs are finished.
    /// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation.
    /// </summary>
    /// <param name="jobs">The jobs to execute on the hive.</param>
    private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      IScope[] scopes = new Scope[jobs.Length];
      object locker = new object();
      IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();

      try {
        List<Guid> remainingJobIds = new List<Guid>();
        List<LightweightJob> lightweightJobs;

        int finishedCount = 0;
        int uploadCount = 0;

        // create upload-tasks
        var uploadTasks = new List<Task<Job>>();
        for (int i = 0; i < jobs.Length; i++) {
          var job = jobs[i];

          // shuffle random variable to avoid the same random sequence in each operation; TODO: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(job.InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());

          uploadTasks.Add(Task.Factory.StartNew<Job>((keyValuePairObj) => {
            return UploadJob(keyValuePairObj, parentScopeClone, cancellationToken, GetResourceIds());
          }, new KeyValuePair<int, EngineJob>(i, job), cancellationToken));
        }

        Task processUploadedJobsTask = new Task(() => {
          // process finished upload-tasks
          int uploadTasksCount = uploadTasks.Count;
          for (int i = 0; i < uploadTasksCount; i++) {
            cancellationToken.ThrowIfCancellationRequested();

            var uploadTasksArray = uploadTasks.ToArray();
            var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
            if (task.Status == TaskStatus.Faulted) {
              LogException(task.Exception);
              throw task.Exception;
            }

            int key = ((KeyValuePair<int, EngineJob>)task.AsyncState).Key;
            Job job = task.Result;
            lock (locker) {
              uploadCount++;
              jobIndices.Add(job.Id, key);
              remainingJobIds.Add(job.Id);
            }
            jobs[key] = null; // relax memory
            LogMessage(string.Format("Uploaded job #{0} (id: {1})", key + 1, job.Id));
            uploadTasks.Remove(task);
          }
        }, cancellationToken, TaskCreationOptions.PreferFairness);
        processUploadedJobsTask.Start();

        // poll job statuses and create download-tasks for those that are finished
        var downloadTasks = new List<Task<EngineJob>>();
        var executionTimes = new List<TimeSpan>();
        var executionTimeOnHiveBefore = executionTimeOnHive;
        while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
          cancellationToken.ThrowIfCancellationRequested();

          Thread.Sleep(10000);
          try {
            lightweightJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobs(remainingJobIds));

            var jobsFinished = lightweightJobs.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
            finishedCount += jobsFinished.Count();
            if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
            ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + lightweightJobs.Select(x => x.ExecutionTime.HasValue ? x.ExecutionTime.Value : TimeSpan.Zero).Sum();

            foreach (var result in jobsFinished) {
              if (result.State == JobState.Finished) {
                downloadTasks.Add(Task.Factory.StartNew<EngineJob>((jobIdObj) => {
                  return DownloadJob(jobIndices, jobIdObj, cancellationToken);
                }, result.Id, cancellationToken));
              } else if (result.State == JobState.Aborted) {
                LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
              } else if (result.State == JobState.Failed) {
                LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.CurrentStateLog != null ? result.CurrentStateLog.Exception : string.Empty));
              }
              remainingJobIds.Remove(result.Id);
              executionTimes.Add(result.ExecutionTime.HasValue ? result.ExecutionTime.Value : TimeSpan.Zero);
            }
          }
          catch (Exception e) {
            LogException(e);
          }
        }

        // process finished download-tasks
        int downloadTasksCount = downloadTasks.Count;
        for (int i = 0; i < downloadTasksCount; i++) {
          cancellationToken.ThrowIfCancellationRequested();

          var downloadTasksArray = downloadTasks.ToArray();
          var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
          var jobId = (Guid)task.AsyncState;
          if (task.Status == TaskStatus.Faulted) {
            LogException(task.Exception);
            throw task.Exception;
          }
          scopes[jobIndices[jobId]] = ((IAtomicOperation)task.Result.InitialOperation).Scope;
          downloadTasks.Remove(task);
        }

        LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
        DeleteJobs(jobIndices);

        return scopes;
      }
      catch (OperationCanceledException) {
        lock (locker) {
          if (jobIndices != null) DeleteJobs(jobIndices);
        }
        throw;
      }
      catch (Exception e) {
        lock (locker) {
          if (jobIndices != null) DeleteJobs(jobIndices);
        }
        LogException(e);
        throw;
      }
    }

    private void DeleteJobs(IDictionary<Guid, int> jobIndices) {
      if (jobIndices.Count > 0) {
        TryAndRepeat(() => {
          LogMessage(string.Format("Deleting {0} jobs on hive.", jobIndices.Count));
          ServiceLocator.Instance.CallHiveService(service => {
            foreach (Guid jobId in jobIndices.Keys) {
              service.DeleteJob(jobId);
            }
            jobIndices.Clear();
          });
        }, 5, string.Format("Could not delete {0} jobs", jobIndices.Count));
      }
    }

    private static object locker = new object();
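    /// <summary>
    /// Re-parents the job's initial operation to the cloned parent scope, serializes the job, determines the
    /// plugins it needs and adds it to the hive, repeating the AddJob call until it succeeds. The semaphores
    /// limit the number of serialized jobs kept in memory and the number of concurrent service connections.
    /// </summary>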
    private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds) {
      var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj;
      Job job = new Job();

      try {
        maxSerializedJobsInMemory.WaitOne();
        JobData jobData = new JobData();
        IEnumerable<Type> usedTypes;

        // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
        lock (locker) {
          ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone;
          keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone();
          if (keyValuePair.Value.InitialOperation is IAtomicOperation)
            ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes();
          jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes);
        }
        var neededPlugins = new List<IPluginDescription>();
        PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes);

        job.CoresNeeded = 1;
        job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, false));
        job.Priority = priority;

        try {
          maxConcurrentConnections.WaitOne();
          while (job.Id == Guid.Empty) { // repeat until success
            cancellationToken.ThrowIfCancellationRequested();
            try {
              job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds));
            }
            catch (Exception e) {
              LogException(e);
              LogMessage("Repeating upload");
            }
          }
        }
        finally {
          maxConcurrentConnections.Release();
        }
      }
      finally {
        maxSerializedJobsInMemory.Release();
      }
      return job;
    }
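    /// <summary>
    /// Resolves the semicolon-separated resource names in ResourceIds to their hive resource ids.
    /// Throws a ResourceNotFoundException if one of the names is unknown.
    /// </summary>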
    private List<Guid> GetResourceIds() {
      return ServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceIds.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
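    /// <summary>
    /// Downloads the serialized job data for the given job id, repeating the service call until it succeeds,
    /// and deserializes it into an EngineJob. The semaphores limit memory usage and concurrent connections.
    /// </summary>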
    private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
      Guid jobId = (Guid)jobIdObj;
      JobData jobData = null;
      EngineJob engineJob = null;
      try {
        maxSerializedJobsInMemory.WaitOne();
        maxConcurrentConnections.WaitOne();
        while (jobData == null) { // repeat until success
          cancellationToken.ThrowIfCancellationRequested();
          try {
            jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
          }
          catch (Exception e) {
            LogException(e);
            LogMessage("Repeating download");
          }
        }
        engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);
        jobData = null;
        LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobIndices[jobId] + 1, jobId));
      }
      finally {
        maxConcurrentConnections.Release();
        maxSerializedJobsInMemory.Release();
      }
      return engineJob;
    }

    /// <summary>
    /// Thread-safe message logging
    /// </summary>
    private void LogMessage(string message) {
      lock (Log) {
        Log.LogMessage(message);
      }
    }

    /// <summary>
    /// Thread-safe exception logging
    /// </summary>
    private void LogException(Exception exception) {
      lock (Log) {
        Log.LogException(exception);
      }
    }
    /// <summary>
    /// Executes the action. If it throws an exception, the action is retried until the repetition count is reached.
    /// If repetitions is -1, it is retried indefinitely.
    /// </summary>
    private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
      try { action(); }
      catch (Exception e) {
        if (repetitions > 0) repetitions--; // a negative count (-1) means retry indefinitely
        if (repetitions == 0)
          throw new HiveEngineException(errorMessage, e);
        TryAndRepeat(action, repetitions, errorMessage);
      }
    }
  }
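  /// <summary>
  /// Scope helpers used before serialization: ClearParentScopes walks up the parent chain and removes all
  /// sub-scopes except those on the path to the given scope, apparently to keep serialized jobs small.
  /// </summary>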
  public static class ScopeExtensions {
    public static void ClearParentScopes(this IScope scope) {
      scope.ClearParentScopes(null);
    }

    public static void ClearParentScopes(this IScope scope, IScope childScope) {
      if (childScope != null) {
        scope.SubScopes.Clear();
        scope.SubScopes.Add(childScope);
      }
      if (scope.Parent != null)
        scope.Parent.ClearParentScopes(scope);
    }
  }

  public static class EnumerableExtensions {
    public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
      return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum());
    }
  }
}