Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.MetaOptimization/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 15065

Last change on this file since 15065 was 14913, checked in by jkarder, 8 years ago

#2784: fixed hive engine

File size: 12.9 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
31
namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive.
  /// Operation collections flagged as parallel are wrapped into engine tasks and submitted as one hive job;
  /// all other operations are executed locally on the calling thread.
  /// </summary>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
  public class HiveEngine : Engine {
    // Guards job creation/registration in CreateJob. Static, i.e. shared by all engine instances.
    private static readonly object locker = new object();
    // Token of the current run; stored so that the private Run overload can poll it.
    private CancellationToken cancellationToken;
    // True until Run has performed its one-time setup (exception handler registration, plugin lookup).
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;

    /// <summary>
    /// Names of the hive resources to use. GetResourceIds splits this value on ';'.
    /// </summary>
    [Storable]
    public string ResourceNames { get; set; }

    [Storable]
    private int priority;
    /// <summary>
    /// Priority assigned to every hive task created by this engine.
    /// </summary>
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    /// <summary>
    /// Summed execution time of the tasks of the job that is currently polled.
    /// Raises <see cref="ExecutionTimeOnHiveChanged"/> only when the value actually changes.
    /// </summary>
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // Task can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
    private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
    /// <summary>
    /// All jobs created by this engine instance so far (not persisted, not cloned).
    /// </summary>
    public ItemCollection<RefreshableJob> Jobs {
      get { return jobs; }
      set { jobs = value; }
    }

    private List<Plugin> onlinePlugins;
    /// <summary>
    /// Plugins reported by the hive service that have a hash; filled on the first run.
    /// </summary>
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    /// <summary>
    /// Reset to an empty list on the first run; this class does not add to it itself.
    /// </summary>
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    /// <summary>
    /// Creates an engine targeting the "HEAL" resource with priority 0 and a thread-safe log.
    /// </summary>
    public HiveEngine() {
      this.ResourceNames = "HEAL";
      this.Priority = 0;
      this.log = new ThreadSafeLog();
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }

    /// <summary>
    /// Cloning constructor. Copies the configuration and the current operator, but deliberately not the jobs.
    /// </summary>
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.ResourceNames;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // do not clone jobs - otherwise they would be sent with every task
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    /// <summary>
    /// Resets the accumulated hive execution time whenever the engine is prepared.
    /// </summary>
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      // copy to a local so that a concurrent unsubscribe cannot null the delegate between check and call
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    /// <summary>
    /// Entry point called by the base engine: remembers the token and processes the execution stack.
    /// </summary>
    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    /// <summary>
    /// Processes the execution stack until it is empty or the run is cancelled.
    /// Non-parallel collections are unfolded onto the stack, parallel collections are executed on the hive,
    /// atomic operations are executed locally. An operation that fails is pushed back onto the stack
    /// before the exception leaves this method.
    /// </summary>
    /// <param name="state">The execution stack; must be a <c>Stack&lt;IOperation&gt;</c>.</param>
    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;

      if (firstRun) {
        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        IOperation next = executionStack.Pop();
        OperationCollection coll = next as OperationCollection;
        IAtomicOperation operation = next as IAtomicOperation;

        if (coll != null) {
          if (!coll.Parallel) {
            // push in reverse so that the first operation of the collection is executed first
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          } else if (coll.Count > 0) {
            // an empty parallel collection has nothing to send to the hive and is simply skipped;
            // without the Count check coll.First() below would throw and block the engine permanently
            try {
              // clone the parent scope here and reuse it for each operation. otherwise for each task the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
              // NOTE(review): assumes the first operation of a parallel collection is atomic - the cast fails otherwise
              IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
              parentScopeClone.SubScopes.Clear();
              parentScopeClone.ClearParentScopes();

              EngineTask[] tasks = new EngineTask[coll.Count];
              for (int i = 0; i < coll.Count; i++) {
                tasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
              }

              var experiment = CreateJob();
              IScope[] scopes = ExecuteOnHive(experiment, tasks, parentScopeClone, cancellationToken);

              // write the results computed on the hive back into the local scope tree
              for (int i = 0; i < coll.Count; i++) {
                if (coll[i] is IAtomicOperation) {
                  ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
                } else if (coll[i] is OperationCollection) {
                  // todo ??
                }
              }
            }
            catch {
              executionStack.Push(coll); throw;
            }
          }
        } else if (operation != null) {
          try {
            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
          }
          catch (OperationCanceledException) {
            executionStack.Push(operation);
            throw; // rethrow without resetting the stack trace
          }
          catch (Exception ex) {
            executionStack.Push(operation);
            throw new OperatorExecutionException(operation.Operator, ex);
          }
          if (next != null) executionStack.Push(next);

          if (operation.Operator.Breakpoint) {
            log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
            Pause();
          }
        }
      }
    }

    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    /// <summary>
    /// Walks up the execution context chain and returns the value of the first value parameter
    /// named "Random" as <see cref="IRandom"/>.
    /// </summary>
    /// <returns>
    /// The random number generator, or null if no such parameter exists, if its value is not an
    /// <see cref="IRandom"/>, or if any exception occurs during the lookup (best effort).
    /// </returns>
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        for (IExecutionContext current = ec; current != null; current = current.Parent) {
          foreach (var p in current.Parameters) {
            if (p.Name == "Random" && p is IValueParameter)
              return ((IValueParameter)p).Value as IRandom;
          }
        }
        return null;
      }
      catch { return null; }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    /// <summary>
    /// Replaces the variables and sub-scopes of <paramref name="target"/> with those of <paramref name="source"/>.
    /// </summary>
    private static void ExchangeScope(IScope source, IScope target) {
      // clearing the target first would wipe the data if both references denote the same scope
      if (ReferenceEquals(source, target)) return;

      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    /// <summary>
    /// Uploads the given tasks as one job, then blocks (polling) until all of them are finished.
    /// The job is disposed afterwards in every case.
    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    /// </summary>
    /// <param name="refreshableJob">The job the tasks are added to.</param>
    /// <param name="jobs">The engine tasks to execute.</param>
    /// <param name="parentScopeClone">Cleaned clone of the parent scope that is shared by all tasks.</param>
    /// <param name="cancellationToken">Checked after every polling interval.</param>
    /// <returns>The scope of each hive task's initial operation, in the enumeration order of the job's hive tasks.</returns>
    /// <exception cref="HiveEngineException">Thrown if at least one task ended in state Failed.</exception>
    private IScope[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      const int pollingIntervalMilliseconds = 2000;

      log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      // allocate as IScope[] - a covariant Scope[] would reject every other IScope implementation at runtime
      IScope[] scopes = new IScope[jobs.Length];

      try {
        // create upload-tasks
        for (int i = 0; i < jobs.Length; i++) {
          var engineHiveJob = new EngineHiveTask(jobs[i], parentScopeClone);
          engineHiveJob.Task.Priority = this.Priority;
          refreshableJob.HiveTasks.Add(engineHiveJob);

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }
        HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);

        // do polling until experiment is finished and all jobs are downloaded
        while (!refreshableJob.IsFinished()) {
          Thread.Sleep(pollingIntervalMilliseconds);
          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(jobs.Sum(x => x.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }
        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableJob.ToString(), refreshableJob.ExecutionTime));

        // a single pass is enough: only the first failed task is reported
        var failedJob = refreshableJob.HiveTasks.FirstOrDefault(x => x.Task.State == TaskState.Failed);
        if (failedJob != null) {
          throw new HiveEngineException("Task failed: " + failedJob.Task.StateLog.Last().Exception);
        }

        // get scopes
        int j = 0;
        foreach (var hiveJob in refreshableJob.HiveTasks) {
          var scope = ((IAtomicOperation)((EngineTask)hiveJob.ItemTask).InitialOperation).Scope;
          scopes[j++] = scope;
        }
        return scopes;
      }
      catch (OperationCanceledException) {
        throw; // cancellation is not an error and is therefore not logged
      }
      catch (Exception e) {
        log.LogException(e);
        throw; // keep the original stack trace
      }
      finally {
        DisposeJob(refreshableJob);
      }
    }

    /// <summary>
    /// Creates a new job named after the number of jobs created so far and registers it in <see cref="Jobs"/>.
    /// </summary>
    private RefreshableJob CreateJob() {
      lock (locker) {
        var hiveExperiment = new Job();
        hiveExperiment.Name = "HiveEngine Run " + jobs.Count;
        hiveExperiment.DateCreated = DateTime.Now;
        hiveExperiment.ResourceNames = this.ResourceNames;
        var refreshableHiveExperiment = new RefreshableJob(hiveExperiment);
        refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
        jobs.Add(refreshableHiveExperiment);
        return refreshableHiveExperiment;
      }
    }

    /// <summary>
    /// Stops automatic refreshing, deletes the job on the server and releases the locally held task data.
    /// </summary>
    private void DisposeJob(RefreshableJob refreshableJob) {
      refreshableJob.RefreshAutomatically = false;
      try {
        // presumably a job that was never uploaded still has an empty id - nothing to delete in that case
        if (refreshableJob.Job.Id != Guid.Empty)
          DeleteHiveExperiment(refreshableJob.Job.Id);
      }
      finally {
        // release the local data even if the server-side deletion failed
        ClearData(refreshableJob);
      }
    }

    private void ClearData(RefreshableJob refreshableJob) {
      foreach (var hiveTask in refreshableJob.GetAllHiveTasks()) {
        hiveTask.ClearData();
      }
    }

    /// <summary>
    /// Asks the hive service to delete the job with the given id; passes 5 as the repetition count to TryAndRepeat.
    /// </summary>
    private void DeleteHiveExperiment(Guid jobId) {
      HiveClient.TryAndRepeat(() => {
        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
      }, 5, "Could not delete jobs");
    }

    /// <summary>
    /// Resolves every ';'-separated entry of <see cref="ResourceNames"/> to its resource id.
    /// </summary>
    /// <exception cref="ResourceNotFoundException">Thrown if the service returns an empty id for a name.</exception>
    private List<Guid> GetResourceIds() {
      return HiveServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceNames.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.