Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs @ 5136

Last change on this file since 5136 was 5136, checked in by cneumuel, 13 years ago

#1347

  • worked on HiveEngine
File size: 7.0 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
6using HeuristicLab.Core;
7using HeuristicLab.Common;
8using HeuristicLab.Hive.Contracts.Interfaces;
9using HeuristicLab.Clients.Common;
10using HeuristicLab.Hive.ExperimentManager;
11using HeuristicLab.Hive.Contracts.BusinessObjects;
12using HeuristicLab.PluginInfrastructure;
13using HeuristicLab.Hive.Contracts.ResponseObjects;
14using System.Threading;
15
16namespace HeuristicLab.HiveEngine {
17  /// <summary>
18  /// Represents an engine that executes operations which can be executed in parallel on the hive
19  /// </summary>
20  [StorableClass]
21  [Item("Hive Parallel Engine", "Engine for parallel execution on the hive.")]
22  public class HiveEngine : Engine {
23    private IOperator currentOperator;
24
25    #region constructors and cloning
26    public HiveEngine() { }
27    [StorableConstructor]
28    protected HiveEngine(bool deserializing) : base(deserializing) { }
29    protected HiveEngine(HiveEngine original, Cloner cloner)
30      : base(original, cloner) {
31    }
32    public override IDeepCloneable Clone(Cloner cloner) {
33      return new HiveEngine(this, cloner);
34    }
35    #endregion
36
37    protected override void ProcessNextOperation() {
38      currentOperator = null;
39      IOperation next = ExecutionStack.Pop();
40      OperationCollection coll = next as OperationCollection;
41
42      while (coll != null) {
43        if (coll.Parallel) {
44          IScope scope = ((IAtomicOperation)coll.Where(x => x is IAtomicOperation).First()).Scope;
45          IEnumerable<IOperation> finishedOperations = ExecuteOnHive(coll.Select(op => new OperationJob(op)));
46          foreach (IOperation op in finishedOperations) {
47            ReIntegrateScope(scope, op);
48          }
49        } else {
50          for (int i = coll.Count - 1; i >= 0; i--)
51            ExecutionStack.Push(coll[i]);
52        }
53        next = ExecutionStack.Count > 0 ? ExecutionStack.Pop() : null;
54        coll = next as OperationCollection;
55      }
56
57      IAtomicOperation operation = next as IAtomicOperation;
58      if (operation != null) {
59        try {
60          currentOperator = operation.Operator;
61          ExecutionStack.Push(operation.Operator.Execute((IExecutionContext)operation));
62        }
63        catch (Exception ex) {
64          ExecutionStack.Push(operation);
65          OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex));
66          Pause();
67        }
68        if (operation.Operator.Breakpoint) {
69          Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
70          Pause();
71        }
72      }
73    }
74
75    private static void ReIntegrateScope(IScope scope, IOperation operation) {
76      if (operation is IAtomicOperation) {
77        MergeScopes(scope, ((IAtomicOperation)operation).Scope);
78      } else if (operation is OperationCollection) {
79        foreach (IOperation op in (OperationCollection)operation) {
80          ReIntegrateScope(scope, op);
81        }
82      }
83    }
84
85    private static void MergeScopes(IScope target, IScope source) {
86      foreach (IVariable variable in source.Variables) {
87        if (target.Variables.ContainsKey(variable.Name)) {
88          if (target.Variables[variable.Name] != source.Variables[variable.Name]) {
89            // problem: ResultCollection are not equal altough they contain the same values.
90            //             - either implement ResultCollection.Equals, or consider ResultCollection as special case here
91            //throw new ScopeMergeException(string.Format("Variable {0} already exists in target scope and has a different value ({1}, {2}).", variable.Name, target.Variables[variable.Name].Value, variable.Value));
92          }
93        } else {
94          target.Variables.Add(variable);
95        }
96      }
97
98      //this is not useful on the parentscope of the initial scope
99      //foreach (IScope subScope in source.SubScopes) {
100      //  target.SubScopes.Add(subScope);
101      //}
102
103      if (target.Parent != null && source.Parent != null)
104        MergeScopes(target.Parent, source.Parent);
105    }
106
107
108    private IEnumerable<IOperation> TestExecuteOnHive(IEnumerable<OperationJob> operationJobs) {
109      //var clonedJobs = operationJobs.Select(x => (OperationJob)x.Clone()).ToArray();
110      var results = new List<IOperation>();
111
112      foreach (var job in operationJobs) {
113        job.Start();
114        while (job.ExecutionState != Core.ExecutionState.Stopped) {
115          Thread.Sleep(100);
116        }
117        results.Add(job.Operation);
118      }
119      return results;
120    }
121
122    private IEnumerable<IOperation> ExecuteOnHive(IEnumerable<OperationJob> operationJobs) {
123      IEnumerable<Guid> jobIds;
124      JobResultList results;
125
126
127      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
128        List<JobDto> jobs = new List<JobDto>();
129        foreach (OperationJob operationJob in operationJobs) {
130          var groups = new string[] { "HEAL" };
131          SerializedJob serializedJob = new SerializedJob();
132          serializedJob.SerializedJobData = SerializedJob.Serialize(operationJob);
133          serializedJob.JobInfo = new JobDto();
134          serializedJob.JobInfo.State = JobState.Offline;
135          serializedJob.JobInfo.CoresNeeded = 1;
136          serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
137          ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
138          jobs.Add(response.Obj);
139        }
140        jobIds = jobs.Select(x => x.Id);
141        results = service.Obj.GetJobResults(jobIds).Obj;
142      }
143
144      while (!results.All(
145          x => x.State == JobState.Finished ||
146          x.State == JobState.Failed ||
147          x.State == JobState.Aborted)) {
148        Thread.Sleep(5000);
149        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
150          results = service.Obj.GetJobResults(jobIds).Obj;
151        }
152      }
153
154      // all finished
155      List<OperationJob> finishedJobs = new List<OperationJob>();
156      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
157        foreach (Guid jobId in jobIds) {
158          SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
159          OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
160          finishedJobs.Add(operationJob);
161        }
162      }
163
164      // delete jobs
165      // TODO
166
167      return finishedJobs.Select(x => x.Operation);
168    }
169
170    public override void Pause() {
171      base.Pause();
172      if (currentOperator != null) currentOperator.Abort();
173    }
174    public override void Stop() {
175      base.Stop();
176      if (currentOperator != null) currentOperator.Abort();
177    }
178  }
179}
Note: See TracBrowser for help on using the repository browser.