Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 5399

Last change on this file since 5399 was 5399, checked in by cneumuel, 13 years ago

#1260

  • fixed wiring of textboxes in HiveExperimentManager
  • robustified results polling in HiveExperimentManager
  • robustified HiveEngine
File size: 18.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
6using HeuristicLab.Core;
7using HeuristicLab.Common;
8using HeuristicLab.Hive.Contracts.Interfaces;
9using HeuristicLab.Clients.Common;
10using HeuristicLab.Hive.ExperimentManager;
11using HeuristicLab.Hive.Contracts.BusinessObjects;
12using HeuristicLab.PluginInfrastructure;
13using HeuristicLab.Hive.Contracts.ResponseObjects;
14using System.Threading;
15using HeuristicLab.Random;
16using System.Threading.Tasks;
17
18namespace HeuristicLab.HiveEngine {
19  /// <summary>
20  /// Represents an engine that executes operations which can be executed in parallel on the hive
21  /// </summary>
22  [StorableClass]
23  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
24  public class HiveEngine : Engine {
    // Throttles concurrent service connections for uploads/downloads.
    private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
    // Throttles how many serialized jobs are held in memory at once.
    private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
    // Token supplied by Run(CancellationToken); observed by the worker loops below.
    private CancellationToken cancellationToken;

    [Storable]
    private IOperator currentOperator;

    // Semicolon-separated hive resource (group) names; split on ';' in UploadJob.
    // Defaults to "HEAL" (see constructor).
    [Storable]
    public string ResourceIds { get; set; }

    [Storable]
    private int priority;
    // Priority assigned to every uploaded hive job (copied into JobDto.Priority).
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    // Accumulated execution time of all hive jobs; raises ExecutionTimeOnHiveChanged
    // only when the assigned value actually differs.
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }
53
    #region constructors and cloning
    public HiveEngine() {
      ResourceIds = "HEAL"; // default hive resource group
    }

    // Deserialization constructor (HeuristicLab storable pattern).
    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    // Copy constructor used by Clone; deep-clones the current operator,
    // copies the plain settings.
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceIds = original.ResourceIds;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion
72
    #region Events
    // Resets the accumulated hive execution time when the engine is prepared.
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    // Raised whenever ExecutionTimeOnHive changes to a different value.
    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged; // copy to local for a thread-safe raise
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion
85
    // Engine entry point: remembers the token so the nested execution loop and
    // the hive polling/upload/download code can observe cancellation.
    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }
90
91    private void Run(object state) {
92      Stack<IOperation> executionStack = (Stack<IOperation>)state;
93      IOperation next;
94      OperationCollection coll;
95      IAtomicOperation operation;
96      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
97
98      while (ExecutionStack.Count > 0) {
99        cancellationToken.ThrowIfCancellationRequested();
100
101        next = ExecutionStack.Pop();
102        if (next is OperationCollection) {
103          coll = (OperationCollection)next;
104          if (coll.Parallel) {
105            // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
106            IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
107            parentScopeClone.SubScopes.Clear();
108            parentScopeClone.ClearParentScopes();
109
110            OperationJob[] jobs = new OperationJob[coll.Count];
111            for (int i = 0; i < coll.Count; i++) {
112              jobs[i] = new OperationJob(coll[i]);
113            }
114
115            IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
116
117            for (int i = 0; i < coll.Count; i++) {
118              if (coll[i] is IAtomicOperation) {
119                ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
120              } else if (coll[i] is OperationCollection) {
121                // todo ??
122              }
123            }
124          } else {
125            for (int i = coll.Count - 1; i >= 0; i--)
126              if (coll[i] != null) executionStack.Push(coll[i]);
127          }
128        } else if (next is IAtomicOperation) {
129          operation = (IAtomicOperation)next;
130          try {
131            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
132          }
133          catch (Exception ex) {
134            ExecutionStack.Push(operation);
135            if (ex is OperationCanceledException) throw ex;
136            else throw new OperatorExecutionException(operation.Operator, ex);
137          }
138          if (next != null) ExecutionStack.Push(next);
139
140          if (operation.Operator.Breakpoint) {
141            LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
142            Pause();
143          }
144        }
145      }
146    }
147
    // Marks unobserved task exceptions as observed so a faulted upload/download
    // task that is never awaited cannot escalate and crash the process.
    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }
151
152    private IRandom FindRandomParameter(IExecutionContext ec) {
153      try {
154        if (ec == null)
155          return null;
156
157        foreach (var p in ec.Parameters) {
158          if (p.Name == "Random" && p is IValueParameter)
159            return ((IValueParameter)p).Value as IRandom;
160        }
161        return FindRandomParameter(ec.Parent);
162      }
163      catch { return null; }
164    }
165
    // Re-integrates a downloaded result: moves the scope content of the hive-side
    // operation (source) into the local operation's scope (target).
    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    // Replaces target's variables and sub-scopes with those of source in place,
    // so references to the target scope held elsewhere stay valid.
    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }
