Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.MetaOptimization/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 10573

Last change on this file since 10573 was 7137, checked in by ascheibe, 13 years ago

#1215 renamed ServiceLocator to HiveServiceLocator

File size: 13.1 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
31
namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive
  /// </summary>
  /// <remarks>
  /// Operations are taken from the execution stack one at a time. Atomic operations are executed locally.
  /// An <c>OperationCollection</c> whose <c>Parallel</c> flag is set is turned into one <c>EngineTask</c> per
  /// child operation, submitted as a single hive job, and the resulting scopes are copied back into the
  /// scopes of the local operations.
  /// </remarks>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
  public class HiveEngine : Engine {
    // Interval between two checks whether all tasks of a hive job have finished.
    private const int PollingIntervalMilliseconds = 2000;

    // Guards job creation and the one-time registration of the unobserved-task-exception handler.
    private static readonly object locker = new object();
    private static bool unobservedTaskExceptionHandlerRegistered;

    private CancellationToken cancellationToken;
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;

    /// <summary>
    /// Names of the hive resources the created jobs are assigned to. Several names are separated by ';'.
    /// </summary>
    [Storable]
    public string ResourceNames { get; set; }

    [Storable]
    private int priority;
    /// <summary>
    /// Priority that is assigned to every task uploaded by this engine.
    /// </summary>
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    /// <summary>
    /// Summed execution time of the tasks executed on the hive.
    /// Raises <see cref="ExecutionTimeOnHiveChanged"/> when the value actually changes.
    /// </summary>
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    /// <summary>
    /// Value that is copied to the <c>IsPrivileged</c> flag of every job created by this engine.
    /// </summary>
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // Task can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
    private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
    /// <summary>
    /// All jobs created by this engine instance. Not persisted and not cloned.
    /// </summary>
    public ItemCollection<RefreshableJob> Jobs {
      get { return jobs; }
      set { jobs = value; }
    }

    private List<Plugin> onlinePlugins;
    /// <summary>
    /// Plugins with a non-null hash as reported by the hive service; fetched on the first run.
    /// </summary>
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    /// <summary>
    /// List that is reset to an empty list on the first run of the engine.
    /// </summary>
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    /// <summary>
    /// Result of the hive service's <c>IsAllowedPrivileged</c> call, queried once in the default constructor.
    /// </summary>
    public bool IsAllowedPrivileged { get; set; }

    #region constructors and cloning
    /// <summary>
    /// Creates an engine with resource name "HEAL", priority 0 and a thread-safe log.
    /// Note that this constructor calls the hive service to determine <see cref="IsAllowedPrivileged"/>.
    /// </summary>
    public HiveEngine() {
      this.ResourceNames = "HEAL";
      this.Priority = 0;
      this.log = new ThreadSafeLog();
      this.IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.ResourceNames;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // do not clone jobs - otherwise they would be sent with every task
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    /// <summary>
    /// Raised whenever <see cref="ExecutionTimeOnHive"/> is set to a different value.
    /// </summary>
    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      // copy to a local so that a concurrent unsubscribe cannot null the delegate between check and call
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    /// <summary>
    /// Remembers the cancellation token and processes the engine's execution stack.
    /// </summary>
    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    /// <summary>
    /// Processes the given execution stack until it is empty or cancellation is requested.
    /// </summary>
    /// <param name="state">The execution stack; must be a <c>Stack&lt;IOperation&gt;</c>.</param>
    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;

      if (firstRun) {
        EnsureUnobservedTaskExceptionHandler();
        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        IOperation next = executionStack.Pop();

        OperationCollection coll = next as OperationCollection;
        if (coll != null) {
          if (coll.Parallel) {
            ExecuteParallel(coll, executionStack);
          } else {
            // push in reverse order so that the first operation is executed first
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
          continue;
        }

        IAtomicOperation operation = next as IAtomicOperation;
        if (operation != null) ExecuteAtomic(operation, executionStack);
      }
    }

    /// <summary>
    /// Executes all operations of a parallel collection on the hive and copies the resulting scopes back.
    /// If anything fails the collection is pushed back onto the stack before the exception is rethrown.
    /// </summary>
    private void ExecuteParallel(OperationCollection coll, Stack<IOperation> executionStack) {
      // an empty collection has nothing to distribute (and coll.First() below would throw)
      if (coll.Count == 0) return;

      try {
        // clone the parent scope here and reuse it for each operation. otherwise for each task the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
        IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
        parentScopeClone.SubScopes.Clear();
        parentScopeClone.ClearParentScopes();

        EngineTask[] tasks = new EngineTask[coll.Count];
        for (int i = 0; i < coll.Count; i++) {
          tasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
        }

        RefreshableJob refreshableJob = CreateJob();
        IScope[] scopes = ExecuteOnHive(refreshableJob, tasks, parentScopeClone, cancellationToken);

        for (int i = 0; i < coll.Count; i++) {
          IAtomicOperation atomic = coll[i] as IAtomicOperation;
          if (atomic != null) {
            ExchangeScope(scopes[i], atomic.Scope);
          }
          // TODO: results of nested OperationCollections are currently not re-integrated
        }
      }
      catch {
        executionStack.Push(coll); throw;
      }
    }

    /// <summary>
    /// Executes a single atomic operation locally and pushes its successor (if any) onto the stack.
    /// On failure the operation is pushed back so that it is executed again when the engine is resumed.
    /// </summary>
    private void ExecuteAtomic(IAtomicOperation operation, Stack<IOperation> executionStack) {
      IOperation next;
      try {
        next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
      }
      catch (OperationCanceledException) {
        executionStack.Push(operation);
        throw; // rethrow without resetting the stack trace
      }
      catch (Exception ex) {
        executionStack.Push(operation);
        throw new OperatorExecutionException(operation.Operator, ex);
      }
      if (next != null) executionStack.Push(next);

      if (operation.Operator.Breakpoint) {
        log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
        Pause();
      }
    }

    /// <summary>
    /// Registers the unobserved-task-exception handler exactly once per process.
    /// The handler is static so that the static event does not keep engine instances alive.
    /// </summary>
    private static void EnsureUnobservedTaskExceptionHandler() {
      lock (locker) {
        if (unobservedTaskExceptionHandlerRegistered) return;
        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        unobservedTaskExceptionHandlerRegistered = true;
      }
    }

    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    /// <summary>
    /// Searches the given execution context and then its parents for a value parameter named "Random".
    /// </summary>
    /// <returns>
    /// The value of the first matching parameter as <c>IRandom</c>, or null if there is none,
    /// the value is not an <c>IRandom</c>, or the lookup throws.
    /// </returns>
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        for (IExecutionContext context = ec; context != null; context = context.Parent) {
          foreach (var p in context.Parameters) {
            if (p.Name == "Random" && p is IValueParameter)
              return ((IValueParameter)p).Value as IRandom;
          }
        }
        return null;
      }
      catch { return null; } // best effort only: a failed lookup must not abort the upload
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    /// <summary>
    /// Replaces the variables and sub-scopes of <paramref name="target"/> with those of <paramref name="source"/>.
    /// </summary>
    private static void ExchangeScope(IScope source, IScope target) {
      // clearing the target would also wipe the source if both are the same object
      if (ReferenceEquals(source, target)) return;

      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    /// <summary>
    /// Uploads the given tasks as one hive job and blocks until all of them are finished.
    /// The job is always deleted on the hive and its local data cleared when this method returns or throws.
    /// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation
    /// </summary>
    /// <param name="refreshableJob">The job the tasks are added to.</param>
    /// <param name="jobs">The engine tasks to execute; one scope is returned per task.</param>
    /// <param name="parentScopeClone">Cleaned clone of the parent scope that is handed to every task.</param>
    /// <param name="cancellationToken">Token that aborts the waiting.</param>
    /// <returns>The scopes of the initial operations of the job's hive tasks, in the order of the job's task list.</returns>
    /// <exception cref="HiveEngineException">Thrown if at least one task ended in state <c>Failed</c>.</exception>
    /// <exception cref="OperationCanceledException">Thrown if cancellation was requested while waiting.</exception>
    private IScope[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      // IScope[] (not Scope[]): a covariant Scope[] would reject any other IScope implementation at runtime
      IScope[] scopes = new IScope[jobs.Length];

      try {
        // create upload-tasks
        for (int i = 0; i < jobs.Length; i++) {
          var engineHiveJob = new EngineHiveTask(jobs[i], parentScopeClone);
          engineHiveJob.Task.Priority = this.Priority;
          refreshableJob.HiveTasks.Add(engineHiveJob);

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }
        HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);

        // do polling until experiment is finished and all jobs are downloaded
        while (!refreshableJob.AllJobsFinished()) {
          // waits for the polling interval but wakes up immediately when cancellation is requested
          cancellationToken.WaitHandle.WaitOne(PollingIntervalMilliseconds);
          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(jobs.Sum(x => x.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }
        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableJob.ToString(), refreshableJob.ExecutionTime));

        var failedTask = refreshableJob.HiveTasks.FirstOrDefault(x => x.Task.State == TaskState.Failed);
        if (failedTask != null) {
          throw new HiveEngineException("Task failed: " + failedTask.Task.StateLog.Last().Exception);
        }

        // get scopes
        int j = 0;
        foreach (var hiveJob in refreshableJob.HiveTasks) {
          scopes[j++] = ((IAtomicOperation)((EngineTask)hiveJob.ItemTask).InitialOperation).Scope;
        }
        return scopes;
      }
      catch (OperationCanceledException) {
        // cancellation is not an error and therefore not logged
        throw;
      }
      catch (Exception e) {
        log.LogException(e);
        throw; // keep the original stack trace
      }
      finally {
        DisposeJob(refreshableJob);
      }
    }

    /// <summary>
    /// Creates a new job configured with this engine's resource names and privileged flag
    /// and adds it to <see cref="Jobs"/>.
    /// </summary>
    private RefreshableJob CreateJob() {
      lock (locker) {
        var hiveExperiment = new Job();
        hiveExperiment.Name = "HiveEngine Run " + jobs.Count;
        hiveExperiment.DateCreated = DateTime.Now;
        hiveExperiment.ResourceNames = this.ResourceNames;
        hiveExperiment.IsPrivileged = this.IsPrivileged;
        var refreshableHiveExperiment = new RefreshableJob(hiveExperiment);
        refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
        jobs.Add(refreshableHiveExperiment);
        return refreshableHiveExperiment;
      }
    }

    /// <summary>
    /// Stops automatic refreshing, deletes the job on the hive and clears the data of its tasks.
    /// </summary>
    private void DisposeJob(RefreshableJob refreshableJob) {
      refreshableJob.RefreshAutomatically = false;
      DeleteHiveExperiment(refreshableJob.Job.Id);
      ClearData(refreshableJob);
    }

    private void ClearData(RefreshableJob refreshableJob) {
      foreach (var hiveTask in refreshableJob.GetAllHiveTasks()) {
        hiveTask.ClearData();
      }
    }

    private void DeleteHiveExperiment(Guid jobId) {
      HiveClient.TryAndRepeat(() => {
        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
      }, 5, "Could not delete jobs");
    }

    /// <summary>
    /// Resolves the names in <see cref="ResourceNames"/> to resource ids using the hive service.
    /// Names are trimmed and empty entries are ignored.
    /// </summary>
    /// <exception cref="ResourceNotFoundException">Thrown if the service returns an empty id for a name.</exception>
    private List<Guid> GetResourceIds() {
      string[] resourceNames = (ResourceNames ?? string.Empty)
        .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
        .Select(x => x.Trim())
        .Where(x => x.Length > 0)
        .ToArray();

      return HiveServiceLocator.Instance.CallHiveService(service => {
        var resourceIds = new List<Guid>(resourceNames.Length);
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.