Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 5329

Last change on this file since 5329 was 5329, checked in by cneumuel, 14 years ago

#1260

  • robustified HiveEngine and HiveJobDownloader (handling of unobserved exceptions from tasks)
File size: 16.5 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
6using HeuristicLab.Core;
7using HeuristicLab.Common;
8using HeuristicLab.Hive.Contracts.Interfaces;
9using HeuristicLab.Clients.Common;
10using HeuristicLab.Hive.ExperimentManager;
11using HeuristicLab.Hive.Contracts.BusinessObjects;
12using HeuristicLab.PluginInfrastructure;
13using HeuristicLab.Hive.Contracts.ResponseObjects;
14using System.Threading;
15using HeuristicLab.Random;
16using System.Threading.Tasks;
17
18namespace HeuristicLab.HiveEngine {
19  /// <summary>
20  /// Represents an engine that executes operations which can be executed in parallel on the hive
21  /// </summary>
22  [StorableClass]
23  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
24  public class HiveEngine : Engine {
// Throttles shared by all upload and download tasks of this engine. They are created once per
// instance and never replaced, hence readonly.
private readonly Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
private readonly Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems

// Token of the current run; assigned at the start of Run(CancellationToken).
private CancellationToken cancellationToken;

[Storable]
private IOperator currentOperator;

/// <summary>
/// Semicolon-separated list of resource names; it is split on ';' when a job is uploaded.
/// </summary>
[Storable]
public string ResourceIds { get; set; }

[Storable]
private int priority;
/// <summary>
/// Priority that is assigned to every job this engine uploads.
/// </summary>
public int Priority {
  get { return priority; }
  set { priority = value; }
}
41   
#region constructors and cloning
/// <summary>
/// Creates a new engine whose resource list initially consists of "HEAL".
/// </summary>
public HiveEngine() {
  this.ResourceIds = "HEAL";
}

/// <summary>
/// Constructor marked with <c>StorableConstructor</c>; it only forwards the flag to the base class.
/// </summary>
[StorableConstructor]
protected HiveEngine(bool deserializing)
  : base(deserializing) {
}

/// <summary>
/// Copy constructor used by <see cref="Clone(Cloner)"/>: copies the priority and the resource list
/// and clones the current operator through the given cloner.
/// </summary>
protected HiveEngine(HiveEngine original, Cloner cloner)
  : base(original, cloner) {
  this.priority = original.priority;
  this.currentOperator = cloner.Clone(original.currentOperator);
  this.ResourceIds = original.ResourceIds;
}

/// <summary>
/// Creates a deep clone of this engine by means of the copy constructor.
/// </summary>
public override IDeepCloneable Clone(Cloner cloner) {
  HiveEngine copy = new HiveEngine(this, cloner);
  return copy;
}
#endregion
58
/// <summary>
/// Remembers the cancellation token of this run and then processes the engine's execution stack.
/// </summary>
/// <param name="cancellationToken">Token that is checked while operations are processed.</param>
protected override void Run(CancellationToken cancellationToken) {
  this.cancellationToken = cancellationToken;

  Stack<IOperation> pendingOperations = ExecutionStack;
  Run(pendingOperations);
}
63
/// <summary>
/// Works off the given execution stack until it is empty. Parallel operation collections are sent to
/// the hive as one batch of jobs, all other collections are unfolded onto the stack, and atomic
/// operations are executed locally.
/// </summary>
/// <param name="state">The execution stack to process; it is cast to Stack&lt;IOperation&gt;.</param>
/// <exception cref="OperationCanceledException">The run was canceled.</exception>
/// <exception cref="OperatorExecutionException">A locally executed operator threw an exception.</exception>
/// <exception cref="InvalidOperationException">A hive job did not deliver a result scope.</exception>
private void Run(object state) {
  Stack<IOperation> executionStack = (Stack<IOperation>)state;
  IOperation next;
  OperationCollection coll;
  IAtomicOperation operation;

  // The event is static. Removing before adding keeps exactly one registration of this engine,
  // no matter how often Run is entered (e.g. when a paused run is continued).
  TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
  TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;

  while (executionStack.Count > 0) {
    cancellationToken.ThrowIfCancellationRequested();

    next = executionStack.Pop();
    if (next is OperationCollection) {
      coll = (OperationCollection)next;
      if (coll.Parallel && coll.Count > 0) {
        // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
        IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
        parentScopeClone.SubScopes.Clear();
        parentScopeClone.ClearParentScopes();

        OperationJob[] jobs = new OperationJob[coll.Count];
        for (int i = 0; i < coll.Count; i++) {
          jobs[i] = new OperationJob(coll[i]);
        }

        IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);

        for (int i = 0; i < coll.Count; i++) {
          if (coll[i] is IAtomicOperation) {
            // jobs that failed or were aborted on the hive leave their entry empty; report that
            // explicitly instead of failing with a NullReferenceException inside ExchangeScope
            if (scopes[i] == null)
              throw new InvalidOperationException(string.Format("Job #{0} did not deliver a result scope (it failed or was aborted on the hive).", i + 1));
            ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
          } else if (coll[i] is OperationCollection) {
            // todo ??
          }
        }
      } else {
        // sequential (or empty) collection: push the children in reverse order so that the first one is executed first
        for (int i = coll.Count - 1; i >= 0; i--)
          if (coll[i] != null) executionStack.Push(coll[i]);
      }
    } else if (next is IAtomicOperation) {
      operation = (IAtomicOperation)next;
      try {
        next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
      }
      catch (Exception ex) {
        // put the operation back so that it is executed again when the run is continued
        executionStack.Push(operation);
        if (ex is OperationCanceledException) throw; // plain rethrow keeps the original stack trace
        else throw new OperatorExecutionException(operation.Operator, ex);
      }
      if (next != null) executionStack.Push(next);

      if (operation.Operator.Breakpoint) {
        LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
        Pause();
      }
    }
  }
}
120
/// <summary>
/// Handler for <see cref="TaskScheduler.UnobservedTaskException"/>: flags the exception as observed.
/// </summary>
/// <param name="sender">Not used.</param>
/// <param name="e">Event data carrying the unobserved exception.</param>
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
  // avoid crash of process
  e.SetObserved();
}
124
/// <summary>
/// Searches the given execution context and then its parents for a parameter named "Random" that is
/// an <c>IValueParameter</c>, and returns the value of the first match as <c>IRandom</c>.
/// </summary>
/// <param name="ec">Context to start the search at; may be null.</param>
/// <returns>
/// The value of the first matching parameter (null if that value is not an <c>IRandom</c>), or null if
/// no context has such a parameter or any exception occurs during the search.
/// </returns>
private IRandom FindRandomParameter(IExecutionContext ec) {
  try {
    // walk up the context chain instead of recursing
    for (IExecutionContext context = ec; context != null; context = context.Parent) {
      foreach (var parameter in context.Parameters) {
        if (parameter.Name != "Random") continue;
        if (!(parameter is IValueParameter)) continue;
        return ((IValueParameter)parameter).Value as IRandom;
      }
    }
    return null;
  }
  catch {
    // best effort: any failure counts as "not found"
    return null;
  }
}
138
/// <summary>
/// Copies the scope content of <paramref name="source"/> into the scope of <paramref name="target"/>.
/// </summary>
private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
  var sourceScope = source.Scope;
  var targetScope = target.Scope;
  ExchangeScope(sourceScope, targetScope);
}
142
/// <summary>
/// Replaces the variables and the sub-scopes of <paramref name="target"/> with those of
/// <paramref name="source"/>. The collections of <paramref name="source"/> are not modified here.
/// </summary>
private static void ExchangeScope(IScope source, IScope target) {
  // variables first ...
  var targetVariables = target.Variables;
  targetVariables.Clear();
  targetVariables.AddRange(source.Variables);

  // ... then the sub-scopes
  var targetSubScopes = target.SubScopes;
  targetSubScopes.Clear();
  targetSubScopes.AddRange(source.SubScopes);
  // TODO: validate if parent scopes match - otherwise source is invalid
}
150
/// <summary>
/// Uploads the given jobs to the hive, polls the job states until every job has reached a final state,
/// downloads the results of the finished jobs and finally deletes all jobs on the hive.
/// This method blocks until all jobs are finished.
/// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
/// </summary>
/// <param name="jobs">The jobs to execute; an entry is set to null as soon as its job has been uploaded.</param>
/// <param name="parentScopeClone">Scope that is set as parent of each operation's scope before the operation is cloned for upload.</param>
/// <param name="cancellationToken">Token that is checked between the individual steps.</param>
/// <returns>One scope per job, stored at the index of the job. Entries of jobs that failed or were aborted stay null.</returns>
private IScope[] ExecuteOnHive(OperationJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
  LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
  // a real IScope[] (not a covariant Scope[]), so that any IScope implementation can be stored
  IScope[] scopes = new IScope[jobs.Length];
  object jobListLock = new object();

  try {
    // filled by the upload-processing task while the polling loop and the download tasks read it,
    // therefore a thread-safe dictionary
    IDictionary<Guid, int> jobIndices = new System.Collections.Concurrent.ConcurrentDictionary<Guid, int>();
    List<Guid> remainingJobIds = new List<Guid>(); // only accessed while holding jobListLock
    JobResultList results;
    var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
    int finishedCount = 0;
    const int pollIntervalMilliseconds = 10000;

    // create upload-tasks
    var uploadTasks = new List<Task<JobDto>>();
    for (int i = 0; i < jobs.Length; i++) {
      var job = jobs[i];

      // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
      IRandom random = FindRandomParameter(job.Operation as IExecutionContext);
      if (random != null)
        random.Reset(random.Next());

      uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
        return UploadJob(pluginsNeeded, keyValuePairObj, parentScopeClone, cancellationToken);
      }, new KeyValuePair<int, OperationJob>(i, job), cancellationToken));
    }

    Task processUploadedJobsTask = new Task(() => {
      // process finished upload-tasks
      int uploadTasksCount = uploadTasks.Count;
      for (int i = 0; i < uploadTasksCount; i++) {
        cancellationToken.ThrowIfCancellationRequested();

        var uploadTasksArray = uploadTasks.ToArray();
        var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
        if (task.Status == TaskStatus.Faulted) {
          LogException(task.Exception);
          throw task.Exception;
        }

        int key = ((KeyValuePair<int, OperationJob>)task.AsyncState).Key;
        JobDto jobDto = task.Result;
        lock (jobListLock) {
          // register the index before the id becomes visible to the polling loop
          jobIndices.Add(jobDto.Id, key);
          remainingJobIds.Add(jobDto.Id);
        }
        jobs[key] = null; // relax memory
        LogMessage(string.Format("Submitted job #{0}", key + 1));
        uploadTasks.Remove(task);
      }
    }, cancellationToken, TaskCreationOptions.PreferFairness);
    processUploadedJobsTask.Start();

    // poll job-statuses and create tasks for those which are finished
    var downloadTasks = new List<Task<OperationJob>>();
    var executionTimes = new List<TimeSpan>();
    while (true) {
      cancellationToken.ThrowIfCancellationRequested();

      // a faulted upload never reaches RanToCompletion; surface the failure instead of polling forever
      if (processUploadedJobsTask.IsFaulted)
        throw processUploadedJobsTask.Exception;

      // read the task state BEFORE counting: once the task has completed, no further ids are added
      bool allJobsUploaded = processUploadedJobsTask.Status == TaskStatus.RanToCompletion;
      int remainingCount;
      lock (jobListLock) {
        remainingCount = remainingJobIds.Count;
      }
      if (allJobsUploaded && remainingCount == 0) break;

      // wait for the next poll, but wake up immediately when the run is canceled
      cancellationToken.WaitHandle.WaitOne(pollIntervalMilliseconds);
      cancellationToken.ThrowIfCancellationRequested();

      try {
        // poll with a private copy so that concurrent uploads cannot modify the list while it is in use
        List<Guid> idsToPoll;
        lock (jobListLock) {
          idsToPoll = new List<Guid>(remainingJobIds);
        }
        if (idsToPoll.Count == 0) continue; // nothing has been uploaded yet

        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
          results = service.Obj.GetJobResults(idsToPoll).Obj;
        }
        var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted).ToList();
        finishedCount += jobsFinished.Count;
        // Concat, not Union: Union would drop execution times that happen to be equal
        var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Concat(executionTimes).Select(t => t.TotalMilliseconds).Sum());
        LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobs.Length, totalExecutionTime));
        foreach (var result in jobsFinished) {
          if (result.State == JobState.Finished) {
            downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
              return DownloadJob(jobIndices, jobIdObj, cancellationToken);
            }, result.Id, cancellationToken));
          } else if (result.State == JobState.Aborted) {
            LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
          } else if (result.State == JobState.Failed) {
            LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.Exception));
          }
          lock (jobListLock) {
            remainingJobIds.Remove(result.Id);
          }
          executionTimes.Add(result.ExecutionTime);
        }
      }
      catch (Exception pollException) {
        // a failed poll is not fatal; log it and try again in the next round
        LogException(pollException);
      }
    }

    // process finished download-tasks
    int downloadTasksCount = downloadTasks.Count;
    for (int i = 0; i < downloadTasksCount; i++) {
      cancellationToken.ThrowIfCancellationRequested();

      var downloadTasksArray = downloadTasks.ToArray();
      var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
      if (task.Status == TaskStatus.Faulted) {
        LogException(task.Exception);
        throw task.Exception;
      }
      var finishedJobId = (Guid)task.AsyncState;
      scopes[jobIndices[finishedJobId]] = ((IAtomicOperation)task.Result.Operation).Scope;
      downloadTasks.Remove(task);
    }

    LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}). Deleting jobs on hive.", TimeSpan.FromMilliseconds(executionTimes.Select(t => t.TotalMilliseconds).Sum())));
    // delete jobs
    using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
      foreach (Guid id in jobIndices.Keys) {
        service.Obj.DeleteJob(id);
      }
    }

    LogMessage("Operations on the hive finished.");
    return scopes;
  }
  catch (Exception e) {
    LogException(e);
    throw; // plain rethrow keeps the original stack trace
  }
}
277
// Serializes the clone-and-serialize step of all uploads (shared by all engine instances).
private static readonly object locker = new object();

