1 | using System;
|
---|
2 | using System.Collections.Generic;
|
---|
3 | using System.Linq;
|
---|
4 | using System.Text;
|
---|
5 | using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
|
---|
6 | using HeuristicLab.Core;
|
---|
7 | using HeuristicLab.Common;
|
---|
8 | using HeuristicLab.Hive.Contracts.Interfaces;
|
---|
9 | using HeuristicLab.Clients.Common;
|
---|
10 | using HeuristicLab.Hive.ExperimentManager;
|
---|
11 | using HeuristicLab.Hive.Contracts.BusinessObjects;
|
---|
12 | using HeuristicLab.PluginInfrastructure;
|
---|
13 | using HeuristicLab.Hive.Contracts.ResponseObjects;
|
---|
14 | using System.Threading;
|
---|
15 |
|
---|
namespace HeuristicLab.HiveEngine {
  /// <summary>
  /// Represents an engine that executes operations which can be executed in parallel on the hive.
  /// </summary>
  /// <remarks>
  /// Sequential operation collections are unrolled onto the execution stack and executed locally.
  /// Parallel operation collections are submitted to the hive as one job per operation; once all
  /// jobs have reached a terminal state, the variables of the returned scopes are merged back into
  /// the local scope chain.
  /// </remarks>
  [StorableClass]
  [Item("Hive Parallel Engine", "Engine for parallel execution on the hive.")]
  public class HiveEngine : Engine {
    // Name of the hive group every job is submitted to.
    private const string HiveGroupName = "HEAL";
    // Delay between two consecutive job-state polls, in milliseconds.
    private const int PollingIntervalMilliseconds = 5000;

    // Operator that is currently being executed locally; reset at the start of every step.
    private IOperator currentOperator;

    #region constructors and cloning
    public HiveEngine() { }
    [StorableConstructor]
    protected HiveEngine(bool deserializing) : base(deserializing) { }
    protected HiveEngine(HiveEngine original, Cloner cloner)
      : base(original, cloner) {
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveEngine(this, cloner);
    }
    #endregion

