Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 5153

Last change on this file since 5153 was 5153, checked in by cneumuel, 13 years ago

#1260

  • increased timeouts for sent jobs (which are needed if the jobs take a long time to deserialize on the slave)
  • added DeleteJob to ClientService
  • made optimizer actually Pause instead of Stop when stop is called explicitly (so they can be resumed later)
  • temporarily disabled job-abortion from server because it aborted jobs which took too long to deserialize on slaves (this issue needs to be investigated)
  • reduced locking of engines on slave so that the deserialization does not block heartbeats

#1347

  • worked on HiveEngine
  • added test project for HiveEngine
File size: 7.3 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
6using HeuristicLab.Core;
7using HeuristicLab.Common;
8using HeuristicLab.Hive.Contracts.Interfaces;
9using HeuristicLab.Clients.Common;
10using HeuristicLab.Hive.ExperimentManager;
11using HeuristicLab.Hive.Contracts.BusinessObjects;
12using HeuristicLab.PluginInfrastructure;
13using HeuristicLab.Hive.Contracts.ResponseObjects;
14using System.Threading;
15using HeuristicLab.Random;
16
17namespace HeuristicLab.HiveEngine {
18  /// <summary>
19  /// Represents an engine that executes operations which can be executed in parallel on the hive
20  /// </summary>
21  [StorableClass]
22  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
23  public class HiveEngine : Engine {
24    [Storable]
25    private IOperator currentOperator;
26
27    [Storable]
28    public string ResourceIds { get; set; }
29
30    #region constructors and cloning
31    public HiveEngine() {
32      ResourceIds = "HEAL";
33    }
34    [StorableConstructor]
35    protected HiveEngine(bool deserializing) : base(deserializing) { }
36    protected HiveEngine(HiveEngine original, Cloner cloner)
37      : base(original, cloner) {
38      this.ResourceIds = original.ResourceIds;
39      this.currentOperator = cloner.Clone(original.currentOperator);
40    }
41    public override IDeepCloneable Clone(Cloner cloner) {
42      return new HiveEngine(this, cloner);
43    }
44    #endregion
45
46    protected override void ProcessNextOperation() {
47      currentOperator = null;
48      IOperation next = ExecutionStack.Pop();
49      OperationCollection coll = next as OperationCollection;
50
51      while (coll != null) {
52        if (coll.Parallel) {
53          IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>();
54          foreach (IOperation op in coll) {
55            jobs.Add(op, new OperationJob(op));
56          }
57
58          ExecuteOnHive(jobs);
59
60          foreach (var kvp in jobs) {
61            if (kvp.Key is IAtomicOperation) {
62              ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation);
63            } else if (kvp.Key is OperationCollection) {
64              // todo
65            }
66          }
67        } else {
68          for (int i = coll.Count - 1; i >= 0; i--)
69            ExecutionStack.Push(coll[i]);
70        }
71        next = ExecutionStack.Count > 0 ? ExecutionStack.Pop() : null;
72        coll = next as OperationCollection;
73      }
74
75      IAtomicOperation operation = next as IAtomicOperation;
76      if (operation != null) {
77        try {
78          currentOperator = operation.Operator;
79          ExecutionStack.Push(operation.Operator.Execute((IExecutionContext)operation));
80        }
81        catch (Exception ex) {
82          ExecutionStack.Push(operation);
83          OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex));
84          Pause();
85        }
86        if (operation.Operator.Breakpoint) {
87          Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
88          Pause();
89        }
90      }
91    }
92
93    private IRandom FindRandomParameter(IExecutionContext ec) {
94      try {
95        if (ec == null)
96          return null;
97
98        foreach (var p in ec.Parameters) {
99          if (p.Name == "Random" && p is IValueParameter)
100            return ((IValueParameter)p).Value as IRandom;
101        }
102        return FindRandomParameter(ec.Parent);
103      }
104      catch { return null; }
105    }
106
107    private static void ReIntegrateScope(IAtomicOperation source, IAtomicOperation target) {
108      ExchangeScope(source.Scope, target.Scope);
109    }
110
111    private static void ExchangeScope(IScope source, IScope target) {
112      target.Variables.Clear();
113      target.Variables.AddRange(source.Variables);
114      target.SubScopes.Clear();
115      target.SubScopes.AddRange(source.SubScopes);
116      // TODO: validate if parent scopes match - otherwise source is invalid
117    }
118
119    /// <summary>
120    /// This method blocks until all jobs are finished
121    /// </summary>
122    /// <param name="jobDict"></param>
123    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) {
124      Log.LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
125      IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
126      JobResultList results;
127
128      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
129        List<JobDto> jobs = new List<JobDto>();
130        foreach (var kvp in jobDict) {
131          // shuffle random variable to avoid the same random sequence in each operation
132          IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
133          if (random != null)
134            random.Reset(random.Next());
135
136          var groups = ResourceIds.Split(';');
137          SerializedJob serializedJob = new SerializedJob();
138          serializedJob.SerializedJobData = SerializedJob.Serialize(kvp.Value);
139          serializedJob.JobInfo = new JobDto();
140          serializedJob.JobInfo.State = JobState.Offline;
141          serializedJob.JobInfo.CoresNeeded = 1;
142          serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
143          ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
144          jobs.Add(response.Obj);
145          jobIds.Add(response.Obj.Id, kvp.Key);
146        }
147        results = service.Obj.GetJobResults(jobIds.Keys).Obj;
148      }
149
150      while (!results.All(
151          x => x.State == JobState.Finished ||
152          x.State == JobState.Failed ||
153          x.State == JobState.Aborted)) {
154        Thread.Sleep(5000);
155        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
156          results = service.Obj.GetJobResults(jobIds.Keys).Obj;
157        }
158      }
159
160      // all finished
161      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
162        foreach (Guid jobId in jobIds.Keys) {
163          SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
164          OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
165          jobDict[jobIds[jobId]] = operationJob;
166        }
167      }
168
169      // delete jobs
170      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
171        foreach (Guid jobId in jobIds.Keys) {
172          service.Obj.DeleteJob(jobId);
173        }
174      }
175
176      Log.LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
177    }
178
179    public override void Pause() {
180      base.Pause();
181      if (currentOperator != null) currentOperator.Abort();
182    }
183    public override void Stop() {
184      base.Stop();
185      if (currentOperator != null) currentOperator.Abort();
186    }
187
188  }
189}
Note: See TracBrowser for help on using the repository browser.