Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs @ 6198

Last change on this file since 6198 was 6198, checked in by cneumuel, 13 years ago

#1233

  • small fixes for HiveEngine
File size: 16.9 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6using HeuristicLab.Clients.Hive;
7using HeuristicLab.Common;
8using HeuristicLab.Core;
9using HeuristicLab.Hive;
10using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
11using HeuristicLab.PluginInfrastructure;
12
13namespace HeuristicLab.HiveEngine {
14  /// <summary>
15  /// Represents an engine that executes operations which can be executed in parallel on the hive
16  /// </summary>
17  [StorableClass]
18  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
19  public class HiveEngine : Engine {
// Throttles used by the upload/download helpers.
private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems

// Token of the current Run call; assigned in Run(CancellationToken) and not persisted.
private CancellationToken cancellationToken;

[Storable]
private IOperator currentOperator;

/// <summary>
/// Names of the hive resources to use. The value is copied to every hive experiment
/// and split on ';' when resource ids are resolved.
/// </summary>
[Storable]
public string ResourceNames { get; set; }

[Storable]
private int priority;
/// <summary>Priority assigned to uploaded jobs.</summary>
public int Priority { get { return priority; } set { priority = value; } }

[Storable]
private TimeSpan executionTimeOnHive;
/// <summary>
/// Execution time summed over all hive experiments of this engine.
/// Raises <see cref="ExecutionTimeOnHiveChanged"/> only when the value actually changes.
/// </summary>
public TimeSpan ExecutionTimeOnHive {
  get { return executionTimeOnHive; }
  set {
    if (value == executionTimeOnHive) return;
    executionTimeOnHive = value;
    OnExecutionTimeOnHiveChanged();
  }
}

[Storable]
private bool useLocalPlugins;
/// <summary>Forwarded to each hive experiment and to the plugin dependency lookup.</summary>
public bool UseLocalPlugins { get { return useLocalPlugins; } set { useLocalPlugins = value; } }

[Storable]
private ItemCollection<RefreshableHiveExperiment> hiveExperiments;
/// <summary>One entry is added for every batch of operations sent to the hive.</summary>
public ItemCollection<RefreshableHiveExperiment> HiveExperiments { get { return hiveExperiments; } set { hiveExperiments = value; } }

// Not persisted: filled at the start of every run with the non-local plugins reported by the hive service.
private List<Plugin> onlinePlugins;
public List<Plugin> OnlinePlugins { get { return onlinePlugins; } set { onlinePlugins = value; } }

// Not persisted: reset to an empty list at the start of every run.
private List<Plugin> alreadyUploadedPlugins;
public List<Plugin> AlreadyUploadedPlugins { get { return alreadyUploadedPlugins; } set { alreadyUploadedPlugins = value; } }
74
75    #region constructors and cloning
/// <summary>
/// Creates an engine that targets the "HEAL" resource with priority 0 and an empty
/// collection of hive experiments.
/// </summary>
public HiveEngine() {
  ResourceNames = "HEAL";
  HiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
  Priority = 0;
}

/// <summary>Constructor used by the persistence framework.</summary>
[StorableConstructor]
protected HiveEngine(bool deserializing) : base(deserializing) { }

/// <summary>
/// Cloning constructor. Copies the settings of <paramref name="original"/>; its hive
/// experiments are not copied.
/// </summary>
protected HiveEngine(HiveEngine original, Cloner cloner)
  : base(original, cloner) {
  this.ResourceNames = original.ResourceNames;
  this.currentOperator = cloner.Clone(original.currentOperator);
  this.priority = original.priority;
  this.executionTimeOnHive = original.executionTimeOnHive;
  this.useLocalPlugins = original.useLocalPlugins;
  // The clone starts with its own empty collection. Leaving the field null made
  // ExecuteOnHive fail with a NullReferenceException on hiveExperiments.Count.
  this.hiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
}

public override IDeepCloneable Clone(Cloner cloner) {
  return new HiveEngine(this, cloner);
}
95    #endregion
96
97    #region Events
/// <summary>Resets the accumulated hive execution time whenever the engine is prepared.</summary>
protected override void OnPrepared() {
  base.OnPrepared();
  ExecutionTimeOnHive = TimeSpan.Zero;
}

/// <summary>Raised after <c>ExecutionTimeOnHive</c> received a different value.</summary>
public event EventHandler ExecutionTimeOnHiveChanged;

/// <summary>Notifies the subscribers of <see cref="ExecutionTimeOnHiveChanged"/>, if there are any.</summary>
protected virtual void OnExecutionTimeOnHiveChanged() {
  // Copy the delegate first so a concurrent unsubscribe cannot null it between check and call.
  EventHandler subscribers = ExecutionTimeOnHiveChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
108    #endregion
109
/// <summary>
/// Remembers the cancellation token and processes the engine's execution stack.
/// </summary>
/// <param name="cancellationToken">Token that is checked before every operation.</param>
protected override void Run(CancellationToken cancellationToken) {
  this.cancellationToken = cancellationToken;
  Run(ExecutionStack);
}

/// <summary>
/// Processes operations until the stack is empty. Parallel operation collections are
/// sent to the hive as one batch; everything else is executed locally, one operation at a time.
/// </summary>
/// <param name="state">The execution stack, passed as <c>Stack&lt;IOperation&gt;</c>.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
/// <exception cref="OperatorExecutionException">A locally executed operator threw an exception.</exception>
private void Run(object state) {
  Stack<IOperation> executionStack = (Stack<IOperation>)state;

  // Remove before adding so that repeated runs of this engine register the handler only once.
  TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
  TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;

  this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
  this.AlreadyUploadedPlugins = new List<Plugin>();

  while (executionStack.Count > 0) {
    cancellationToken.ThrowIfCancellationRequested();

    IOperation next = executionStack.Pop();
    OperationCollection coll = next as OperationCollection;
    IAtomicOperation operation = next as IAtomicOperation;

    if (coll != null) {
      if (coll.Parallel) {
        // Nothing to send to the hive; coll.First() below would throw on an empty collection.
        if (coll.Count == 0) continue;

        // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
        IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
        parentScopeClone.SubScopes.Clear();
        parentScopeClone.ClearParentScopes();

        EngineJob[] jobs = new EngineJob[coll.Count];
        for (int i = 0; i < coll.Count; i++) {
          jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
        }

        IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
        //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);

        // Copy the results computed on the hive back into the local scopes.
        for (int i = 0; i < coll.Count; i++) {
          if (coll[i] is IAtomicOperation) {
            ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
          } else if (coll[i] is OperationCollection) {
            // todo ??
          }
        }
      } else {
        // Push in reverse so that the first operation of the collection is popped first.
        for (int i = coll.Count - 1; i >= 0; i--)
          if (coll[i] != null) executionStack.Push(coll[i]);
      }
    } else if (operation != null) {
      try {
        next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
      }
      catch (OperationCanceledException) {
        // Put the operation back so it is executed again when the engine resumes.
        executionStack.Push(operation);
        throw; // rethrow without resetting the stack trace
      }
      catch (Exception ex) {
        executionStack.Push(operation);
        throw new OperatorExecutionException(operation.Operator, ex);
      }
      if (next != null) executionStack.Push(next);

      if (operation.Operator.Breakpoint) {
        LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
        Pause();
      }
    }
  }
}
175
/// <summary>
/// Handler for <c>TaskScheduler.UnobservedTaskException</c>: marks the exception as observed.
/// </summary>
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
  // avoid crash of process
  e.SetObserved();
}
179
/// <summary>
/// Looks for a value parameter named "Random", starting at the given execution context
/// and walking up through its parents.
/// </summary>
/// <param name="ec">Context to start at; may be null.</param>
/// <returns>
/// The value of the first matching parameter as <c>IRandom</c>, or null when no parameter
/// matches, the value is not an <c>IRandom</c>, or any exception occurs during the search.
/// </returns>
private IRandom FindRandomParameter(IExecutionContext ec) {
  try {
    for (IExecutionContext context = ec; context != null; context = context.Parent) {
      foreach (var parameter in context.Parameters) {
        if (parameter.Name == "Random" && parameter is IValueParameter)
          return ((IValueParameter)parameter).Value as IRandom;
      }
    }
    return null;
  }
  catch {
    // best effort: a failed lookup is treated like "not found"
    return null;
  }
}
193
/// <summary>
/// Replaces the contents of the target operation's scope with those of the source operation's scope.
/// </summary>
private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
  IScope sourceScope = source.Scope;
  IScope targetScope = target.Scope;
  ExchangeScope(sourceScope, targetScope);
}
197
/// <summary>
/// Empties the variables and sub-scopes of <paramref name="target"/> and fills them with
/// the variables and sub-scopes of <paramref name="source"/>.
/// </summary>
private static void ExchangeScope(IScope source, IScope target) {
  var targetVariables = target.Variables;
  targetVariables.Clear();
  targetVariables.AddRange(source.Variables);

  var targetSubScopes = target.SubScopes;
  targetSubScopes.Clear();
  targetSubScopes.AddRange(source.SubScopes);
  // TODO: validate if parent scopes match - otherwise source is invalid
}
205
/// <summary>
/// Test function: executes the jobs one after another in this process instead of on the hive.
/// Every job is serialized and deserialized before it is started and again after it has stopped.
/// </summary>
/// <param name="jobs">Jobs to execute.</param>
/// <param name="parentScopeClone">Not used by the local execution.</param>
/// <param name="cancellationToken">Checked while waiting for a job to stop.</param>
/// <returns>The scope of each job's initial operation after execution, in job order.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested while waiting.</exception>
private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
  // Allocate IScope[] (not Scope[]) so that storing any IScope implementation cannot
  // fail the covariant array store check.
  IScope[] scopes = new IScope[jobs.Length];
  for (int i = 0; i < jobs.Length; i++) {
    var serialized = PersistenceUtil.Serialize(jobs[i]);
    var deserialized = PersistenceUtil.Deserialize<IJob>(serialized);
    deserialized.Start();
    while (deserialized.ExecutionState != ExecutionState.Stopped) {
      // Do not keep polling forever once the engine was asked to stop.
      cancellationToken.ThrowIfCancellationRequested();
      Thread.Sleep(100);
    }
    var serialized2 = PersistenceUtil.Serialize(deserialized);
    var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2);
    scopes[i] = ((IAtomicOperation)deserialized2.InitialOperation).Scope;
  }
  return scopes;
}
223
/// <summary>
/// Executes the jobs as one hive experiment. This method blocks until all jobs are finished.
/// The experiment is deleted again afterwards, also when the run fails or is canceled.
/// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
/// </summary>
/// <param name="jobs">Jobs to execute; one hive job is created per entry.</param>
/// <param name="parentScopeClone">Parent scope passed to every created hive job.</param>
/// <param name="cancellationToken">Checked after every polling interval.</param>
/// <returns>The scope of each finished job's initial operation, in the order of the experiment's hive jobs.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested while waiting.</exception>
private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
  LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
  // Allocate IScope[] (not Scope[]) so that storing any IScope implementation cannot
  // fail the covariant array store check.
  IScope[] scopes = new IScope[jobs.Length];
  var hiveExperiment = new HiveExperiment();

  try {
    // create hive experiment
    hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
    hiveExperiment.DateCreated = DateTime.Now;
    hiveExperiment.UseLocalPlugins = this.UseLocalPlugins;
    hiveExperiment.ResourceNames = this.ResourceNames;
    var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
    refreshableHiveExperiment.IsControllable = false;
    hiveExperiments.Add(refreshableHiveExperiment);

    // create hive jobs
    for (int i = 0; i < jobs.Length; i++) {
      hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone));

      // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
      IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
      if (random != null)
        random.Reset(random.Next());
    }
    ExperimentManagerClient.StartExperiment((e) => { throw e; }, refreshableHiveExperiment);

    // do polling until experiment is finished and all jobs are downloaded
    while (!refreshableHiveExperiment.AllJobsFinished()) {
      Thread.Sleep(500);
      this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
      cancellationToken.ThrowIfCancellationRequested();
    }
    LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));

    // get scopes
    int j = 0;
    foreach (var hiveJob in hiveExperiment.HiveJobs) {
      var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
      scopes[j++] = scope;
    }
    refreshableHiveExperiment.RefreshAutomatically = false;
    DeleteHiveExperiment(hiveExperiment.Id);
    return scopes;
  }
  catch (OperationCanceledException) {
    DeleteHiveExperimentAfterFailure(hiveExperiment.Id);
    throw; // keep the original stack trace
  }
  catch (Exception e) {
    DeleteHiveExperimentAfterFailure(hiveExperiment.Id);
    LogException(e);
    throw; // keep the original stack trace
  }
}