    /// <summary>
    /// Pops the next operation from the execution stack and processes it.
    /// Operation collections are resolved first (on the hive if parallel, otherwise by unrolling
    /// them onto the stack); the first atomic operation found afterwards is executed locally.
    /// </summary>
    protected override void ProcessNextOperation() {
      currentOperator = null;
      IOperation next = ExecutionStack.Pop();
      OperationCollection coll = next as OperationCollection;

      while (coll != null) {
        // The scope of the first atomic operation is the merge target for the hive results.
        // A parallel collection without any direct atomic operation (empty, or only nested
        // collections) has no such scope, so it is unrolled locally instead of failing.
        IAtomicOperation firstAtomic = coll.Parallel
          ? coll.OfType<IAtomicOperation>().FirstOrDefault()
          : null;

        if (firstAtomic != null) {
          IScope scope = firstAtomic.Scope;
          IEnumerable<IOperation> finishedOperations = ExecuteOnHive(coll.Select(op => new OperationJob(op)));
          foreach (IOperation op in finishedOperations) {
            ReIntegrateScope(scope, op);
          }
        } else {
          // Push in reverse index order so that the first operation is popped first.
          for (int i = coll.Count - 1; i >= 0; i--)
            ExecutionStack.Push(coll[i]);
        }
        next = ExecutionStack.Count > 0 ? ExecutionStack.Pop() : null;
        coll = next as OperationCollection;
      }

      IAtomicOperation operation = next as IAtomicOperation;
      if (operation != null) {
        try {
          currentOperator = operation.Operator;
          ExecutionStack.Push(operation.Operator.Execute((IExecutionContext)operation));
        }
        catch (Exception ex) {
          // Put the failed operation back so that it is the next one processed after the pause.
          ExecutionStack.Push(operation);
          OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex));
          Pause();
        }
        if (operation.Operator.Breakpoint) {
          Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
          Pause();
        }
      }
    }

    /// <summary>
    /// Merges the scope of every atomic operation contained in <paramref name="operation"/>
    /// (recursing into operation collections) into <paramref name="scope"/>.
    /// </summary>
    /// <param name="scope">The local scope that receives the variables.</param>
    /// <param name="operation">An operation returned from the hive.</param>
    private static void ReIntegrateScope(IScope scope, IOperation operation) {
      IAtomicOperation atomicOperation = operation as IAtomicOperation;
      if (atomicOperation != null) {
        MergeScopes(scope, atomicOperation.Scope);
        return;
      }

      OperationCollection collection = operation as OperationCollection;
      if (collection != null) {
        foreach (IOperation op in collection) {
          ReIntegrateScope(scope, op);
        }
      }
    }

    /// <summary>
    /// Adds every variable of <paramref name="source"/> whose name does not yet exist in
    /// <paramref name="target"/> to the target, then repeats the same step for the parents of
    /// both scopes until either parent chain ends.
    /// </summary>
    /// <param name="target">The scope that receives the missing variables.</param>
    /// <param name="source">The scope the variables are taken from.</param>
    private static void MergeScopes(IScope target, IScope source) {
      IScope currentTarget = target;
      IScope currentSource = source;

      while (currentTarget != null && currentSource != null) {
        foreach (IVariable variable in currentSource.Variables) {
          // Variables that already exist in the target are kept as they are. Conflicting values
          // are not detected: ResultCollections are not equal although they contain the same
          // values, so a plain comparison would report false conflicts. Either implement
          // ResultCollection.Equals or treat ResultCollection as a special case before adding
          // such a check.
          if (!currentTarget.Variables.ContainsKey(variable.Name)) {
            currentTarget.Variables.Add(variable);
          }
        }

        // Sub-scopes are deliberately not merged: this is not useful on the parent scope of
        // the initial scope.
        currentTarget = currentTarget.Parent;
        currentSource = currentSource.Parent;
      }
    }

    /// <summary>
    /// Local stand-in for <see cref="ExecuteOnHive"/>: starts each job in turn and polls every
    /// 100 ms until its execution state is Stopped, without contacting the hive.
    /// </summary>
    /// <param name="operationJobs">The jobs to execute one after another.</param>
    /// <returns>The operations of the jobs, in the order the jobs were supplied.</returns>
    private IEnumerable<IOperation> TestExecuteOnHive(IEnumerable<OperationJob> operationJobs) {
      var results = new List<IOperation>();

      foreach (var job in operationJobs) {
        job.Start();
        while (job.ExecutionState != Core.ExecutionState.Stopped) {
          Thread.Sleep(100);
        }
        results.Add(job.Operation);
      }
      return results;
    }

    /// <summary>
    /// Submits the given jobs to the hive, blocks until every job is finished, failed or aborted,
    /// and returns the operations contained in the last serialized result of each job.
    /// </summary>
    /// <param name="operationJobs">The jobs to execute on the hive.</param>
    /// <returns>The resulting operations, in submission order.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown if the hive returns no job description, no job states or no result data.
    /// </exception>
    private IEnumerable<IOperation> ExecuteOnHive(IEnumerable<OperationJob> operationJobs) {
      List<Guid> submittedJobIds = SubmitJobs(operationJobs);
      if (submittedJobIds.Count == 0)
        return Enumerable.Empty<IOperation>();

      // The ids are materialized once; a deferred query would be re-evaluated on every poll.
      IEnumerable<Guid> jobIds = submittedJobIds;

      // NOTE(review): this wait cannot be interrupted by Pause() or Stop() - confirm whether
      // the engine should be able to cancel outstanding hive jobs.
      while (!AllJobsCompleted(jobIds)) {
        Thread.Sleep(PollingIntervalMilliseconds);
      }

      IEnumerable<IOperation> finishedOperations = DownloadFinishedOperations(jobIds);

      // TODO: delete the jobs from the hive

      return finishedOperations;
    }

    /// <summary>
    /// Serializes every job and adds it to the hive as an offline single-core job that requires
    /// every plugin reported by the local application manager.
    /// </summary>
    /// <returns>The ids the hive assigned to the jobs, in submission order.</returns>
    private static List<Guid> SubmitJobs(IEnumerable<OperationJob> operationJobs) {
      string[] groups = new string[] { HiveGroupName };
      List<Guid> jobIds = new List<Guid>();

      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        foreach (OperationJob operationJob in operationJobs) {
          SerializedJob serializedJob = new SerializedJob();
          serializedJob.SerializedJobData = SerializedJob.Serialize(operationJob);
          serializedJob.JobInfo = new JobDto();
          serializedJob.JobInfo.State = JobState.Offline;
          serializedJob.JobInfo.CoresNeeded = 1;
          serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();

          ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
          if (response == null || response.Obj == null)
            throw new InvalidOperationException("The hive did not return a job description for a submitted operation job.");
          jobIds.Add(response.Obj.Id);
        }
      }
      return jobIds;
    }

    /// <summary>
    /// Queries the hive once and reports whether every job is finished, failed or aborted.
    /// </summary>
    private static bool AllJobsCompleted(IEnumerable<Guid> jobIds) {
      JobResultList results;
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        results = service.Obj.GetJobResults(jobIds).Obj;
      }
      if (results == null)
        throw new InvalidOperationException("The hive did not return the states of the submitted jobs.");

      return results.All(
        x => x.State == JobState.Finished ||
             x.State == JobState.Failed ||
             x.State == JobState.Aborted);
    }

    /// <summary>
    /// Downloads and deserializes the last serialized result of every job.
    /// </summary>
    /// <returns>The operations of the deserialized jobs, in the order of the ids.</returns>
    private static IEnumerable<IOperation> DownloadFinishedOperations(IEnumerable<Guid> jobIds) {
      List<IOperation> finishedOperations = new List<IOperation>();

      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        foreach (Guid jobId in jobIds) {
          SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
          // Failed and aborted jobs are also treated as completed by the caller, so a result
          // is not guaranteed to exist here.
          if (serializedJob == null || serializedJob.SerializedJobData == null)
            throw new InvalidOperationException(string.Format("No result is available for hive job {0}; it may have failed or been aborted.", jobId));

          OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
          finishedOperations.Add(operationJob.Operation);
        }
      }
      return finishedOperations;
    }

    /// <summary>
    /// Pauses the engine and aborts the operator that is currently executed locally, if any.
    /// </summary>
    public override void Pause() {
      base.Pause();
      AbortCurrentOperator();
    }

    /// <summary>
    /// Stops the engine and aborts the operator that is currently executed locally, if any.
    /// </summary>
    public override void Stop() {
      base.Stop();
      AbortCurrentOperator();
    }

    /// <summary>
    /// Aborts the operator that is currently executed locally, if there is one.
    /// </summary>
    private void AbortCurrentOperator() {
      // Work on a snapshot: ProcessNextOperation resets the field to null at the start of every
      // step, so checking and dereferencing the field separately could hit null in between.
      // NOTE(review): assumes Pause/Stop can be called from a thread other than the one that
      // runs ProcessNextOperation - confirm against Engine.
      IOperator runningOperator = currentOperator;
      if (runningOperator != null) runningOperator.Abort();
    }
  }
}
|
---|