Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveHiveEngine/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 8694

Last change on this file since 8694 was 7417, checked in by ascheibe, 13 years ago

#1744 added an option so that the parent task can also wait on the slave for the child tasks to finish

File size: 17.1 KB
RevLine 
[7287]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Common;
28using HeuristicLab.Clients.Hive;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
32
33namespace HeuristicLab.HiveEngine {
34  /// <summary>
35  /// Represents an engine that executes operations which can be executed in parallel on the hive
36  /// </summary>
37  [StorableClass]
38  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
39  public class HiveEngine : Engine, IHiveEngine {
40    private static object locker = new object();
41    private CancellationToken cancellationToken;
42    private bool firstRun = true;
43    AutoResetEvent handle = new AutoResetEvent(false);
44
45    [Storable]
46    public Guid ParentTaskId { get; set; }
47
48    [Storable]
[7417]49    public bool PauseWhileCalculatingChilds { get; set; }
50
51    [Storable]
[7287]52    private IOperator currentOperator;
53
54    [Storable]
55    private int nrOfSentRuns;
56
57    [Storable]
58    private string username;
59    public string Username {
60      get { return username; }
61      set { username = value; }
62    }
63
64    [Storable]
65    private string password;
66    public string Password {
67      get { return password; }
68      set { password = value; }
69    }
70
71    [Storable]
72    public string ResourceNames { get; set; }
73
74    [Storable]
75    private int priority;
76    public int Priority {
77      get { return priority; }
78      set { priority = value; }
79    }
80
81    [Storable]
82    private TimeSpan executionTimeOnHive;
83    public TimeSpan ExecutionTimeOnHive {
84      get { return executionTimeOnHive; }
85      set {
86        if (value != executionTimeOnHive) {
87          executionTimeOnHive = value;
88          OnExecutionTimeOnHiveChanged();
89        }
90      }
91    }
92
93    [Storable]
94    private bool isPrivileged;
95    public bool IsPrivileged {
96      get { return isPrivileged; }
97      set { isPrivileged = value; }
98    }
99
100    // Task can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
101    private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
102    public ItemCollection<RefreshableJob> Jobs {
103      get { return jobs; }
104      set { jobs = value; }
105    }
106
107    private List<Plugin> onlinePlugins;
108    public List<Plugin> OnlinePlugins {
109      get { return onlinePlugins; }
110      set { onlinePlugins = value; }
111    }
112
113    private List<Plugin> alreadyUploadedPlugins;
114    public List<Plugin> AlreadyUploadedPlugins {
115      get { return alreadyUploadedPlugins; }
116      set { alreadyUploadedPlugins = value; }
117    }
118
119    [Storable]
120    private OperationCollection continueCollection = null;
121
122    [Storable]
123    private Guid lastJobGuid = Guid.Empty;
124
125    public bool IsAllowedPrivileged { get; set; }
126
127    #region constructors and cloning
128    public HiveEngine() {
129      this.ResourceNames = "HEAL";
130      this.Priority = 0;
131      this.log = new ThreadSafeLog();
132      this.IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
133      HiveServiceClient cl = ClientFactory.CreateClient<HiveServiceClient, IHiveService>();
134      username = cl.ClientCredentials.UserName.UserName;
135      password = cl.ClientCredentials.UserName.Password;
[7417]136      PauseWhileCalculatingChilds = true;
[7287]137    }
138
139    [StorableConstructor]
140    protected HiveEngine(bool deserializing) : base(deserializing) { }
141
142    [StorableHook(HookType.AfterDeserialization)]
143    private void AfterDeserialization() {
144      if (username != string.Empty && password != string.Empty) {
145        HiveServiceLocator.Instance.Username = username;
146        HiveServiceLocator.Instance.Password = password;
147      }
148    }
149
150    protected HiveEngine(HiveEngine original, Cloner cloner)
151      : base(original, cloner) {
152      this.ResourceNames = original.ResourceNames;
153      this.currentOperator = cloner.Clone(original.currentOperator);
154      this.priority = original.priority;
155      this.executionTimeOnHive = original.executionTimeOnHive;
156      this.IsPrivileged = original.IsPrivileged;
157      this.username = original.username;
158      this.password = original.password;
159      this.continueCollection = original.continueCollection;
160      this.lastJobGuid = original.lastJobGuid;
161      this.nrOfSentRuns = original.nrOfSentRuns;
162      this.ParentTaskId = original.ParentTaskId;
[7417]163      this.PauseWhileCalculatingChilds = original.PauseWhileCalculatingChilds;
[7287]164      // do not clone jobs - otherwise they would be sent with every task
165    }
166    public override IDeepCloneable Clone(Cloner cloner) {
167      return new HiveEngine(this, cloner);
168    }
169    #endregion
170
171    #region Events
172    protected override void OnPrepared() {
173      base.OnPrepared();
174      this.ExecutionTimeOnHive = TimeSpan.Zero;
175      continueCollection = null;
176      lastJobGuid = Guid.Empty;
177    }
178
179    public event EventHandler ExecutionTimeOnHiveChanged;
180    protected virtual void OnExecutionTimeOnHiveChanged() {
181      var handler = ExecutionTimeOnHiveChanged;
182      if (handler != null) handler(this, EventArgs.Empty);
183    }
184    #endregion
185
186    protected override void Run(CancellationToken cancellationToken) {
187      this.cancellationToken = cancellationToken;
188      Run(ExecutionStack);
189    }
190
191    private void Run(object state) {
192      Stack<IOperation> executionStack = (Stack<IOperation>)state;
193      IOperation next;
194      OperationCollection coll;
195      IAtomicOperation operation;
196
197      if (firstRun) {
198        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
199        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
200        this.AlreadyUploadedPlugins = new List<Plugin>();
201        firstRun = false;
202      }
203
204      while (executionStack.Count > 0) {
205        cancellationToken.ThrowIfCancellationRequested();
206
[7417]207        if (continueCollection != null && lastJobGuid != Guid.Empty && PauseWhileCalculatingChilds) {
[7287]208          EngineTask[] eTasks = RetrieveResultsFromHive(lastJobGuid);
209          RestoreStateFromExecutedHiveTasks(eTasks, continueCollection);
210          continueCollection = null;
211          lastJobGuid = Guid.Empty;
212        }
213
214        next = executionStack.Pop();
215        if (next is OperationCollection) {
216          coll = (OperationCollection)next;
217
218          if (coll.Parallel) {
219            try {
220              // clone the parent scope here and reuse it for each operation. otherwise for each task the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
221              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
222              parentScopeClone.SubScopes.Clear();
223              parentScopeClone.ClearParentScopes();
224
225              EngineTask[] tasks = new EngineTask[coll.Count];
226              for (int i = 0; i < coll.Count; i++) {
227                tasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
228              }
229
230              var experiment = CreateJob();
[7417]231              var engineTasks = ExecuteOnHive(experiment, tasks, parentScopeClone, new CancellationToken());
232              if (PauseWhileCalculatingChilds) {
233                continueCollection = coll;
234                Pause();
235              } else {
236                RestoreStateFromExecutedHiveTasks(engineTasks, coll);
237              }
[7287]238            }
239            catch {
240              executionStack.Push(coll); throw;
241            }
242          } else {
243            for (int i = coll.Count - 1; i >= 0; i--)
244              if (coll[i] != null) executionStack.Push(coll[i]);
245          }
246        } else if (next is IAtomicOperation) {
247          operation = (IAtomicOperation)next;
248          try {
249            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
250          }
251          catch (Exception ex) {
252            executionStack.Push(operation);
253            if (ex is OperationCanceledException) throw ex;
254            else throw new OperatorExecutionException(operation.Operator, ex);
255          }
256          if (next != null) executionStack.Push(next);
257
258          if (operation.Operator.Breakpoint) {
259            log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
260            Pause();
261          }
262        }
263      }
264    }
265
266    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
267      e.SetObserved(); // avoid crash of process
268    }
269
270    private IRandom FindRandomParameter(IExecutionContext ec) {
271      try {
272        if (ec == null)
273          return null;
274
275        foreach (var p in ec.Parameters) {
276          if (p.Name == "Random" && p is IValueParameter)
277            return ((IValueParameter)p).Value as IRandom;
278        }
279        return FindRandomParameter(ec.Parent);
280      }
281      catch { return null; }
282    }
283
284    private void RestoreStateFromExecutedHiveTasks(EngineTask[] tasks, OperationCollection coll) {
285      IScope[] scopes = new Scope[tasks.Length];
286
287      if (tasks.Count() != coll.Count) {
288        throw new ArgumentException("Retrieved tasks don't match operation collection");
289      }
290
291      int j = 0;
292      foreach (var hiveJob in tasks) {
293        var scope = ((IAtomicOperation)((EngineTask)hiveJob).InitialOperation).Scope;
294        scopes[j++] = scope;
295      }
296
297      for (int i = 0; i < coll.Count; i++) {
298        if (coll[i] is IAtomicOperation) {
299          ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
300        } else if (coll[i] is OperationCollection) {
301          // todo ??
302        }
303      }
304    }
305
306    private static void ExchangeScope(IScope source, IScope target) {
307      target.Variables.Clear();
308      target.Variables.AddRange(source.Variables);
309      target.SubScopes.Clear();
310      target.SubScopes.AddRange(source.SubScopes);
311      // TODO: validate if parent scopes match - otherwise source is invalid
312    }
313
314    /// <summary>
315    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
316    /// </summary>
317    /// <param name="tasks"></param>
[7417]318    private EngineTask[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] tasks, IScope parentScopeClone, CancellationToken cancellationToken) {
[7287]319      log.LogMessage(string.Format("Executing {0} operations on the hive.", tasks.Length));
320      IScope[] scopes = new Scope[tasks.Length];
321      object locker = new object();
322      var hiveExperiment = refreshableJob.Job;
323
324      try {
325        // create upload-tasks
326        for (int i = 0; i < tasks.Length; i++) {
327          var engineHiveTask = new EngineHiveTask(tasks[i], parentScopeClone);
328          engineHiveTask.Task.Priority = this.Priority;
[7417]329          if (ParentTaskId != Guid.Empty && PauseWhileCalculatingChilds) {
[7287]330            engineHiveTask.Task.ParentTaskId = ParentTaskId;
331          }
332          refreshableJob.HiveTasks.Add(engineHiveTask);
333
334          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
[7417]335          //IRandom random = FindRandomParameter(tasks[i].InitialOperation as IExecutionContext);
336          //if (random != null)
337          //random.Reset(random.Next());
[7287]338        }
339        HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);
340
341        while (refreshableJob.Job.Id == Guid.Empty) {
342          Thread.Sleep(500);
343        }
344        this.lastJobGuid = refreshableJob.Job.Id;
345        refreshableJob.Progress.Finished += new EventHandler(Progress_Finished);
346        handle.WaitOne();
347        refreshableJob.Progress.Finished -= new EventHandler(Progress_Finished);
[7417]348
349        if (!PauseWhileCalculatingChilds) {
350
351          while (!refreshableJob.AllJobsFinished()) {
352            Thread.Sleep(1000);
353          }
354
355          List<HiveTask> allHiveTasks = refreshableJob.HiveTasks.ToList();
356          allHiveTasks.ForEach(x => this.ExecutionTimeOnHive += x.Task.ExecutionTime);
357
358          var failedJobs = allHiveTasks.Where(x => x.Task.State != TaskState.Finished);
359          if (failedJobs.Count() > 0) {
360            throw new HiveEngineException("Task (" + failedJobs.First().Task.Id + ") failed: " + failedJobs.First().Task.StateLog.Last().Exception);
361          }
362
363          List<EngineTask> engineTasks = new List<EngineTask>();
364
365          foreach (var hTask in allHiveTasks) {
366            EngineTask ehTask = (EngineTask)hTask.ItemTask;
367            engineTasks.Add(ehTask);
368          }
369
370          jobs.Clear();
371          DeleteHiveExperiment(refreshableJob.Id);
372
373          return engineTasks.ToArray();
374        } else {
375          return null;
376        }
[7287]377      }
378      catch (OperationCanceledException e) {
379        throw e;
380      }
381      catch (Exception e) {
382        log.LogException(e);
383        throw e;
384      }
385    }
386
387    void Progress_Finished(object sender, EventArgs e) {
388      handle.Set();
389    }
390
391    private EngineTask[] RetrieveResultsFromHive(Guid jobGuid) {
392      Job job = HiveServiceLocator.Instance.CallHiveService<Job>(s => s.GetJob(jobGuid));
393      var allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks(job.Id));
394      var totalJobCount = allTasks.Count();
395
396      TaskDownloader downloader = new TaskDownloader(allTasks.Select(x => x.Id));
397      downloader.StartAsync();
398      while (!downloader.IsFinished) {
399        Thread.Sleep(500);
400        if (downloader.IsFaulted) {
401          throw downloader.Exception;
402        }
403      }
404      List<HiveTask> allHiveTasks = downloader.Results.Values.ToList();
405      allHiveTasks.ForEach(x => this.ExecutionTimeOnHive += x.Task.ExecutionTime);
406
407      var failedJobs = allHiveTasks.Where(x => x.Task.State != TaskState.Finished);
408      if (failedJobs.Count() > 0) {
[7320]409        throw new HiveEngineException("Task (" + failedJobs.First().Task.Id + ") failed: " + failedJobs.First().Task.StateLog.Last().Exception);
[7287]410      }
411
412      List<EngineTask> engineTasks = new List<EngineTask>();
413
414      foreach (var hTask in allHiveTasks) {
415        EngineTask ehTask = (EngineTask)hTask.ItemTask;
416        engineTasks.Add(ehTask);
417      }
418
419      jobs.Clear();
420      DeleteHiveExperiment(jobGuid);
421      return engineTasks.ToArray();
422    }
423
424    private RefreshableJob CreateJob() {
425      lock (locker) {
426        var hiveExperiment = new Job();
427        hiveExperiment.Name = "HiveEngine Run " + nrOfSentRuns++;
428        hiveExperiment.DateCreated = DateTime.Now;
429        hiveExperiment.ResourceNames = this.ResourceNames;
430        hiveExperiment.IsPrivileged = this.IsPrivileged;
431        var refreshableHiveExperiment = new RefreshableJob(hiveExperiment);
432        //refreshableHiveExperiment.RefreshAutomatically = false;
433        refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
434        jobs.Add(refreshableHiveExperiment);
435        return refreshableHiveExperiment;
436      }
437    }
438
439    private void DeleteHiveExperiment(Guid jobId) {
440      HiveClient.TryAndRepeat(() => {
441        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
442      }, 5, string.Format("Could not delete jobs"));
443    }
444
445    private List<Guid> GetResourceIds() {
446      return HiveServiceLocator.Instance.CallHiveService(service => {
447        var resourceNames = ResourceNames.Split(';');
448        var resourceIds = new List<Guid>();
449        foreach (var resourceName in resourceNames) {
450          Guid resourceId = service.GetResourceId(resourceName);
451          if (resourceId == Guid.Empty) {
452            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
453          }
454          resourceIds.Add(resourceId);
455        }
456        return resourceIds;
457      });
458    }
459  }
460}
Note: See TracBrowser for help on using the repository browser.