Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveHiveEngine/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 7320

Last change on this file since 7320 was 7320, checked in by ascheibe, 12 years ago

#1744 fixed a bug in the job scheduler for hive engine tasks

File size: 15.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Common;
28using HeuristicLab.Clients.Hive;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
32
33namespace HeuristicLab.HiveEngine {
34  /// <summary>
35  /// Represents an engine that executes operations which can be executed in parallel on the hive
36  /// </summary>
37  [StorableClass]
38  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
39  public class HiveEngine : Engine, IHiveEngine {
40    private static object locker = new object();
41    private CancellationToken cancellationToken;
42    private bool firstRun = true;
43    AutoResetEvent handle = new AutoResetEvent(false);
44
45    [Storable]
46    public Guid ParentTaskId { get; set; }
47
48    [Storable]
49    private IOperator currentOperator;
50
51    [Storable]
52    private int nrOfSentRuns;
53
54    [Storable]
55    private string username;
56    public string Username {
57      get { return username; }
58      set { username = value; }
59    }
60
61    [Storable]
62    private string password;
63    public string Password {
64      get { return password; }
65      set { password = value; }
66    }
67
68    [Storable]
69    public string ResourceNames { get; set; }
70
71    [Storable]
72    private int priority;
73    public int Priority {
74      get { return priority; }
75      set { priority = value; }
76    }
77
78    [Storable]
79    private TimeSpan executionTimeOnHive;
80    public TimeSpan ExecutionTimeOnHive {
81      get { return executionTimeOnHive; }
82      set {
83        if (value != executionTimeOnHive) {
84          executionTimeOnHive = value;
85          OnExecutionTimeOnHiveChanged();
86        }
87      }
88    }
89
90    [Storable]
91    private bool isPrivileged;
92    public bool IsPrivileged {
93      get { return isPrivileged; }
94      set { isPrivileged = value; }
95    }
96
97    // Task can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
98    private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
99    public ItemCollection<RefreshableJob> Jobs {
100      get { return jobs; }
101      set { jobs = value; }
102    }
103
104    private List<Plugin> onlinePlugins;
105    public List<Plugin> OnlinePlugins {
106      get { return onlinePlugins; }
107      set { onlinePlugins = value; }
108    }
109
110    private List<Plugin> alreadyUploadedPlugins;
111    public List<Plugin> AlreadyUploadedPlugins {
112      get { return alreadyUploadedPlugins; }
113      set { alreadyUploadedPlugins = value; }
114    }
115
116    [Storable]
117    private OperationCollection continueCollection = null;
118
119    [Storable]
120    private Guid lastJobGuid = Guid.Empty;
121
122    public bool IsAllowedPrivileged { get; set; }
123
124    #region constructors and cloning
125    public HiveEngine() {
126      this.ResourceNames = "HEAL";
127      this.Priority = 0;
128      this.log = new ThreadSafeLog();
129      this.IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
130      HiveServiceClient cl = ClientFactory.CreateClient<HiveServiceClient, IHiveService>();
131      username = cl.ClientCredentials.UserName.UserName;
132      password = cl.ClientCredentials.UserName.Password;
133    }
134
135    [StorableConstructor]
136    protected HiveEngine(bool deserializing) : base(deserializing) { }
137
138    [StorableHook(HookType.AfterDeserialization)]
139    private void AfterDeserialization() {
140      if (username != string.Empty && password != string.Empty) {
141        HiveServiceLocator.Instance.Username = username;
142        HiveServiceLocator.Instance.Password = password;
143      }
144    }
145
146    protected HiveEngine(HiveEngine original, Cloner cloner)
147      : base(original, cloner) {
148      this.ResourceNames = original.ResourceNames;
149      this.currentOperator = cloner.Clone(original.currentOperator);
150      this.priority = original.priority;
151      this.executionTimeOnHive = original.executionTimeOnHive;
152      this.IsPrivileged = original.IsPrivileged;
153      this.username = original.username;
154      this.password = original.password;
155      this.continueCollection = original.continueCollection;
156      this.lastJobGuid = original.lastJobGuid;
157      this.nrOfSentRuns = original.nrOfSentRuns;
158      this.ParentTaskId = original.ParentTaskId;
159      // do not clone jobs - otherwise they would be sent with every task
160    }
161    public override IDeepCloneable Clone(Cloner cloner) {
162      return new HiveEngine(this, cloner);
163    }
164    #endregion
165
166    #region Events
167    protected override void OnPrepared() {
168      base.OnPrepared();
169      this.ExecutionTimeOnHive = TimeSpan.Zero;
170      continueCollection = null;
171      lastJobGuid = Guid.Empty;
172    }
173
174    public event EventHandler ExecutionTimeOnHiveChanged;
175    protected virtual void OnExecutionTimeOnHiveChanged() {
176      var handler = ExecutionTimeOnHiveChanged;
177      if (handler != null) handler(this, EventArgs.Empty);
178    }
179    #endregion
180
181    protected override void Run(CancellationToken cancellationToken) {
182      this.cancellationToken = cancellationToken;
183      Run(ExecutionStack);
184    }
185
186    private void Run(object state) {
187      Stack<IOperation> executionStack = (Stack<IOperation>)state;
188      IOperation next;
189      OperationCollection coll;
190      IAtomicOperation operation;
191
192      if (firstRun) {
193        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
194        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
195        this.AlreadyUploadedPlugins = new List<Plugin>();
196        firstRun = false;
197      }
198
199      while (executionStack.Count > 0) {
200        cancellationToken.ThrowIfCancellationRequested();
201
202        if (continueCollection != null && lastJobGuid != Guid.Empty) {
203          EngineTask[] eTasks = RetrieveResultsFromHive(lastJobGuid);
204          RestoreStateFromExecutedHiveTasks(eTasks, continueCollection);
205          continueCollection = null;
206          lastJobGuid = Guid.Empty;
207        }
208
209        next = executionStack.Pop();
210        if (next is OperationCollection) {
211          coll = (OperationCollection)next;
212
213          if (coll.Parallel) {
214            try {
215              // clone the parent scope here and reuse it for each operation. otherwise for each task the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
216              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
217              parentScopeClone.SubScopes.Clear();
218              parentScopeClone.ClearParentScopes();
219
220              EngineTask[] tasks = new EngineTask[coll.Count];
221              for (int i = 0; i < coll.Count; i++) {
222                tasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
223              }
224
225              var experiment = CreateJob();
226              ExecuteOnHive(experiment, tasks, parentScopeClone, new CancellationToken());
227              continueCollection = coll;
228              Pause();
229            }
230            catch {
231              executionStack.Push(coll); throw;
232            }
233          } else {
234            for (int i = coll.Count - 1; i >= 0; i--)
235              if (coll[i] != null) executionStack.Push(coll[i]);
236          }
237        } else if (next is IAtomicOperation) {
238          operation = (IAtomicOperation)next;
239          try {
240            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
241          }
242          catch (Exception ex) {
243            executionStack.Push(operation);
244            if (ex is OperationCanceledException) throw ex;
245            else throw new OperatorExecutionException(operation.Operator, ex);
246          }
247          if (next != null) executionStack.Push(next);
248
249          if (operation.Operator.Breakpoint) {
250            log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
251            Pause();
252          }
253        }
254      }
255    }
256
257    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
258      e.SetObserved(); // avoid crash of process
259    }
260
261    private IRandom FindRandomParameter(IExecutionContext ec) {
262      try {
263        if (ec == null)
264          return null;
265
266        foreach (var p in ec.Parameters) {
267          if (p.Name == "Random" && p is IValueParameter)
268            return ((IValueParameter)p).Value as IRandom;
269        }
270        return FindRandomParameter(ec.Parent);
271      }
272      catch { return null; }
273    }
274
275    private void RestoreStateFromExecutedHiveTasks(EngineTask[] tasks, OperationCollection coll) {
276      IScope[] scopes = new Scope[tasks.Length];
277
278      if (tasks.Count() != coll.Count) {
279        throw new ArgumentException("Retrieved tasks don't match operation collection");
280      }
281
282      int j = 0;
283      foreach (var hiveJob in tasks) {
284        var scope = ((IAtomicOperation)((EngineTask)hiveJob).InitialOperation).Scope;
285        scopes[j++] = scope;
286      }
287
288      for (int i = 0; i < coll.Count; i++) {
289        if (coll[i] is IAtomicOperation) {
290          ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
291        } else if (coll[i] is OperationCollection) {
292          // todo ??
293        }
294      }
295    }
296
297    private static void ExchangeScope(IScope source, IScope target) {
298      target.Variables.Clear();
299      target.Variables.AddRange(source.Variables);
300      target.SubScopes.Clear();
301      target.SubScopes.AddRange(source.SubScopes);
302      // TODO: validate if parent scopes match - otherwise source is invalid
303    }
304
305    /// <summary>
306    /// This method blocks until all jobs are finished
307    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
308    /// </summary>
309    /// <param name="tasks"></param>
310    private void ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] tasks, IScope parentScopeClone, CancellationToken cancellationToken) {
311      log.LogMessage(string.Format("Executing {0} operations on the hive.", tasks.Length));
312      IScope[] scopes = new Scope[tasks.Length];
313      object locker = new object();
314      var hiveExperiment = refreshableJob.Job;
315
316      try {
317        // create upload-tasks
318        for (int i = 0; i < tasks.Length; i++) {
319          var engineHiveTask = new EngineHiveTask(tasks[i], parentScopeClone);
320          engineHiveTask.Task.Priority = this.Priority;
321          if (ParentTaskId != Guid.Empty) {
322            engineHiveTask.Task.ParentTaskId = ParentTaskId;
323          }
324          refreshableJob.HiveTasks.Add(engineHiveTask);
325
326          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
327          IRandom random = FindRandomParameter(tasks[i].InitialOperation as IExecutionContext);
328          if (random != null)
329            random.Reset(random.Next());
330        }
331        HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);
332
333        while (refreshableJob.Job.Id == Guid.Empty) {
334          Thread.Sleep(500);
335        }
336        this.lastJobGuid = refreshableJob.Job.Id;
337        refreshableJob.Progress.Finished += new EventHandler(Progress_Finished);
338        handle.WaitOne();
339        refreshableJob.Progress.Finished -= new EventHandler(Progress_Finished);
340      }
341      catch (OperationCanceledException e) {
342        throw e;
343      }
344      catch (Exception e) {
345        log.LogException(e);
346        throw e;
347      }
348    }
349
350    void Progress_Finished(object sender, EventArgs e) {
351      handle.Set();
352    }
353
354    private EngineTask[] RetrieveResultsFromHive(Guid jobGuid) {
355      Job job = HiveServiceLocator.Instance.CallHiveService<Job>(s => s.GetJob(jobGuid));
356      var allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks(job.Id));
357      var totalJobCount = allTasks.Count();
358
359      TaskDownloader downloader = new TaskDownloader(allTasks.Select(x => x.Id));
360      downloader.StartAsync();
361      while (!downloader.IsFinished) {
362        Thread.Sleep(500);
363        if (downloader.IsFaulted) {
364          throw downloader.Exception;
365        }
366      }
367      List<HiveTask> allHiveTasks = downloader.Results.Values.ToList();
368      allHiveTasks.ForEach(x => this.ExecutionTimeOnHive += x.Task.ExecutionTime);
369
370      var failedJobs = allHiveTasks.Where(x => x.Task.State != TaskState.Finished);
371      if (failedJobs.Count() > 0) {
372        throw new HiveEngineException("Task (" + failedJobs.First().Task.Id + ") failed: " + failedJobs.First().Task.StateLog.Last().Exception);
373      }
374
375      List<EngineTask> engineTasks = new List<EngineTask>();
376
377      foreach (var hTask in allHiveTasks) {
378        EngineTask ehTask = (EngineTask)hTask.ItemTask;
379        engineTasks.Add(ehTask);
380      }
381
382      jobs.Clear();
383      DeleteHiveExperiment(jobGuid);
384      return engineTasks.ToArray();
385    }
386
387    private RefreshableJob CreateJob() {
388      lock (locker) {
389        var hiveExperiment = new Job();
390        hiveExperiment.Name = "HiveEngine Run " + nrOfSentRuns++;
391        hiveExperiment.DateCreated = DateTime.Now;
392        hiveExperiment.ResourceNames = this.ResourceNames;
393        hiveExperiment.IsPrivileged = this.IsPrivileged;
394        var refreshableHiveExperiment = new RefreshableJob(hiveExperiment);
395        //refreshableHiveExperiment.RefreshAutomatically = false;
396        refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
397        jobs.Add(refreshableHiveExperiment);
398        return refreshableHiveExperiment;
399      }
400    }
401
402    private void DeleteHiveExperiment(Guid jobId) {
403      HiveClient.TryAndRepeat(() => {
404        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
405      }, 5, string.Format("Could not delete jobs"));
406    }
407
408    private List<Guid> GetResourceIds() {
409      return HiveServiceLocator.Instance.CallHiveService(service => {
410        var resourceNames = ResourceNames.Split(';');
411        var resourceIds = new List<Guid>();
412        foreach (var resourceName in resourceNames) {
413          Guid resourceId = service.GetResourceId(resourceName);
414          if (resourceId == Guid.Empty) {
415            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
416          }
417          resourceIds.Add(resourceId);
418        }
419        return resourceIds;
420      });
421    }
422  }
423}
Note: See TracBrowser for help on using the repository browser.