/// <summary>
/// Serializes one job and uploads it to the hive; the upload is repeated until the service returns a response.
/// </summary>
/// <param name="pluginsNeeded">Plugins that are declared as required by the job.</param>
/// <param name="keyValuePairObj">A boxed KeyValuePair&lt;int, OperationJob&gt;: the index of the job and the job itself.</param>
/// <param name="parentScopeClone">Scope that is set as parent of the operation's scope before the operation is cloned.</param>
/// <param name="cancellationToken">Token that is checked before each serialization and upload attempt.</param>
/// <returns>The job description contained in the response of the hive.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested before the upload succeeded.</exception>
private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken) {
  var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj;
  var groups = ResourceIds.Split(';');
  ResponseObject<JobDto> response = null;

  // Each semaphore is acquired directly in front of the try whose finally releases it. That way a
  // failure (e.g. a cancellation) can neither release a semaphore that was never acquired nor leak
  // one that was.
  maxSerializedJobsInMemory.WaitOne();
  try {
    SerializedJob serializedJob = null;
    while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
      cancellationToken.ThrowIfCancellationRequested();
      try {
        lock (Log) {
          serializedJob = new SerializedJob();
        }
      }
      catch (Exception e) {
        LogException(e);
      }
    }
    // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
    lock (locker) {
      ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone;
      keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
      if (keyValuePair.Value.Operation is IAtomicOperation)
        ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
      serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
    }
    serializedJob.JobInfo = new JobDto();
    serializedJob.JobInfo.State = JobState.Offline;
    serializedJob.JobInfo.CoresNeeded = 1;
    serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
    serializedJob.JobInfo.Priority = priority;

    maxConcurrentConnections.WaitOne();
    try {
      while (response == null) { // repeat until success
        cancellationToken.ThrowIfCancellationRequested();
        try {
          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
            response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
            serializedJob = null; // the serialized data is not needed any more
          }
        }
        catch (Exception e) {
          LogException(e);
        }
      }
    }
    finally {
      maxConcurrentConnections.Release();
    }
  }
  finally {
    maxSerializedJobsInMemory.Release();
  }
  return response.Obj;
}
334
/// <summary>
/// Downloads the last serialized result of a job from the hive and deserializes it; the download is
/// repeated until the service delivers a result.
/// </summary>
/// <param name="jobIndices">Maps job ids to job indices; only used for the log message.</param>
/// <param name="jobIdObj">The boxed Guid of the job to download.</param>
/// <param name="cancellationToken">Token that is checked before each download attempt.</param>
/// <returns>The deserialized job.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested before the download succeeded.</exception>
private OperationJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
  Guid jobId = (Guid)jobIdObj;
  SerializedJob serializedJob = null;
  OperationJob operationJob = null;

  // Each semaphore is acquired directly in front of the try whose finally releases it, so a failure
  // between the two acquisitions cannot release a semaphore that was never taken.
  maxSerializedJobsInMemory.WaitOne();
  try {
    maxConcurrentConnections.WaitOne();
    try {
      while (serializedJob == null) { // repeat until success
        cancellationToken.ThrowIfCancellationRequested();
        try {
          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
            serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
          }
        }
        catch (Exception e) {
          LogException(e);
        }
      }
      operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
      serializedJob = null; // the serialized data is not needed any more
      LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1));
    }
    finally {
      maxConcurrentConnections.Release();
    }
  }
  finally {
    maxSerializedJobsInMemory.Release();
  }
  return operationJob;
}
363
/// <summary>
/// Writes a message to the engine's log while holding a lock on the log object.
/// </summary>
/// <param name="message">The text to log.</param>
private void LogMessage(string message) {
  var log = Log;
  lock (log) {
    log.LogMessage(message);
  }
}
372
/// <summary>
/// Writes an exception to the engine's log while holding a lock on the log object.
/// </summary>
/// <param name="exception">The exception to log.</param>
private void LogException(Exception exception) {
  var log = Log;
  lock (log) {
    log.LogException(exception);
  }
}
381  }
382
/// <summary>
/// Extension methods that prune the scope tree above a scope.
/// </summary>
public static class ScopeExtensions {
  /// <summary>
  /// Walks from the given scope up to the root; every ancestor keeps only the sub-scope that lies on
  /// this path. The sub-scopes of <paramref name="scope"/> itself are left untouched.
  /// </summary>
  public static void ClearParentScopes(this IScope scope) {
    ClearParentScopes(scope, null);
  }

  /// <summary>
  /// If <paramref name="childScope"/> is not null, makes it the only sub-scope of <paramref name="scope"/>;
  /// then continues the same way with each ancestor, which keeps only the scope the walk came from.
  /// </summary>
  /// <param name="scope">Scope to start at.</param>
  /// <param name="childScope">The sub-scope to keep in <paramref name="scope"/>, or null to keep all of them.</param>
  public static void ClearParentScopes(this IScope scope, IScope childScope) {
    IScope current = scope;
    IScope keep = childScope;
    while (true) {
      if (keep != null) {
        current.SubScopes.Clear();
        current.SubScopes.Add(keep);
      }

      IScope parent = current.Parent;
      if (parent == null) return;

      // one level up: the scope just handled is the one to keep there
      keep = current;
      current = parent;
    }
  }
}
397}
Note: See TracBrowser for help on using the repository browser.