177
178    /// <summary>
179    /// This method blocks until all jobs are finished
180    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
181    /// </summary>
182    /// <param name="jobs"></param>
183    private IScope[] ExecuteOnHive(OperationJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
184      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
185      IScope[] scopes = new Scope[jobs.Length];
186      object locker = new object();
187      IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();
188
189      try {
190        List<Guid> remainingJobIds = new List<Guid>();
191        JobResultList results;
192        var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
193        int finishedCount = 0;
194        int uploadCount = 0;
195
196        // create upload-tasks
197        var uploadTasks = new List<Task<JobDto>>();
198        for (int i = 0; i < jobs.Length; i++) {
199          var job = jobs[i];
200
201          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
202          IRandom random = FindRandomParameter(job.Operation as IExecutionContext);
203          if (random != null)
204            random.Reset(random.Next());
205
206          uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
207            return UploadJob(pluginsNeeded, keyValuePairObj, parentScopeClone, cancellationToken);
208          }, new KeyValuePair<int, OperationJob>(i, job), cancellationToken));
209        }
210
211        Task processUploadedJobsTask = new Task(() => {
212          // process finished upload-tasks
213          int uploadTasksCount = uploadTasks.Count;
214          for (int i = 0; i < uploadTasksCount; i++) {
215            cancellationToken.ThrowIfCancellationRequested();
216
217            var uploadTasksArray = uploadTasks.ToArray();
218            var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
219            if (task.Status == TaskStatus.Faulted) {
220              LogException(task.Exception);
221              throw task.Exception;
222            }
223
224            int key = ((KeyValuePair<int, OperationJob>)task.AsyncState).Key;
225            JobDto jobDto = task.Result;
226            lock (locker) {
227              uploadCount++;
228              jobIndices.Add(jobDto.Id, key);
229              remainingJobIds.Add(jobDto.Id);
230            }
231            jobs[key] = null; // relax memory
232            LogMessage(string.Format("Uploaded job #{0}", key + 1, jobDto.Id));
233            uploadTasks.Remove(task);
234          }
235        }, cancellationToken, TaskCreationOptions.PreferFairness);
236        processUploadedJobsTask.Start();
237
238        // poll job-statuses and create tasks for those which are finished
239        var downloadTasks = new List<Task<OperationJob>>();
240        var executionTimes = new List<TimeSpan>();
241        var executionTimeOnHiveBefore = executionTimeOnHive;
242        while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
243          cancellationToken.ThrowIfCancellationRequested();
244
245          Thread.Sleep(10000);
246          try {
247            using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
248              results = service.Obj.GetJobResults(remainingJobIds).Obj;
249            }
250            var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
251            finishedCount += jobsFinished.Count();
252            if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
253            ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + results.Select(x => x.ExecutionTime).Sum();
254
255            foreach (var result in jobsFinished) {
256              if (result.State == JobState.Finished) {
257                downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
258                  return DownloadJob(jobIndices, jobIdObj, cancellationToken);
259                }, result.Id, cancellationToken));
260              } else if (result.State == JobState.Aborted) {
261                LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
262              } else if (result.State == JobState.Failed) {
263                LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.Exception));
264              }
265              remainingJobIds.Remove(result.Id);
266              executionTimes.Add(result.ExecutionTime);
267            }
268          }
269          catch (Exception e) {
270            LogException(e);
271          }
272        }
273
274        // process finished download-tasks
275        int downloadTasksCount = downloadTasks.Count;
276        for (int i = 0; i < downloadTasksCount; i++) {
277          cancellationToken.ThrowIfCancellationRequested();
278
279          var downloadTasksArray = downloadTasks.ToArray();
280          var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
281          var jobId = (Guid)task.AsyncState;
282          if (task.Status == TaskStatus.Faulted) {
283            LogException(task.Exception);
284            throw task.Exception;
285          }
286          scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.Operation).Scope;
287          downloadTasks.Remove(task);
288        }
289
290        LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
291        DeleteJobs(jobIndices);
292
293        return scopes;
294      }
295      catch (OperationCanceledException e) {
296        lock (locker) {
297          if (jobIndices != null) DeleteJobs(jobIndices);
298        }
299        throw e;
300      }
301      catch (Exception e) {
302        lock (locker) {
303          if (jobIndices != null) DeleteJobs(jobIndices);
304        }
305        LogException(e);
306        throw e;
307      }
308    }
309
    // Deletes all uploaded jobs from the hive (best effort, up to 5 attempts via
    // TryAndRepeat) and clears jobIndices so a second call becomes a no-op.
    // Called after normal completion and from the cancellation/error handlers.
    private void DeleteJobs(IDictionary<Guid, int> jobIndices) {
      if (jobIndices.Count > 0) {
        TryAndRepeat(() => {
          LogMessage(string.Format("Deleting {0} jobs on hive.", jobIndices.Count));
          using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
            foreach (Guid jobId in jobIndices.Keys) {
              service.Obj.DeleteJob(jobId);
            }
            jobIndices.Clear(); // success: nothing left to delete on retry
          }
        }, 5, string.Format("Could not delete {0} jobs", jobIndices.Count));
      }
    }
