Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 6723

Last change on this file since 6723 was 6723, checked in by ascheibe, 13 years ago

#1233 Review comments: renamed HiveExperiment to Job

File size: 13.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
31
32namespace HeuristicLab.HiveEngine {
33  /// <summary>
34  /// Represents an engine that executes operations which can be executed in parallel on the hive
35  /// </summary>
36  [StorableClass]
37  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
38  public class HiveEngine : Engine {
39    private static object locker = new object();
40    private CancellationToken cancellationToken;
41    private bool firstRun = true;
42
43    [Storable]
44    private IOperator currentOperator;
45
46    [Storable]
47    public string ResourceNames { get; set; }
48
49    [Storable]
50    private int priority;
51    public int Priority {
52      get { return priority; }
53      set { priority = value; }
54    }
55
56    [Storable]
57    private TimeSpan executionTimeOnHive;
58    public TimeSpan ExecutionTimeOnHive {
59      get { return executionTimeOnHive; }
60      set {
61        if (value != executionTimeOnHive) {
62          executionTimeOnHive = value;
63          OnExecutionTimeOnHiveChanged();
64        }
65      }
66    }
67
68    [Storable]
69    private bool isPrivileged;
70    public bool IsPrivileged {
71      get { return isPrivileged; }
72      set { isPrivileged = value; }
73    }
74
75    // Job can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
76    private ItemCollection<RefreshableHiveExperiment> hiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
77    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
78      get { return hiveExperiments; }
79      set { hiveExperiments = value; }
80    }
81
82    private List<Plugin> onlinePlugins;
83    public List<Plugin> OnlinePlugins {
84      get { return onlinePlugins; }
85      set { onlinePlugins = value; }
86    }
87
88    private List<Plugin> alreadyUploadedPlugins;
89    public List<Plugin> AlreadyUploadedPlugins {
90      get { return alreadyUploadedPlugins; }
91      set { alreadyUploadedPlugins = value; }
92    }
93
94    public bool IsAllowedPrivileged { get; set; }
95
96    #region constructors and cloning
97    public HiveEngine() {
98      this.ResourceNames = "HEAL";
99      this.Priority = 0;
100      this.log = new ThreadSafeLog(this.log);
101      this.IsAllowedPrivileged = ServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
102    }
103
104    [StorableConstructor]
105    protected HiveEngine(bool deserializing) : base(deserializing) { }
106    protected HiveEngine(HiveEngine original, Cloner cloner)
107      : base(original, cloner) {
108      this.ResourceNames = original.ResourceNames;
109      this.currentOperator = cloner.Clone(original.currentOperator);
110      this.priority = original.priority;
111      this.executionTimeOnHive = original.executionTimeOnHive;
112      this.IsPrivileged = original.IsPrivileged;
113      // do not clone hiveExperiments - otherwise they would be sent with every job
114    }
115    public override IDeepCloneable Clone(Cloner cloner) {
116      return new HiveEngine(this, cloner);
117    }
118    #endregion
119
120    #region Events
121    protected override void OnPrepared() {
122      base.OnPrepared();
123      this.ExecutionTimeOnHive = TimeSpan.Zero;
124    }
125
126    public event EventHandler ExecutionTimeOnHiveChanged;
127    protected virtual void OnExecutionTimeOnHiveChanged() {
128      var handler = ExecutionTimeOnHiveChanged;
129      if (handler != null) handler(this, EventArgs.Empty);
130    }
131    #endregion
132
133    protected override void Run(CancellationToken cancellationToken) {
134      this.cancellationToken = cancellationToken;
135      Run(ExecutionStack);
136    }
137
138    private void Run(object state) {
139      Stack<IOperation> executionStack = (Stack<IOperation>)state;
140      IOperation next;
141      OperationCollection coll;
142      IAtomicOperation operation;
143
144      if (firstRun) {
145        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
146        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
147        this.AlreadyUploadedPlugins = new List<Plugin>();
148        firstRun = false;
149      }
150
151      while (executionStack.Count > 0) {
152        cancellationToken.ThrowIfCancellationRequested();
153
154        next = executionStack.Pop();
155        if (next is OperationCollection) {
156          coll = (OperationCollection)next;
157
158          if (coll.Parallel) {
159            try {
160              // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
161              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
162              parentScopeClone.SubScopes.Clear();
163              parentScopeClone.ClearParentScopes();
164
165              EngineJob[] jobs = new EngineJob[coll.Count];
166              for (int i = 0; i < coll.Count; i++) {
167                jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
168              }
169
170              var experiment = CreateHiveExperiment();
171              IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken);
172
173              for (int i = 0; i < coll.Count; i++) {
174                if (coll[i] is IAtomicOperation) {
175                  ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
176                } else if (coll[i] is OperationCollection) {
177                  // todo ??
178                }
179              }
180            }
181            catch {
182              executionStack.Push(coll); throw;
183            }
184          } else {
185            for (int i = coll.Count - 1; i >= 0; i--)
186              if (coll[i] != null) executionStack.Push(coll[i]);
187          }
188        } else if (next is IAtomicOperation) {
189          operation = (IAtomicOperation)next;
190          try {
191            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
192          }
193          catch (Exception ex) {
194            executionStack.Push(operation);
195            if (ex is OperationCanceledException) throw ex;
196            else throw new OperatorExecutionException(operation.Operator, ex);
197          }
198          if (next != null) executionStack.Push(next);
199
200          if (operation.Operator.Breakpoint) {
201            log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
202            Pause();
203          }
204        }
205      }
206    }
207
208    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
209      e.SetObserved(); // avoid crash of process
210    }
211
212    private IRandom FindRandomParameter(IExecutionContext ec) {
213      try {
214        if (ec == null)
215          return null;
216
217        foreach (var p in ec.Parameters) {
218          if (p.Name == "Random" && p is IValueParameter)
219            return ((IValueParameter)p).Value as IRandom;
220        }
221        return FindRandomParameter(ec.Parent);
222      }
223      catch { return null; }
224    }
225
226    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
227      ExchangeScope(source.Scope, target.Scope);
228    }
229
230    private static void ExchangeScope(IScope source, IScope target) {
231      target.Variables.Clear();
232      target.Variables.AddRange(source.Variables);
233      target.SubScopes.Clear();
234      target.SubScopes.AddRange(source.SubScopes);
235      // TODO: validate if parent scopes match - otherwise source is invalid
236    }
237
238    /// <summary>
239    /// This method blocks until all jobs are finished
240    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
241    /// </summary>
242    /// <param name="jobs"></param>
243    private IScope[] ExecuteOnHive(RefreshableHiveExperiment refreshableHiveExperiment, EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
244      log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
245      IScope[] scopes = new Scope[jobs.Length];
246      object locker = new object();
247      var hiveExperiment = refreshableHiveExperiment.HiveExperiment;
248
249      try {
250        // create upload-tasks
251        for (int i = 0; i < jobs.Length; i++) {
252          var engineHiveJob = new EngineHiveJob(jobs[i], parentScopeClone);
253          engineHiveJob.Job.Priority = this.Priority;
254          refreshableHiveExperiment.HiveJobs.Add(engineHiveJob);
255
256          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
257          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
258          if (random != null)
259            random.Reset(random.Next());
260        }
261        HiveClient.StartExperiment((e) => { log.LogException(e); }, refreshableHiveExperiment, cancellationToken);
262
263        // do polling until experiment is finished and all jobs are downloaded
264        while (!refreshableHiveExperiment.AllJobsFinished()) {
265          Thread.Sleep(2000);
266          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.ExecutionTime.TotalMilliseconds));
267          cancellationToken.ThrowIfCancellationRequested();
268        }
269        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.ExecutionTime));
270
271        var failedJobs = refreshableHiveExperiment.HiveJobs.Where(x => x.Job.State == TaskState.Failed);
272        if (failedJobs.Count() > 0) {
273          throw new HiveEngineException("Task failed: " + failedJobs.First().Job.StateLog.Last().Exception);
274        }
275
276        // get scopes
277        int j = 0;
278        foreach (var hiveJob in refreshableHiveExperiment.HiveJobs) {
279          var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
280          scopes[j++] = scope;
281        }
282        return scopes;
283      }
284      catch (OperationCanceledException e) {
285        throw e;
286      }
287      catch (Exception e) {
288        log.LogException(e);
289        throw e;
290      }
291      finally {
292        DisposeHiveExperiment(refreshableHiveExperiment);
293      }
294    }
295
296    private RefreshableHiveExperiment CreateHiveExperiment() {
297      lock (locker) {
298        var hiveExperiment = new Job();
299        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
300        hiveExperiment.DateCreated = DateTime.Now;
301        hiveExperiment.ResourceNames = this.ResourceNames;
302        hiveExperiment.IsPrivileged = this.IsPrivileged;
303        var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
304        refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
305        hiveExperiments.Add(refreshableHiveExperiment);
306        return refreshableHiveExperiment;
307      }
308    }
309
310    private void DisposeHiveExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
311      refreshableHiveExperiment.RefreshAutomatically = false;
312      DeleteHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id);
313      ClearData(refreshableHiveExperiment);
314    }
315
316    private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) {
317      var jobs = refreshableHiveExperiment.GetAllHiveJobs();
318      foreach (var job in jobs) {
319        job.ClearData();
320      }
321    }
322
323    private void DeleteHiveExperiment(Guid hiveExperimentId) {
324      HiveClient.TryAndRepeat(() => {
325        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
326      }, 5, string.Format("Could not delete jobs"));
327    }
328
329    private List<Guid> GetResourceIds() {
330      return ServiceLocator.Instance.CallHiveService(service => {
331        var resourceNames = ResourceNames.Split(';');
332        var resourceIds = new List<Guid>();
333        foreach (var resourceName in resourceNames) {
334          Guid resourceId = service.GetResourceId(resourceName);
335          if (resourceId == Guid.Empty) {
336            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
337          }
338          resourceIds.Add(resourceId);
339        }
340        return resourceIds;
341      });
342    }
343  }
344}
Note: See TracBrowser for help on using the repository browser.