Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 5707

Last change on this file since 5707 was 5707, checked in by cneumuel, 13 years ago

#1260

  • some changes due to the removal of Disposable (r5706)
  • copy PluginInfrastructure files into PluginCache folder in slaves (needed due to r5703)
File size: 18.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6using HeuristicLab.Common;
7using HeuristicLab.Core;
8using HeuristicLab.Hive.Contracts;
9using HeuristicLab.Hive.Contracts.BusinessObjects;
10using HeuristicLab.Hive.Contracts.Interfaces;
11using HeuristicLab.Hive.Contracts.ResponseObjects;
12using HeuristicLab.Hive.ExperimentManager;
13using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
14using HeuristicLab.PluginInfrastructure;
15
16namespace HeuristicLab.HiveEngine {
/// <summary>
/// Represents an engine that executes operations which can be executed in parallel on the hive.
/// Operation collections marked as parallel are sent to the hive as one job per operation;
/// all other operations are executed locally.
/// </summary>
[StorableClass]
[Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
public class HiveEngine : Engine {
  // Time between two status polls of the hive server.
  private const int PollingIntervalMilliseconds = 10000;
  // Number of attempts made to delete the jobs on the hive before giving up.
  private const int DeleteJobsRepetitions = 5;

  // TaskScheduler.UnobservedTaskException is a static event; the handler is registered exactly once
  // (instead of once per run) so handlers do not accumulate and engines are not kept alive by it.
  private static readonly object unobservedTaskExceptionHandlerLock = new object();
  private static bool unobservedTaskExceptionHandlerRegistered;

  // Serializes cloning/serialization of operations in UploadJob.
  private static readonly object locker = new object();

  private readonly Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
  private readonly Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
  private CancellationToken cancellationToken;

  [Storable]
  private IOperator currentOperator;

  [Storable]
  public string ResourceIds { get; set; }

  [Storable]
  private int priority;
  public int Priority {
    get { return priority; }
    set { priority = value; }
  }

  [Storable]
  private TimeSpan executionTimeOnHive;
  /// <summary>
  /// Accumulated execution time reported by the hive; raises <see cref="ExecutionTimeOnHiveChanged"/> when it changes.
  /// </summary>
  public TimeSpan ExecutionTimeOnHive {
    get { return executionTimeOnHive; }
    set {
      if (value != executionTimeOnHive) {
        executionTimeOnHive = value;
        OnExecutionTimeOnHiveChanged();
      }
    }
  }

  #region constructors and cloning
  public HiveEngine() {
    ResourceIds = "HEAL";
  }

  [StorableConstructor]
  protected HiveEngine(bool deserializing) : base(deserializing) { }
  protected HiveEngine(HiveEngine original, Cloner cloner)
    : base(original, cloner) {
    this.ResourceIds = original.ResourceIds;
    this.currentOperator = cloner.Clone(original.currentOperator);
    this.priority = original.priority;
    this.executionTimeOnHive = original.executionTimeOnHive;
  }
  public override IDeepCloneable Clone(Cloner cloner) {
    return new HiveEngine(this, cloner);
  }
  #endregion

  #region Events
  protected override void OnPrepared() {
    base.OnPrepared();
    this.ExecutionTimeOnHive = TimeSpan.Zero;
  }

  public event EventHandler ExecutionTimeOnHiveChanged;
  protected virtual void OnExecutionTimeOnHiveChanged() {
    var handler = ExecutionTimeOnHiveChanged;
    if (handler != null) handler(this, EventArgs.Empty);
  }
  #endregion

  protected override void Run(CancellationToken cancellationToken) {
    this.cancellationToken = cancellationToken;
    Run(ExecutionStack);
  }

  /// <summary>
  /// Processes the execution stack until it is empty or cancellation is requested.
  /// </summary>
  /// <param name="state">The execution stack (a <c>Stack&lt;IOperation&gt;</c>).</param>
  private void Run(object state) {
    Stack<IOperation> executionStack = (Stack<IOperation>)state;
    EnsureUnobservedTaskExceptionHandlerRegistered();

    while (executionStack.Count > 0) {
      cancellationToken.ThrowIfCancellationRequested();

      IOperation next = executionStack.Pop();
      OperationCollection coll = next as OperationCollection;
      if (coll != null) {
        if (coll.Parallel) {
          ExecuteParallel(coll);
        } else {
          // push in reverse order so that the first operation is popped (executed) first
          for (int i = coll.Count - 1; i >= 0; i--) {
            if (coll[i] != null) executionStack.Push(coll[i]);
          }
        }
      } else {
        IAtomicOperation operation = next as IAtomicOperation;
        if (operation != null) ExecuteAtomicOperation(operation, executionStack);
      }
    }
  }

  /// <summary>
  /// Executes every operation of a parallel collection as a separate job on the hive and copies the
  /// variables and sub-scopes of the returned scopes into the original scopes.
  /// </summary>
  private void ExecuteParallel(OperationCollection coll) {
    if (coll.Count == 0) return; // nothing to distribute; coll.First() below would throw

    // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
    IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
    parentScopeClone.SubScopes.Clear();
    parentScopeClone.ClearParentScopes();

    OperationJob[] jobs = new OperationJob[coll.Count];
    for (int i = 0; i < coll.Count; i++) {
      jobs[i] = new OperationJob(coll[i]);
    }

    IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);

    for (int i = 0; i < coll.Count; i++) {
      IAtomicOperation atomicOperation = coll[i] as IAtomicOperation;
      if (atomicOperation != null) {
        // NOTE(review): ExecuteOnHive leaves the entry of a failed or aborted job null, in which case
        // ExchangeScope throws a NullReferenceException.
        ExchangeScope(scopes[i], atomicOperation.Scope);
      }
      // TODO: nested OperationCollections inside a parallel collection are not reintegrated yet
    }
  }

  /// <summary>
  /// Executes a single operation locally and pushes its successor (if any) onto the stack.
  /// On failure the operation is pushed back so it is executed again when the engine resumes.
  /// </summary>
  /// <exception cref="OperationCanceledException">Rethrown unchanged when the operator was cancelled.</exception>
  /// <exception cref="OperatorExecutionException">Wraps any other exception thrown by the operator.</exception>
  private void ExecuteAtomicOperation(IAtomicOperation operation, Stack<IOperation> executionStack) {
    IOperation next;
    try {
      next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
    }
    catch (OperationCanceledException) {
      executionStack.Push(operation);
      throw; // preserve the original stack trace
    }
    catch (Exception ex) {
      executionStack.Push(operation);
      throw new OperatorExecutionException(operation.Operator, ex);
    }
    if (next != null) executionStack.Push(next);

    if (operation.Operator.Breakpoint) {
      LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
      Pause();
    }
  }

  private static void EnsureUnobservedTaskExceptionHandlerRegistered() {
    lock (unobservedTaskExceptionHandlerLock) {
      if (!unobservedTaskExceptionHandlerRegistered) {
        TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
        unobservedTaskExceptionHandlerRegistered = true;
      }
    }
  }

  private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
    e.SetObserved(); // avoid crash of process
  }

  /// <summary>
  /// Searches the execution context and its parents for a value parameter named "Random" holding an <see cref="IRandom"/>.
  /// Best effort: returns null if none is found or the lookup fails.
  /// </summary>
  private IRandom FindRandomParameter(IExecutionContext ec) {
    try {
      if (ec == null)
        return null;

      foreach (var p in ec.Parameters) {
        if (p.Name == "Random" && p is IValueParameter)
          return ((IValueParameter)p).Value as IRandom;
      }
      return FindRandomParameter(ec.Parent);
    }
    catch { return null; }
  }

  private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
    ExchangeScope(source.Scope, target.Scope);
  }

  /// <summary>
  /// Replaces the variables and sub-scopes of <paramref name="target"/> with those of <paramref name="source"/>.
  /// </summary>
  private static void ExchangeScope(IScope source, IScope target) {
    target.Variables.Clear();
    target.Variables.AddRange(source.Variables);
    target.SubScopes.Clear();
    target.SubScopes.AddRange(source.SubScopes);
    // TODO: validate if parent scopes match - otherwise source is invalid
  }

  /// <summary>
  /// Uploads the jobs to the hive, polls until every job is finished, failed or aborted, downloads
  /// the results of finished jobs and deletes the jobs on the hive. This method blocks until all jobs are finished.
  /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
  /// </summary>
  /// <param name="jobs">The jobs to execute; each entry is set to null once uploaded to relax memory.</param>
  /// <param name="parentScopeClone">Cleaned clone of the common parent scope attached to every job before serialization.</param>
  /// <param name="cancellationToken">Checked while uploading, polling and downloading.</param>
  /// <returns>The result scopes indexed like <paramref name="jobs"/>; entries of failed or aborted jobs stay null.</returns>
  private IScope[] ExecuteOnHive(OperationJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
    LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
    IScope[] scopes = new IScope[jobs.Length]; // IScope[] rather than Scope[]: avoids ArrayTypeMismatchException for other IScope implementations
    object jobListsLock = new object(); // guards jobIndices and remainingJobIds, which the upload-processing task fills concurrently
    IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();

    try {
      List<Guid> remainingJobIds = new List<Guid>();
      var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
      int finishedCount = 0;

      // create upload-tasks
      var uploadTasks = new List<Task<JobDto>>();
      for (int i = 0; i < jobs.Length; i++) {
        var job = jobs[i];

        // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
        IRandom random = FindRandomParameter(job.Operation as IExecutionContext);
        if (random != null)
          random.Reset(random.Next());

        uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
          return UploadJob(pluginsNeeded, keyValuePairObj, parentScopeClone, cancellationToken);
        }, new KeyValuePair<int, OperationJob>(i, job), cancellationToken));
      }

      Task processUploadedJobsTask = new Task(() => {
        // process finished upload-tasks
        int uploadTasksCount = uploadTasks.Count;
        for (int i = 0; i < uploadTasksCount; i++) {
          cancellationToken.ThrowIfCancellationRequested();

          var uploadTasksArray = uploadTasks.ToArray();
          var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
          if (task.Status == TaskStatus.Faulted) {
            LogException(task.Exception);
            throw task.Exception;
          }

          int key = ((KeyValuePair<int, OperationJob>)task.AsyncState).Key;
          JobDto jobDto = task.Result;
          lock (jobListsLock) {
            jobIndices.Add(jobDto.Id, key);
            remainingJobIds.Add(jobDto.Id);
          }
          jobs[key] = null; // relax memory
          LogMessage(string.Format("Uploaded job #{0} (id: {1})", key + 1, jobDto.Id));
          uploadTasks.Remove(task);
        }
      }, cancellationToken, TaskCreationOptions.PreferFairness);
      processUploadedJobsTask.Start();

      // poll job-statuses and create tasks for those which are finished
      var downloadTasks = new List<Task<OperationJob>>();
      var executionTimes = new List<TimeSpan>();
      var executionTimeOnHiveBefore = executionTimeOnHive;
      while (true) {
        cancellationToken.ThrowIfCancellationRequested();
        if (processUploadedJobsTask.IsFaulted) {
          // a faulted task never reaches RanToCompletion; without this check the loop would poll forever
          throw processUploadedJobsTask.Exception;
        }
        // read the completion state before counting, so jobs added by the finishing upload task are not missed
        bool allUploadsProcessed = processUploadedJobsTask.IsCompleted;
        int remainingCount;
        lock (jobListsLock) {
          remainingCount = remainingJobIds.Count;
        }
        if (allUploadsProcessed && remainingCount == 0) break;

        // wait for the polling interval, but wake up immediately when cancellation is requested
        cancellationToken.WaitHandle.WaitOne(PollingIntervalMilliseconds);
        cancellationToken.ThrowIfCancellationRequested();
        try {
          List<Guid> jobIdsToPoll;
          lock (jobListsLock) {
            jobIdsToPoll = new List<Guid>(remainingJobIds); // snapshot: the upload task may add ids while we poll
          }
          JobResultList results;
          using (DisposableWrapper<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
            results = service.Obj.GetJobResults(jobIdsToPoll).Obj;
          }
          var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted).ToList();
          finishedCount += jobsFinished.Count;
          if (jobsFinished.Count > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
          ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + results.Select(x => x.ExecutionTime).Sum();

          foreach (var result in jobsFinished) {
            int jobIndex;
            lock (jobListsLock) {
              jobIndex = jobIndices[result.Id];
            }
            if (result.State == JobState.Finished) {
              downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
                return DownloadJob(jobIndex, jobIdObj, cancellationToken);
              }, result.Id, cancellationToken));
            } else if (result.State == JobState.Aborted) {
              LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndex + 1, result.Id));
            } else if (result.State == JobState.Failed) {
              LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndex + 1, result.Id, result.Exception));
            }
            lock (jobListsLock) {
              remainingJobIds.Remove(result.Id);
            }
            executionTimes.Add(result.ExecutionTime);
          }
        }
        catch (Exception e) {
          // transient polling errors are logged and the poll is repeated
          LogException(e);
        }
      }

      // process finished download-tasks (all uploads are done, so jobIndices is no longer modified)
      int downloadTasksCount = downloadTasks.Count;
      for (int i = 0; i < downloadTasksCount; i++) {
        cancellationToken.ThrowIfCancellationRequested();

        var downloadTasksArray = downloadTasks.ToArray();
        var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
        if (task.Status == TaskStatus.Faulted) {
          LogException(task.Exception);
          throw task.Exception;
        }
        var jobId = (Guid)task.AsyncState;
        scopes[jobIndices[jobId]] = ((IAtomicOperation)task.Result.Operation).Scope;
        downloadTasks.Remove(task);
      }

      LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
      DeleteJobs(jobIndices);

      return scopes;
    }
    catch (OperationCanceledException) {
      TryDeleteJobs(jobIndices, jobListsLock);
      throw; // preserve the original stack trace
    }
    catch (Exception e) {
      TryDeleteJobs(jobIndices, jobListsLock);
      LogException(e);
      throw; // preserve the original stack trace
    }
  }

  /// <summary>
  /// Best-effort cleanup after a failed or cancelled execution. Deletes the jobs on the hive and logs
  /// (instead of throwing) if that fails, so the exception that triggered the cleanup is not masked.
  /// </summary>
  private void TryDeleteJobs(IDictionary<Guid, int> jobIndices, object jobListsLock) {
    try {
      lock (jobListsLock) {
        DeleteJobs(jobIndices);
      }
    }
    catch (Exception e) {
      LogException(e);
    }
  }

  /// <summary>
  /// Deletes all jobs in <paramref name="jobIndices"/> on the hive and clears the dictionary.
  /// </summary>
  /// <exception cref="HiveEngineException">Thrown when deletion still fails after <see cref="DeleteJobsRepetitions"/> attempts.</exception>
  private void DeleteJobs(IDictionary<Guid, int> jobIndices) {
    if (jobIndices.Count > 0) {
      TryAndRepeat(() => {
        LogMessage(string.Format("Deleting {0} jobs on hive.", jobIndices.Count));
        using (DisposableWrapper<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
          foreach (Guid jobId in jobIndices.Keys) {
            service.Obj.DeleteJob(jobId);
          }
          jobIndices.Clear();
        }
      }, DeleteJobsRepetitions, string.Format("Could not delete {0} jobs", jobIndices.Count));
    }
  }

  /// <summary>
  /// Serializes one job and uploads it to the hive. The job's operation is cloned with
  /// <paramref name="parentScopeClone"/> as parent scope. Upload is retried until it succeeds or cancellation is requested.
  /// </summary>
  /// <param name="pluginsNeeded">Plugins the job requires on the slave.</param>
  /// <param name="keyValuePairObj">A <c>KeyValuePair&lt;int, OperationJob&gt;</c> of job index and job.</param>
  /// <param name="parentScopeClone">Cleaned clone of the common parent scope.</param>
  /// <param name="cancellationToken">Checked before every serialization and upload attempt.</param>
  /// <returns>The job as registered on the hive.</returns>
  private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken) {
    var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj;
    var groups = ResourceIds.Split(';');
    ResponseObject<JobDto> response = null;
    maxSerializedJobsInMemory.WaitOne();
    try {
      SerializedJob serializedJob = null;
      while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
        cancellationToken.ThrowIfCancellationRequested();
        try {
          lock (Log) {
            serializedJob = new SerializedJob();
          }
        }
        catch (Exception e) {
          LogException(e);
        }
      }
      // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
      lock (locker) {
        // NOTE(review): this re-parents the scope of the ORIGINAL (live) operation before cloning; confirm that is intended
        ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone;
        keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
        if (keyValuePair.Value.Operation is IAtomicOperation)
          ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
        serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
      }
      serializedJob.JobInfo = new JobDto();
      serializedJob.JobInfo.State = JobState.Offline;
      serializedJob.JobInfo.CoresNeeded = 1;
      serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
      serializedJob.JobInfo.Priority = priority;
      maxConcurrentConnections.WaitOne();
      try {
        while (response == null) { // repeat until success
          cancellationToken.ThrowIfCancellationRequested();
          try {
            using (DisposableWrapper<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
              response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
            }
          }
          catch (Exception e) {
            LogException(e);
            LogMessage("Repeating upload");
          }
        }
      }
      finally {
        maxConcurrentConnections.Release();
      }
      // drop the reference only after the upload loop, so a retry never sends a null job
      serializedJob = null;
    }
    finally {
      maxSerializedJobsInMemory.Release();
    }
    return response.Obj;
  }

  /// <summary>
  /// Downloads and deserializes the last result of a job, retrying the download until it succeeds or cancellation is requested.
  /// </summary>
  /// <param name="jobIndex">Zero-based index of the job (used for logging only).</param>
  /// <param name="jobIdObj">The hive id (<see cref="Guid"/>) of the job.</param>
  /// <param name="cancellationToken">Checked before every download attempt.</param>
  private OperationJob DownloadJob(int jobIndex, object jobIdObj, CancellationToken cancellationToken) {
    Guid jobId = (Guid)jobIdObj;
    SerializedJob serializedJob = null;
    OperationJob operationJob;
    maxSerializedJobsInMemory.WaitOne();
    try {
      maxConcurrentConnections.WaitOne();
      try {
        while (serializedJob == null) { // repeat until success
          cancellationToken.ThrowIfCancellationRequested();
          try {
            using (DisposableWrapper<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
              serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
            }
          }
          catch (Exception e) {
            LogException(e);
            LogMessage("Repeating download");
          }
        }
      }
      finally {
        // the connection is no longer needed for deserialization
        maxConcurrentConnections.Release();
      }
      operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
      serializedJob = null;
      LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobIndex + 1, jobId));
    }
    finally {
      maxSerializedJobsInMemory.Release();
    }
    return operationJob;
  }

  /// <summary>
  /// Threadsafe message logging
  /// </summary>
  private void LogMessage(string message) {
    lock (Log) {
      Log.LogMessage(message);
    }
  }

  /// <summary>
  /// Threadsafe exception logging
  /// </summary>
  private void LogException(Exception exception) {
    lock (Log) {
      Log.LogException(exception);
    }
  }

  /// <summary>
  /// Executes the action and returns as soon as it succeeds. If it throws, it is retried; after
  /// <paramref name="repetitions"/> failed attempts a <see cref="HiveEngineException"/> is thrown.
  /// If repetitions is negative (e.g. -1), it is repeated infinitely.
  /// </summary>
  private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
    while (true) {
      try {
        action();
        return;
      }
      catch (Exception e) {
        if (repetitions >= 0) {
          repetitions--;
          if (repetitions <= 0) {
            throw new HiveEngineException(errorMessage, e);
          }
        }
      }
    }
  }

  private static void TryAndRepeat(Action action) {
    TryAndRepeat(action, -1, string.Empty);
  }
}
448
public static class ScopeExtensions {
  /// <summary>
  /// Prunes every ancestor of <paramref name="scope"/> so that it keeps only the sub-scope that lies
  /// on the path down to <paramref name="scope"/>. The sub-scopes of <paramref name="scope"/> itself are not touched.
  /// </summary>
  public static void ClearParentScopes(this IScope scope) {
    ClearParentScopes(scope, null);
  }

  /// <summary>
  /// Walks from <paramref name="scope"/> up to the root. If <paramref name="childScope"/> is not null,
  /// the sub-scopes of <paramref name="scope"/> are replaced by <paramref name="childScope"/> alone.
  /// Each ancestor's sub-scopes are then replaced by the single scope directly below it on that path.
  /// </summary>
  public static void ClearParentScopes(this IScope scope, IScope childScope) {
    IScope current = scope;
    IScope below = childScope;
    do {
      if (below != null) {
        current.SubScopes.Clear();
        current.SubScopes.Add(below);
      }
      below = current;
      current = current.Parent;
    } while (current != null);
  }
}
463
public static class EnumerableExtensions {
  /// <summary>
  /// Adds up a sequence of time spans by summing their total milliseconds in order.
  /// </summary>
  /// <param name="times">The time spans to add; must not be null.</param>
  /// <returns>The total as a <see cref="TimeSpan"/>; <see cref="TimeSpan.Zero"/> for an empty sequence.</returns>
  public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
    double totalMilliseconds = times.Aggregate(0.0, (total, time) => total + time.TotalMilliseconds);
    return TimeSpan.FromMilliseconds(totalMilliseconds);
  }
}
469}
Note: See TracBrowser for help on using the repository browser.