// Cleanup for the failure paths of ExecuteOnHive: a failing delete is logged instead of
// thrown, so it cannot replace the exception (or cancellation) that is being propagated.
private void DeleteHiveExperimentAfterFailure(Guid hiveExperimentId) {
  try {
    DeleteHiveExperiment(hiveExperimentId);
  }
  catch (Exception cleanupError) {
    LogException(cleanupError);
  }
}
291
/// <summary>
/// Asks the hive service to delete the given experiment, making up to five attempts.
/// </summary>
/// <param name="hiveExperimentId">Id of the experiment to delete.</param>
private void DeleteHiveExperiment(Guid hiveExperimentId) {
  Action deleteOnService = () => ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
  TryAndRepeat(deleteOnService, 5, "Could not delete jobs");
}
297
// Serializes the clone-and-serialize step of UploadJob across all engine instances.
private static readonly object locker = new object();

/// <summary>
/// Serializes one engine job and adds it to the hive, repeating the upload until the
/// service returns a job id.
/// </summary>
/// <param name="keyValuePairObj">A <c>KeyValuePair&lt;int, EngineJob&gt;</c>; its value is the job to upload.</param>
/// <param name="parentScopeClone">Scope set as parent of the job's scope before the operation is cloned.</param>
/// <param name="cancellationToken">Checked before every upload attempt.</param>
/// <param name="resourceIds">Resources passed to the service together with the job.</param>
/// <param name="hiveExperimentId">Experiment the job belongs to.</param>
/// <returns>The job description including the id assigned by the service.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested between upload attempts.</exception>
private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds, Guid hiveExperimentId) {
  var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj;
  Job job = new Job();

  // Acquire outside the try block: if WaitOne itself fails, the finally block must not
  // release a slot that was never taken.
  maxSerializedJobsInMemory.WaitOne();
  try {
    JobData jobData = new JobData();
    IEnumerable<Type> usedTypes;

    // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
    lock (locker) {
      ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone;
      keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone();
      if (keyValuePair.Value.InitialOperation is IAtomicOperation)
        ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes();
      jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes);
    }
    var neededPlugins = new List<IPluginDescription>();

    bool useAllLocalPlugins = true;
    if (useAllLocalPlugins) {
      // use all plugins
      neededPlugins.AddRange(ApplicationManager.Manager.Plugins);
    } else {
      // use only the plugins that declare the serialized types
      PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes);
    }

    job.CoresNeeded = 1;
    job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, useLocalPlugins));
    job.Priority = priority;
    job.HiveExperimentId = hiveExperimentId;

    maxConcurrentConnections.WaitOne();
    try {
      while (job.Id == Guid.Empty) { // repeat until success
        cancellationToken.ThrowIfCancellationRequested();
        try {
          job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds));
        }
        catch (Exception e) {
          LogException(e);
          LogMessage("Repeating upload");
        }
      }
    }
    finally {
      maxConcurrentConnections.Release();
    }
  }
  finally {
    maxSerializedJobsInMemory.Release();
  }
  return job;
}
354
/// <summary>
/// Resolves every name in <c>ResourceNames</c> (separated by ';') to its resource id
/// using the hive service.
/// </summary>
/// <returns>The resource ids in the order of the names.</returns>
/// <exception cref="ResourceNotFoundException">The service returned an empty id for one of the names.</exception>
private List<Guid> GetResourceIds() {
  return ServiceLocator.Instance.CallHiveService(service => {
    var ids = new List<Guid>();
    foreach (string name in ResourceNames.Split(';')) {
      Guid id = service.GetResourceId(name);
      if (id != Guid.Empty) {
        ids.Add(id);
        continue;
      }
      throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", name));
    }
    return ids;
  });
}
369
/// <summary>
/// Downloads the data of one job from the hive, repeating the request until it succeeds,
/// and deserializes it.
/// </summary>
/// <param name="jobIndices">Maps job ids to their index; used for the log message only.</param>
/// <param name="jobIdObj">The job id as a boxed <c>Guid</c>.</param>
/// <param name="cancellationToken">Checked before every download attempt.</param>
/// <returns>The deserialized engine job.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested between download attempts.</exception>
private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
  Guid jobId = (Guid)jobIdObj;
  JobData jobData = null;
  EngineJob engineJob = null;

  // Each semaphore is acquired directly before the try block whose finally releases it,
  // so a slot is only released when it was actually taken.
  maxSerializedJobsInMemory.WaitOne();
  try {
    maxConcurrentConnections.WaitOne();
    try {
      while (jobData == null) { // repeat until success
        cancellationToken.ThrowIfCancellationRequested();
        try {
          jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
        }
        catch (Exception e) {
          LogException(e);
          LogMessage("Repeating download");
        }
      }
      engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);
      jobData = null;
      LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
    }
    finally {
      maxConcurrentConnections.Release();
    }
  }
  finally {
    maxSerializedJobsInMemory.Release();
  }
  return engineJob;
}
397
/// <summary>
/// Writes a message to the engine log while holding a lock on the log object.
/// </summary>
/// <param name="message">Text to log.</param>
private void LogMessage(string message) {
  var log = Log;
  lock (log) {
    log.LogMessage(message);
  }
}
406
/// <summary>
/// Writes an exception to the engine log while holding a lock on the log object.
/// </summary>
/// <param name="exception">Exception to log.</param>
private void LogException(Exception exception) {
  var log = Log;
  lock (log) {
    log.LogException(exception);
  }
}
415
/// <summary>
/// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
/// If repetitions is -1, it is repeated infinitely.
/// </summary>
/// <param name="action">Action to execute.</param>
/// <param name="repetitions">
/// Maximum number of attempts, or -1 to retry until the action succeeds.
/// Zero and other negative values result in a single attempt.
/// </param>
/// <param name="errorMessage">Message of the exception thrown when all attempts failed.</param>
/// <exception cref="HiveEngineException">
/// The last allowed attempt failed; the inner exception is the failure of that attempt.
/// </exception>
private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
  bool repeatForever = repetitions == -1;
  // A loop instead of recursion keeps the stack flat, which matters for unlimited retries.
  while (true) {
    try {
      action();
      return;
    }
    catch (Exception e) {
      if (repeatForever) continue;
      repetitions--;
      if (repetitions <= 0)
        throw new HiveEngineException(errorMessage, e);
    }
  }
}
429  }
430
/// <summary>
/// LINQ-style helpers for sequences of <see cref="TimeSpan"/> values.
/// </summary>
public static class EnumerableExtensions {
  /// <summary>
  /// Adds up all time spans of the sequence.
  /// </summary>
  /// <param name="times">Time spans to add; an empty sequence yields <c>TimeSpan.Zero</c>.</param>
  /// <returns>The exact total.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="times"/> is null.</exception>
  /// <exception cref="OverflowException">The total number of ticks exceeds the range of a 64-bit integer.</exception>
  public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
    // Sum the integral tick counts; going through double milliseconds loses precision.
    return new TimeSpan(times.Sum(t => t.Ticks));
  }
}
436}
Note: See TracBrowser for help on using the repository browser.