source: addons/HeuristicLab.MetaOptimization/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 16173

Last change on this file since 16173 was 16173, checked in by jkarder, 3 years ago

#2839:

  • fixed compilation errors in HiveEngine
  • minor changes
File size: 12.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
31
namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive
  /// </summary>
  /// <remarks>
  /// Operations are processed from the execution stack locally. Every <see cref="OperationCollection"/>
  /// whose <c>Parallel</c> flag is set is submitted to the hive as one job (one task per child operation,
  /// each run by a sequential engine); the resulting scopes are then copied back into the local scope tree.
  /// </remarks>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
  public class HiveEngine : Engine {
    // Guards job creation in CreateJob. Static, i.e. shared by all engine instances.
    private static readonly object locker = new object();
    // Token of the current run (set in Run(CancellationToken)).
    private CancellationToken cancellationToken;
    // True until the one-time initialization in Run(object) has happened. Not storable, so every
    // newly constructed, cloned or deserialized instance starts with true.
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;

    /// <summary>Project the hive jobs are created in. Must not be <see cref="Guid.Empty"/> when running.</summary>
    [Storable]
    public Guid ProjectId { get; set; }

    /// <summary>Resources the hive jobs are assigned to. Must contain at least one id when running.</summary>
    [Storable]
    public IEnumerable<Guid> ResourceIds { get; set; }

    [Storable]
    private int priority;
    /// <summary>Priority assigned to every hive task created by this engine.</summary>
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    /// <summary>
    /// Summed execution time of the engine tasks of the job that is currently polled.
    /// Raises <see cref="ExecutionTimeOnHiveChanged"/> when the value changes; reset to zero in <see cref="OnPrepared"/>.
    /// </summary>
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // All jobs created by CreateJob. Tasks can't be storable, so RefreshableJob can't be stored either.
    // Previous runs are only informative, so this does not matter (only the execution time on hive will be
    // wrong because of that -> TODO).
    private ItemCollection<RefreshableJob> jobs = new ItemCollection<RefreshableJob>();
    public ItemCollection<RefreshableJob> Jobs {
      get { return jobs; }
      set { jobs = value; }
    }

    // Plugins known to the hive server (only those with a hash); fetched once on the first run.
    private List<Plugin> onlinePlugins;
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    // Initialized empty on the first run; NOTE(review): presumably filled during plugin upload elsewhere — confirm.
    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    public HiveEngine() {
      this.ProjectId = Guid.Empty;
      this.ResourceIds = new List<Guid>();
      this.Priority = 0;
      this.log = new ThreadSafeLog();
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ProjectId = original.ProjectId;
      // ResourceIds has a public setter and may be null; do not let cloning fail because of that.
      this.ResourceIds = original.ResourceIds != null ? original.ResourceIds.ToList() : new List<Guid>();
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // do not clone jobs - otherwise they would be sent with every task
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    /// <summary>
    /// Processes the execution stack until it is empty. Parallel operation collections are executed
    /// on the hive, all other operations are executed locally.
    /// </summary>
    /// <param name="state">The execution stack (a <see cref="Stack{T}"/> of <see cref="IOperation"/>).</param>
    /// <exception cref="ArgumentException">No project or no resources have been specified.</exception>
    /// <exception cref="OperationCanceledException">The run was cancelled.</exception>
    /// <exception cref="OperatorExecutionException">A locally executed operator failed.</exception>
    private void Run(object state) {
      var executionStack = (Stack<IOperation>)state;

      // validate the configuration before contacting the hive service
      if (ProjectId == Guid.Empty)
        throw new ArgumentException("Cannot run HiveEngine. No project has been specified.");

      if (ResourceIds == null || !ResourceIds.Any())
        throw new ArgumentException("Cannot run HiveEngine. No resources have been specified.");

      if (firstRun) {
        // The handler is static so the static event does not keep engine instances alive;
        // removing it first guarantees at most one subscription across all engines.
        TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
        TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        IOperation next = executionStack.Pop();
        var coll = next as OperationCollection;
        if (coll != null) {
          if (coll.Parallel) {
            ExecuteParallel(executionStack, coll);
          } else {
            // push in reverse order so that the first child operation is popped (executed) first
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
        } else {
          var operation = next as IAtomicOperation;
          if (operation != null) ExecuteAtomic(executionStack, operation);
        }
      }
    }

    /// <summary>
    /// Executes all child operations of the parallel collection <paramref name="coll"/> as one hive job and
    /// copies the resulting scopes back into the scopes of the (atomic) child operations.
    /// On any failure <paramref name="coll"/> is pushed back onto the stack before the exception is rethrown.
    /// </summary>
    private void ExecuteParallel(Stack<IOperation> executionStack, OperationCollection coll) {
      // nothing to distribute; coll.First() below would throw on an empty collection
      if (coll.Count == 0) return;

      try {
        // clone the parent scope once and reuse it for each operation. Otherwise for each task the whole
        // scope tree first needs to be copied and then cleaned, which causes a lot of work for the garbage collector
        IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
        parentScopeClone.SubScopes.Clear();
        parentScopeClone.ClearParentScopes();

        var engineTasks = new EngineTask[coll.Count];
        for (int i = 0; i < coll.Count; i++) {
          engineTasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
        }

        var refreshableJob = CreateJob();
        IScope[] scopes = ExecuteOnHive(refreshableJob, engineTasks, parentScopeClone, cancellationToken);

        for (int i = 0; i < coll.Count; i++) {
          var atomicOperation = coll[i] as IAtomicOperation;
          if (atomicOperation != null) {
            ExchangeScope(scopes[i], atomicOperation.Scope);
          }
          // TODO: results of nested OperationCollections are not reintegrated yet
        }
      } catch {
        executionStack.Push(coll);
        throw;
      }
    }

    /// <summary>
    /// Executes a single operation locally, pushes its successor (if any) and pauses on breakpoints.
    /// On failure the operation is pushed back onto the stack before the exception propagates.
    /// </summary>
    private void ExecuteAtomic(Stack<IOperation> executionStack, IAtomicOperation operation) {
      IOperation next;
      try {
        next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
      } catch (OperationCanceledException) {
        executionStack.Push(operation);
        throw; // rethrow without resetting the stack trace
      } catch (Exception ex) {
        executionStack.Push(operation);
        throw new OperatorExecutionException(operation.Operator, ex);
      }
      if (next != null) executionStack.Push(next);

      if (operation.Operator.Breakpoint) {
        log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
        Pause();
      }
    }

    // Marks unobserved task exceptions as observed so they do not crash the process.
    // Static so that the subscription on the static TaskScheduler event does not root engine instances.
    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    /// <summary>
    /// Walks <paramref name="ec"/> and its parent contexts and returns the value of the first value
    /// parameter named "Random" as <see cref="IRandom"/>.
    /// </summary>
    /// <returns>
    /// The random number generator; null if <paramref name="ec"/> is null, no such parameter exists,
    /// its value is not an <see cref="IRandom"/>, or the lookup throws (best effort).
    /// </returns>
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        for (IExecutionContext context = ec; context != null; context = context.Parent) {
          foreach (var p in context.Parameters) {
            if (p.Name == "Random" && p is IValueParameter)
              return ((IValueParameter)p).Value as IRandom;
          }
        }
        return null;
      } catch {
        // best effort: the result is only used to reseed the random number generator
        return null;
      }
    }

    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    /// <summary>Replaces the variables and sub-scopes of <paramref name="target"/> with those of <paramref name="source"/>.</summary>
    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    /// <summary>
    /// Adds one hive task per engine task to <paramref name="refreshableJob"/>, starts the job and blocks until
    /// it has finished. The job is always disposed afterwards, whether it succeeded or not.
    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    /// </summary>
    /// <param name="refreshableJob">Job (created by <see cref="CreateJob"/>) the hive tasks are added to.</param>
    /// <param name="engineTasks">The engine tasks to execute; one hive task is created per element.</param>
    /// <param name="parentScopeClone">Detached clone of the common parent scope, shared by all tasks.</param>
    /// <param name="cancellationToken">Token that aborts the polling loop.</param>
    /// <returns>The scope of each finished task's initial operation, in the iteration order of <c>refreshableJob.HiveTasks</c>.</returns>
    /// <exception cref="HiveEngineException">At least one task failed.</exception>
    private IScope[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] engineTasks, IScope parentScopeClone, CancellationToken cancellationToken) {
      log.LogMessage(string.Format("Executing {0} operations on the hive.", engineTasks.Length));
      // IScope[] (not Scope[]) so storing any IScope implementation cannot fail with ArrayTypeMismatchException
      var scopes = new IScope[engineTasks.Length];

      try {
        // create upload-tasks
        foreach (var engineTask in engineTasks) {
          var engineHiveTask = new EngineHiveTask(engineTask, parentScopeClone);
          engineHiveTask.Task.Priority = this.Priority;
          refreshableJob.HiveTasks.Add(engineHiveTask);

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(engineTask.InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }

        HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);

        // do polling until the job is finished and all tasks are downloaded
        while (!refreshableJob.IsFinished()) {
          // enable automatic refreshing as soon as the job has been uploaded (i.e. got an id)
          if (!refreshableJob.RefreshAutomatically && refreshableJob.Id != Guid.Empty)
            refreshableJob.RefreshAutomatically = true;

          // wait up to 2 seconds, but wake up immediately when cancellation is requested
          cancellationToken.WaitHandle.WaitOne(2000);

          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(engineTasks.Sum(x => x.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }

        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableJob.ToString(), refreshableJob.ExecutionTime));

        var failedTask = refreshableJob.HiveTasks.FirstOrDefault(x => x.Task.State == TaskState.Failed);
        if (failedTask != null) {
          throw new HiveEngineException("Task failed: " + failedTask.Task.StateLog.Last().Exception);
        }

        // get scopes
        // NOTE(review): the caller matches scopes[i] to coll[i], so this relies on HiveTasks keeping
        // the insertion order of the tasks added above — confirm.
        int j = 0;
        foreach (var hiveTask in refreshableJob.HiveTasks) {
          scopes[j++] = ((IAtomicOperation)((EngineTask)hiveTask.ItemTask).InitialOperation).Scope;
        }

        return scopes;
      } catch (OperationCanceledException) {
        throw; // cancellation is not an error: do not log it
      } catch (Exception e) {
        log.LogException(e);
        throw; // rethrow without resetting the stack trace
      } finally {
        DisposeJob(refreshableJob);
      }
    }

    /// <summary>
    /// Creates a new, not yet uploaded job for the configured project and resources and
    /// registers it in <see cref="Jobs"/>.
    /// </summary>
    private RefreshableJob CreateJob() {
      lock (locker) {
        var hiveJob = new Job {
          Name = "HiveEngine Run " + jobs.Count,
          DateCreated = DateTime.Now,
          ProjectId = ProjectId,
          ResourceIds = ResourceIds.ToList(),
        };

        var refreshableJob = new RefreshableJob(hiveJob) {
          RefreshAutomatically = false,
          IsDownloadable = false // download happens automatically later on so disable button
        };

        jobs.Add(refreshableJob);

        return refreshableJob;
      }
    }

    /// <summary>
    /// Stops automatic refreshing, updates the server-side job state (only if the job was uploaded)
    /// and clears the data of all hive tasks of the job.
    /// </summary>
    private void DisposeJob(RefreshableJob refreshableJob) {
      refreshableJob.RefreshAutomatically = false;
      try {
        // A job that was never uploaded (id still Guid.Empty, the same check the polling loop uses)
        // has no server-side state. Updating it would fail and, since this runs in a finally block,
        // mask the original exception.
        if (refreshableJob.Job.Id != Guid.Empty)
          DeleteHiveExperiment(refreshableJob.Job.Id);
      } finally {
        // free local task data even if the server call failed
        ClearData(refreshableJob);
      }
    }

    // Clears the data of every hive task of the given job.
    private void ClearData(RefreshableJob refreshableJob) {
      var hiveTasks = refreshableJob.GetAllHiveTasks();
      foreach (var hiveTask in hiveTasks) {
        hiveTask.ClearData();
      }
    }

    // Despite its name, this does not delete the job directly: it sets the job state to StatisticsPending
    // via HiveClient.TryAndRepeat (5 passed as the retry argument).
    private void DeleteHiveExperiment(Guid jobId) {
      HiveClient.TryAndRepeat(() => {
        HiveServiceLocator.Instance.CallHiveService(s => s.UpdateJobState(jobId, JobState.StatisticsPending));
      }, 5, "Could not delete jobs");
    }
  }
}
Note: See TracBrowser for help on using the repository browser.