323
    // Guards the clone-and-serialize section below across concurrent upload tasks.
    private static object locker = new object();
    // Serializes one operation-job and uploads it to the hive; returns the created
    // JobDto. Runs on an upload task; throttled by maxSerializedJobsInMemory and
    // maxConcurrentConnections. Retries both serialization and upload until success
    // or cancellation.
    private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken) {
      var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj; // (index in jobs, job)
      var groups = ResourceIds.Split(';'); // resource group names for AddJobWithGroupStrings
      ResponseObject<JobDto> response = null;
      try {
        maxSerializedJobsInMemory.WaitOne();
        SerializedJob serializedJob = null;
        while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
          cancellationToken.ThrowIfCancellationRequested();
          try {
            // NOTE(review): locking on Log here mirrors LogMessage/LogException;
            // presumably meant to keep the Log out of concurrent serialization — confirm
            lock (Log) {
              serializedJob = new SerializedJob();
            }
          }
          catch (Exception e) {
            LogException(e);
          }
        }
        // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
        lock (locker) {
          ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone;
          keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
          if (keyValuePair.Value.Operation is IAtomicOperation)
            ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
          serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
        }
        serializedJob.JobInfo = new JobDto();
        serializedJob.JobInfo.State = JobState.Offline;
        serializedJob.JobInfo.CoresNeeded = 1;
        serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
        serializedJob.JobInfo.Priority = priority;
        try {
          maxConcurrentConnections.WaitOne();
          while (response == null) { // repeat until success
            cancellationToken.ThrowIfCancellationRequested();
            try {
              using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
                response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
                serializedJob = null; // relax memory as soon as the upload succeeded
              }
            }
            catch (Exception e) {
              LogException(e);
              LogMessage("Repeating upload");
            }
          }
        }
        finally {
          maxConcurrentConnections.Release();
        }
      }
      finally {
        maxSerializedJobsInMemory.Release();
      }
      return response.Obj;
    }
381
382    private OperationJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
383      Guid jobId = (Guid)jobIdObj;
384      SerializedJob serializedJob = null;
385      OperationJob operationJob = null;
386      try {
387        maxSerializedJobsInMemory.WaitOne();
388        maxConcurrentConnections.WaitOne();
389        while (serializedJob == null) { // repeat until success
390          cancellationToken.ThrowIfCancellationRequested();
391          try {
392            using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
393              serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
394            }
395          }
396          catch (Exception e) {
397            LogException(e);
398            LogMessage("Repeating download");
399          }
400        }
401        operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
402        serializedJob = null;
403        LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
404      }
405      finally {
406        maxConcurrentConnections.Release();
407        maxSerializedJobsInMemory.Release();
408      }
409      return operationJob;
410    }
411
    /// <summary>
    /// Threadsafe message logging
    /// </summary>
    // NOTE(review): locks on the Log object itself; fine as long as Log is never
    // exposed for external locking — confirm against the Engine base class.
    private void LogMessage(string message) {
      lock (Log) {
        Log.LogMessage(message);
      }
    }

    /// <summary>
    /// Threadsafe exception logging
    /// </summary>
    private void LogException(Exception exception) {
      lock (Log) {
        Log.LogException(exception);
      }
    }
429
430    /// <summary>
431    /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
432    /// If repetitions is -1, it is repeated infinitely.
433    /// </summary>
434    private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
435      while (repetitions > 0) {
436        try { action(); }
437        catch (Exception e) {
438          repetitions--;
439          if (repetitions == 0) {
440            throw new HiveEngineException(errorMessage, e);
441          }
442        }
443      }
444    }
445
446    private static void TryAndRepeat(Action action) {
447      TryAndRepeat(action, -1, string.Empty);
448    }
449  }
450
451  public static class ScopeExtensions {
452    public static void ClearParentScopes(this IScope scope) {
453      scope.ClearParentScopes(null);
454    }
455
456    public static void ClearParentScopes(this IScope scope, IScope childScope) {
457      if (childScope != null) {
458        scope.SubScopes.Clear();
459        scope.SubScopes.Add(childScope);
460      }
461      if (scope.Parent != null)
462        scope.Parent.ClearParentScopes(scope);
463    }
464  }
465
466  public static class EnumerableExtensions {
467    public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
468      return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum());
469    }
470  }
471}
Note: See TracBrowser for help on using the repository browser.