Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs @ 6444

Last change on this file since 6444 was 6444, checked in by cneumuel, 13 years ago

#1233

  • stability improvements for HiveExperiment and HiveEngine
  • parallelized upload of jobs
  • enabled cancellation of job upload
  • reduced the amount of double-assignment of jobs by an additional check in HeartbeatManager
  • tried to tackle the amount of deadlocks by automatically rerunning transactions
  • some fixes
File size: 13.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
31
namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive
  /// </summary>
  /// <remarks>
  /// Operation collections flagged as <c>Parallel</c> are turned into one <see cref="EngineJob"/> per
  /// contained operation, submitted as a single hive experiment and waited for (blocking). All other
  /// operations are executed locally on the calling thread.
  /// </remarks>
  [StorableClass]
  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
  public class HiveEngine : Engine {
    // Shared by all engine instances; guards hive experiment creation and the one-time handler registration.
    private static readonly object locker = new object();
    // True once the process-wide UnobservedTaskException handler has been attached.
    private static bool unobservedTaskExceptionHandlerRegistered;
    private CancellationToken cancellationToken;
    // Not storable: a cloned or deserialized engine fetches the online plugin list again on its first run.
    private bool firstRun = true;

    [Storable]
    private IOperator currentOperator;

    /// <summary>
    /// Names of the hive resources to run on. <see cref="GetResourceIds"/> splits this value at ';'.
    /// </summary>
    [Storable]
    public string ResourceNames { get; set; }

    [Storable]
    private int priority;
    /// <summary>
    /// Priority that is assigned to every job uploaded by this engine.
    /// </summary>
    public int Priority {
      get { return priority; }
      set { priority = value; }
    }

    [Storable]
    private TimeSpan executionTimeOnHive;
    /// <summary>
    /// Sum of the execution times of all hive experiments of this engine.
    /// Raises <see cref="ExecutionTimeOnHiveChanged"/> when the value actually changes.
    /// </summary>
    public TimeSpan ExecutionTimeOnHive {
      get { return executionTimeOnHive; }
      set {
        if (value != executionTimeOnHive) {
          executionTimeOnHive = value;
          OnExecutionTimeOnHiveChanged();
        }
      }
    }

    [Storable]
    private bool isPrivileged;
    /// <summary>
    /// Value that is copied to <c>IsPrivileged</c> of every hive experiment created by this engine.
    /// </summary>
    public bool IsPrivileged {
      get { return isPrivileged; }
      set { isPrivileged = value; }
    }

    // HiveExperiment can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
    private ItemCollection<RefreshableHiveExperiment> hiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
    /// <summary>
    /// All hive experiments created by this engine instance (one per parallel operation collection).
    /// </summary>
    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
      get { return hiveExperiments; }
      set { hiveExperiments = value; }
    }

    private List<Plugin> onlinePlugins;
    /// <summary>
    /// Plugins reported by the hive service that have a hash; fetched on the first run.
    /// </summary>
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    /// <summary>
    /// Initialized to an empty list on the first run; presumably filled by the upload code elsewhere.
    /// </summary>
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    #region constructors and cloning
    /// <summary>
    /// Creates an engine that targets the resource "HEAL" with priority 0 and a thread-safe log.
    /// </summary>
    public HiveEngine() {
      ResourceNames = "HEAL";
      Priority = 0;
      this.log = new ThreadSafeLog(this.log);
    }

    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }

    /// <summary>
    /// Cloning constructor; copies the settings but deliberately not the hive experiments.
    /// </summary>
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.ResourceNames;
      this.currentOperator = cloner.Clone(original.currentOperator);
      this.priority = original.priority;
      this.executionTimeOnHive = original.executionTimeOnHive;
      this.IsPrivileged = original.IsPrivileged;
      // do not clone hiveExperiments - otherwise they would be sent with every job
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    #region Events
    /// <summary>
    /// Resets <see cref="ExecutionTimeOnHive"/> whenever the engine is prepared.
    /// </summary>
    protected override void OnPrepared() {
      base.OnPrepared();
      this.ExecutionTimeOnHive = TimeSpan.Zero;
    }

    /// <summary>
    /// Raised after <see cref="ExecutionTimeOnHive"/> has been set to a different value.
    /// </summary>
    public event EventHandler ExecutionTimeOnHiveChanged;
    protected virtual void OnExecutionTimeOnHiveChanged() {
      // copy to a local so a concurrent unsubscribe cannot null the delegate between check and call
      var handler = ExecutionTimeOnHiveChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    /// <summary>
    /// Stores the token and processes the engine's execution stack until it is empty.
    /// </summary>
    /// <param name="cancellationToken">Token that is checked before every operation and while waiting for the hive.</param>
    protected override void Run(CancellationToken cancellationToken) {
      this.cancellationToken = cancellationToken;
      Run(ExecutionStack);
    }

    /// <summary>
    /// Main loop: pops operations from the stack, runs parallel collections on the hive and everything else locally.
    /// </summary>
    /// <param name="state">The execution stack; must be a <c>Stack&lt;IOperation&gt;</c>.</param>
    /// <exception cref="OperationCanceledException">Cancellation was requested; the interrupted operation is pushed back first.</exception>
    /// <exception cref="OperatorExecutionException">A locally executed operator threw; the operation is pushed back first.</exception>
    private void Run(object state) {
      Stack<IOperation> executionStack = (Stack<IOperation>)state;

      if (firstRun) {
        RegisterUnobservedTaskExceptionHandler();
        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
        this.AlreadyUploadedPlugins = new List<Plugin>();
        firstRun = false;
      }

      while (executionStack.Count > 0) {
        cancellationToken.ThrowIfCancellationRequested();

        IOperation next = executionStack.Pop();
        OperationCollection coll = next as OperationCollection;
        if (coll != null) {
          // An empty parallel collection has nothing to send to the hive; it is handled by the
          // sequential branch, which simply pushes nothing.
          if (coll.Parallel && coll.Count > 0) {
            try {
              ExecuteCollectionOnHive(coll);
            }
            catch {
              // put the collection back so that the run can be resumed at this point
              executionStack.Push(coll); throw;
            }
          } else {
            // push in reverse order so that the first operation ends up on top of the stack
            for (int i = coll.Count - 1; i >= 0; i--)
              if (coll[i] != null) executionStack.Push(coll[i]);
          }
          continue;
        }

        IAtomicOperation operation = next as IAtomicOperation;
        if (operation == null) continue;

        try {
          next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
        }
        catch (OperationCanceledException) {
          executionStack.Push(operation);
          throw; // rethrow unchanged to keep the original stack trace
        }
        catch (Exception ex) {
          executionStack.Push(operation);
          throw new OperatorExecutionException(operation.Operator, ex);
        }
        if (next != null) executionStack.Push(next);

        if (operation.Operator.Breakpoint) {
          log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
          Pause();
        }
      }
    }

    /// <summary>
    /// Executes every operation of a non-empty parallel collection as a job on the hive and writes
    /// the resulting scopes back into the scopes of the original operations.
    /// </summary>
    /// <param name="coll">Non-empty collection whose first element must be an <c>IAtomicOperation</c>.</param>
    private void ExecuteCollectionOnHive(OperationCollection coll) {
      // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
      IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
      parentScopeClone.SubScopes.Clear();
      parentScopeClone.ClearParentScopes();

      EngineJob[] jobs = new EngineJob[coll.Count];
      for (int i = 0; i < coll.Count; i++) {
        jobs[i] = new EngineJob(coll[i], new SequentialEngine.SequentialEngine());
      }

      var experiment = CreateHiveExperiment();
      IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken);

      for (int i = 0; i < coll.Count; i++) {
        IAtomicOperation atomicOperation = coll[i] as IAtomicOperation;
        if (atomicOperation != null) {
          ExchangeScope(scopes[i], atomicOperation.Scope);
        }
        // todo ?? - results of nested OperationCollections are currently not integrated
      }
    }

    /// <summary>
    /// Attaches <see cref="TaskScheduler_UnobservedTaskException"/> exactly once per process.
    /// The handler is static, so the static event does not keep engine instances alive.
    /// </summary>
    private static void RegisterUnobservedTaskExceptionHandler() {
      lock (locker) {
        if (unobservedTaskExceptionHandlerRegistered) return;
        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        unobservedTaskExceptionHandlerRegistered = true;
      }
    }

    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process
    }

    /// <summary>
    /// Searches the given execution context and its parents for a value parameter named "Random".
    /// </summary>
    /// <param name="ec">Context to start at; may be null.</param>
    /// <returns>
    /// The value of the first matching parameter as <c>IRandom</c>, or null if there is none, if the
    /// value is not an <c>IRandom</c>, or if anything goes wrong during the lookup (best effort).
    /// </returns>
    private IRandom FindRandomParameter(IExecutionContext ec) {
      try {
        for (IExecutionContext current = ec; current != null; current = current.Parent) {
          foreach (var p in current.Parameters) {
            if (p.Name == "Random" && p is IValueParameter)
              return ((IValueParameter)p).Value as IRandom;
          }
        }
        return null;
      }
      catch { return null; } // deliberate: a failed lookup only means the random number generator is not shuffled
    }

    /// <summary>
    /// Copies the scope content of <paramref name="source"/> into the scope of <paramref name="target"/>.
    /// </summary>
    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
      ExchangeScope(source.Scope, target.Scope);
    }

    /// <summary>
    /// Replaces the variables and sub-scopes of <paramref name="target"/> with those of <paramref name="source"/>.
    /// </summary>
    private static void ExchangeScope(IScope source, IScope target) {
      target.Variables.Clear();
      target.Variables.AddRange(source.Variables);
      target.SubScopes.Clear();
      target.SubScopes.AddRange(source.SubScopes);
      // TODO: validate if parent scopes match - otherwise source is invalid
    }

    /// <summary>
    /// This method blocks until all jobs are finished
    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    /// </summary>
    /// <param name="refreshableHiveExperiment">Experiment the jobs are added to; it is disposed before this method returns or throws.</param>
    /// <param name="jobs">Jobs to execute, one per operation.</param>
    /// <param name="parentScopeClone">Cleaned clone of the parent scope that is shared by all jobs.</param>
    /// <param name="cancellationToken">Checked every two seconds while polling for completion.</param>
    /// <returns>The scopes of the initial operations of the experiment's hive jobs, in the experiment's job order.</returns>
    /// <exception cref="HiveEngineException">At least one job ended in the state <c>Failed</c>.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was requested while waiting.</exception>
    private IScope[] ExecuteOnHive(RefreshableHiveExperiment refreshableHiveExperiment, EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
      log.LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
      // allocate as IScope[] (not Scope[]): storing any other IScope implementation into a Scope[] would throw ArrayTypeMismatchException
      IScope[] scopes = new IScope[jobs.Length];
      var hiveExperiment = refreshableHiveExperiment.HiveExperiment;

      try {
        // create upload-tasks
        for (int i = 0; i < jobs.Length; i++) {
          var engineHiveJob = new EngineHiveJob(jobs[i], parentScopeClone);
          engineHiveJob.Job.Priority = this.Priority;
          hiveExperiment.HiveJobs.Add(engineHiveJob);

          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
        }
        HiveClient.StartExperiment((e) => { log.LogException(e); }, refreshableHiveExperiment, cancellationToken);

        // do polling until experiment is finished and all jobs are downloaded
        while (!refreshableHiveExperiment.AllJobsFinished()) {
          Thread.Sleep(2000);
          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
          cancellationToken.ThrowIfCancellationRequested();
        }
        log.LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));

        // a single pass is enough: only the first failed job is reported
        var failedJob = hiveExperiment.HiveJobs.FirstOrDefault(x => x.Job.State == JobState.Failed);
        if (failedJob != null) {
          throw new HiveEngineException("Job failed: " + failedJob.Job.StateLog.Last().Exception);
        }

        // get scopes
        int j = 0;
        foreach (var hiveJob in hiveExperiment.HiveJobs) {
          var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
          scopes[j++] = scope;
        }
        return scopes;
      }
      catch (OperationCanceledException) {
        // cancellation is not an error: do not log it, just pass it on with its stack trace intact
        throw;
      }
      catch (Exception e) {
        log.LogException(e);
        throw;
      }
      finally {
        DisposeHiveExperiment(refreshableHiveExperiment);
      }
    }

    /// <summary>
    /// Creates a new, non-controllable hive experiment with this engine's settings and registers it in <see cref="HiveExperiments"/>.
    /// </summary>
    private RefreshableHiveExperiment CreateHiveExperiment() {
      lock (locker) {
        var hiveExperiment = new HiveExperiment();
        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
        hiveExperiment.DateCreated = DateTime.Now;
        hiveExperiment.ResourceNames = this.ResourceNames;
        hiveExperiment.IsPrivileged = this.IsPrivileged;
        var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
        refreshableHiveExperiment.IsControllable = false;
        hiveExperiments.Add(refreshableHiveExperiment);
        return refreshableHiveExperiment;
      }
    }

    /// <summary>
    /// Stops automatic refreshing, deletes the experiment via the hive service and clears the data of its jobs.
    /// </summary>
    private void DisposeHiveExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
      refreshableHiveExperiment.RefreshAutomatically = false;
      try {
        DeleteHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id);
      }
      finally {
        // release the local job data even if the experiment could not be deleted
        ClearData(refreshableHiveExperiment);
      }
    }

    /// <summary>
    /// Calls <c>ClearData</c> on every hive job of the given experiment.
    /// </summary>
    private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) {
      var jobs = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs();
      foreach (var job in jobs) {
        job.ClearData();
      }
    }

    /// <summary>
    /// Deletes the experiment via the hive service, using <c>HiveClient.TryAndRepeat</c> with a repetition count of 5.
    /// </summary>
    private void DeleteHiveExperiment(Guid hiveExperimentId) {
      HiveClient.TryAndRepeat(() => {
        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
      }, 5, "Could not delete jobs");
    }

    /// <summary>
    /// Resolves the ';'-separated <see cref="ResourceNames"/> to resource ids via the hive service.
    /// </summary>
    /// <exception cref="ResourceNotFoundException">The service returned <c>Guid.Empty</c> for one of the names.</exception>
    private List<Guid> GetResourceIds() {
      return ServiceLocator.Instance.CallHiveService(service => {
        var resourceNames = ResourceNames.Split(';');
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = service.GetResourceId(resourceName);
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }
        return resourceIds;
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.