Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Persistence Test/HeuristicLab.DistributedEngine/3.2/DistributedEngine.cs @ 4593

Last change on this file since 4593 was 2058, checked in by gkronber, 16 years ago

Refactored JobManager and added a plugin that contains a bridge between Grid and Hive. The bridge allows the execution engine service of Hive to be used as a grid server. This way, CEDMA job execution and DistributedEngine job execution can use either Hive or Grid as the backend. See ticket #642 (Hive backend for CEDMA).

File size: 15.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
34using System.Diagnostics;
35
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Engine that processes atomic and sequential composite operations locally and hands
  /// parallel composite operations to a <see cref="JobManager"/> for execution.
  /// </summary>
  public class DistributedEngine : EngineBase, IEditable {
    // engines that came back suspended, paired with the local operation they were started for
    private List<KeyValuePair<ProcessingEngine, AtomicOperation>> suspendedEngines = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
    private JobManager jobManager;
    // the server address that the current jobManager instance was created for
    private string jobManagerAddress;
    private string serverAddress;

    /// <summary>
    /// Address of the grid server. A changed address is picked up by the next call to <see cref="Execute"/>.
    /// </summary>
    public string ServerAddress {
      get { return serverAddress; }
      set { serverAddress = value; }
    }

    /// <summary>
    /// Clones the engine via the base implementation and copies the server address to the clone.
    /// </summary>
    public override object Clone(IDictionary<Guid, object> clonedObjects) {
      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
      clone.ServerAddress = serverAddress;
      return clone;
    }

    /// <summary>Creates a new <see cref="DistributedEngineEditor"/> for this engine.</summary>
    public override IView CreateView() {
      return new DistributedEngineEditor(this);
    }

    /// <summary>Creates a new <see cref="DistributedEngineEditor"/> for this engine.</summary>
    public virtual IEditor CreateEditor() {
      return new DistributedEngineEditor(this);
    }

    /// <summary>
    /// True only when the base engine reports termination and no suspended engines are left to resume.
    /// </summary>
    public override bool Terminated {
      get {
        return base.Terminated && suspendedEngines.Count == 0;
      }
    }

    /// <summary>Discards all suspended engines and resets the base engine.</summary>
    public override void Reset() {
      suspendedEngines.Clear();
      base.Reset();
    }

    /// <summary>
    /// Makes sure a job manager for the current <see cref="ServerAddress"/> exists, resets it
    /// and starts execution through the base engine.
    /// </summary>
    public override void Execute() {
      // (re-)create the job manager when there is none yet or when the server address
      // was changed since the manager was created; otherwise a new address would be ignored
      if (jobManager == null || jobManagerAddress != serverAddress) {
        jobManager = new JobManager(new GridServerProxy(serverAddress));
        jobManagerAddress = serverAddress;
      }
      jobManager.Reset();
      base.Execute();
    }

    /// <summary>Not supported by this engine.</summary>
    /// <exception cref="InvalidOperationException">Always thrown.</exception>
    public override void ExecuteSteps(int steps) {
      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
    }

    /// <summary>
    /// Resumes suspended engines if there are any; otherwise pops the next operation
    /// from the execution stack and processes it.
    /// </summary>
    protected override void ProcessNextOperation() {
      if (suspendedEngines.Count > 0) {
        ProcessSuspendedEngines();
      } else {
        IOperation operation = myExecutionStack.Pop();
        ProcessOperation(operation);
      }
    }

    /// <summary>
    /// Submits all suspended engines to the job manager, waits for them and merges their results.
    /// </summary>
    private void ProcessSuspendedEngines() {
      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results =
        new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(suspendedEngines.Count);
      try {
        // submitting and waiting happen inside the try block as well, so that a failure
        // while starting a job is reported in the same way as a failure while collecting results
        AsyncGridResult[] asyncResults = new AsyncGridResult[suspendedEngines.Count];
        for (int i = 0; i < suspendedEngines.Count; i++) {
          asyncResults[i] = jobManager.BeginExecuteEngine(suspendedEngines[i].Key);
        }
        WaitForAll(asyncResults);
        // collect results
        for (int i = 0; i < suspendedEngines.Count; i++) {
          results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(
              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i]),
              suspendedEngines[i].Value));
        }
      }
      catch (Exception e) {
        // this exception means there was a problem with the underlying communication infrastructure
        // -> abort engine and report the exception; the suspended engines are kept untouched
        Abort();
        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
        return;
      }
      // got all result engines without an exception -> merge results
      ProcessResults(results);
    }

    /// <summary>
    /// Processes a single operation: atomic operations are executed locally, parallel composite
    /// operations are distributed, sequential composite operations are unrolled onto the execution stack.
    /// </summary>
    private void ProcessOperation(IOperation operation) {
      if (operation is AtomicOperation) {
        AtomicOperation atomicOperation = (AtomicOperation)operation;
        IOperation next = null;
        try {
          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
        }
        catch (Exception ex) {
          // push operation on stack again
          myExecutionStack.Push(atomicOperation);
          Abort();
          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
          // the operation failed and was re-queued -> it must not be reported as executed
          return;
        }
        if (next != null)
          myExecutionStack.Push(next);
        OnOperationExecuted(atomicOperation);
        if (atomicOperation.Operator.Breakpoint) Abort();
      } else if (operation is CompositeOperation) {
        CompositeOperation compositeOperation = (CompositeOperation)operation;
        if (compositeOperation.ExecuteInParallel) {
          ProcessParallelOperation(compositeOperation);
          OnOperationExecuted(compositeOperation);
        } else {
          // push in reverse order so that the first operation ends up on top of the stack
          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
            myExecutionStack.Push(compositeOperation.Operations[i]);
        }
      }
    }

    /// <summary>
    /// Sends every operation of a parallel composite operation to the job manager,
    /// waits for all of them and merges the results into the local scope tree.
    /// </summary>
    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
      // nothing to distribute
      if (compositeOperation.Operations.Count == 0) return;

      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
      try {
        // send operations to grid
        AsyncGridResult[] asyncResults = BeginExecuteOperations(compositeOperation);
        WaitForAll(asyncResults);
        // collect results
        int i = 0;
        foreach (AtomicOperation parOperation in compositeOperation.Operations) {
          results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(
              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i++]), parOperation));
        }
      }
      catch (Exception e) {
        // this exception means there was a problem with the underlying communication infrastructure
        // -> report the exception, abort engine, requeue the whole composite operation again and return
        myExecutionStack.Push(compositeOperation);
        Abort();
        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
        return;
      }
      // got all result engines without an exception -> merge results
      ProcessResults(results);
    }

    /// <summary>
    /// Starts one job per operation of the given composite operation.
    /// </summary>
    /// <returns>One pending result per operation, in the order of the operations.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the scope of the first operation has no parent within the global scope tree.
    /// </exception>
    private AsyncGridResult[] BeginExecuteOperations(CompositeOperation compositeOperation) {
      AsyncGridResult[] asyncResults = new AsyncGridResult[compositeOperation.Operations.Count];
      if (asyncResults.Length == 0) return asyncResults;
      // HACK: assume that all atomicOperations have the same parent scope.
      // 1) find that parent scope
      // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
      // 3) keep the branches to 'repair' the scope-tree later
      // 4) for each parallel job attach only the sub-scope that this operation uses
      // 5) after starting all parallel jobs restore the whole scope-tree
      IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
      if (parentScope == null)
        throw new InvalidOperationException("The scope of the parallel operation has no parent scope in the scope-tree of the engine");
      List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
      foreach (IScope scope in subScopes) {
        parentScope.RemoveSubScope(scope);
      }
      try {
        // start all parallel jobs
        int i = 0;
        foreach (AtomicOperation parOperation in compositeOperation.Operations) {
          parentScope.AddSubScope(parOperation.Scope);
          try {
            asyncResults[i++] = jobManager.BeginExecuteEngine(new ProcessingEngine(GlobalScope, parOperation));
          }
          finally {
            // detach the scope again even when the job could not be started
            parentScope.RemoveSubScope(parOperation.Scope);
          }
        }
      }
      finally {
        // always repair the scope-tree; otherwise a failed job submission
        // would leave the engine with a pruned global scope
        foreach (IScope scope in subScopes) {
          parentScope.AddSubScope(scope);
        }
        // scopes were recorded from the deepest level upwards, restoring works top-down
        prunedScopes.Reverse();
        RestoreFullTree(GlobalScope, prunedScopes);
      }

      return asyncResults;
    }

    /// <summary>
    /// Blocks until the wait handle of every given result has been signaled.
    /// </summary>
    private void WaitForAll(AsyncGridResult[] asyncResults) {
      // wait for the handles one after the other instead of using WaitHandle.WaitAll,
      // which is limited to 64 handles and rejects empty arrays, duplicate handles and STA threads
      foreach (AsyncGridResult asyncResult in asyncResults) {
        asyncResult.WaitHandle.WaitOne();
      }
    }

    /// <summary>
    /// Merges the scopes of the returned engines into the local scope tree, re-queues canceled
    /// operations, remembers suspended engines and aborts when there are any of either.
    /// </summary>
    private void ProcessResults(List<KeyValuePair<ProcessingEngine, AtomicOperation>> results) {
      // create a new compositeOperation to hold canceled operations that should be restarted
      CompositeOperation canceledOperations = new CompositeOperation();
      canceledOperations.ExecuteInParallel = true;

      suspendedEngines.Clear();
      // retrieve results and merge into scope-tree
      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
        ProcessingEngine resultEngine = p.Key;
        AtomicOperation parOperation = p.Value;
        if (resultEngine.Canceled && !resultEngine.Suspended) {
          // when an engine was canceled but not suspended this means there was a problem
          // report the error and queue the operation for restart (again parallel)
          // but don't merge the results of the aborted engine
          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
          canceledOperations.AddOperation(parOperation);
        } else if (resultEngine.Suspended) {
          // when the engine was suspended it means it was stopped because of a breakpoint
          // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel)
          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
          resultEngine.InitialOperation = parOperation;
          suspendedEngines.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(resultEngine, parOperation));
        } else {
          // engine is finished ->
          // simply merge the results into our local scope-tree
          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
        }
      }
      // if there were exceptions -> abort
      if (canceledOperations.Operations.Count > 0) {
        // requeue the aborted operations
        myExecutionStack.Push(canceledOperations);
        Abort();
      }
      // if there were breakpoints -> abort
      if (suspendedEngines.Count > 0) {
        Abort();
      }
    }

    /// <summary>
    /// Re-attaches the sub-scopes that were removed by <see cref="PruneToParentScope"/>.
    /// Expects <paramref name="savedScopes"/> ordered from the top level downwards; the list is emptied.
    /// </summary>
    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
      if (savedScopes.Count == 0) return;
      // after pruning only the branch towards the parent scope is left on this level
      IScope remainingBranch = currentScope.SubScopes[0];
      currentScope.RemoveSubScope(remainingBranch);
      IList<IScope> savedScopesForCurrent = savedScopes[0];
      foreach (IScope savedScope in savedScopesForCurrent) {
        currentScope.AddSubScope(savedScope);
      }
      savedScopes.RemoveAt(0);
      RestoreFullTree(remainingBranch, savedScopes);
    }

    /// <summary>
    /// Removes all branches below <paramref name="currentScope"/> that do not lead to
    /// <paramref name="scope"/> and records the original sub-scope lists in <paramref name="prunedScopes"/>
    /// (deepest level first).
    /// </summary>
    /// <returns><paramref name="currentScope"/> when it is or contains the searched scope, otherwise null.</returns>
    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
      if (currentScope == scope) return currentScope;
      if (currentScope.SubScopes.Count == 0) return null;
      IScope foundScope = null;
      // try to find the searched scope in all my sub-scopes
      foreach (IScope subScope in currentScope.SubScopes) {
        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
        if (foundScope != null) break; // we can stop as soon as we find the scope in a branch
      }
      if (foundScope != null) { // when we found the scopes in my sub-scopes
        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
        prunedScopes.Add(subScopes);
        // remove all my sub-scopes
        foreach (IScope subScope in subScopes) {
          currentScope.RemoveSubScope(subScope);
        }
        // add only the branch that leads to the scope that I search for
        currentScope.AddSubScope(foundScope);
        return currentScope; // return that this scope contains the branch that leads to the searched scopes
      } else {
        return null; // otherwise we didn't find the searched scope and we can return null
      }
    }

    /// <summary>
    /// Searches the tree below <paramref name="currentScope"/> (depth first) for the scope
    /// that directly contains <paramref name="childScope"/>.
    /// </summary>
    /// <returns>The parent scope, or null when no scope in the tree contains the child.</returns>
    private IScope FindParentScope(IScope currentScope, IScope childScope) {
      if (currentScope.SubScopes.Contains(childScope)) return currentScope;
      foreach (IScope subScope in currentScope.SubScopes) {
        IScope result = FindParentScope(subScope, childScope);
        if (result != null) return result;
      }
      return null;
    }

    /// <summary>
    /// Replaces the content of <paramref name="original"/> with the variables, sub-scopes
    /// and aliases of <paramref name="result"/>.
    /// </summary>
    private void MergeScope(IScope original, IScope result) {
      // merge the results
      original.Clear();
      foreach (IVariable variable in result.Variables) {
        original.AddVariable(variable);
      }
      foreach (IScope subScope in result.SubScopes) {
        original.AddSubScope(subScope);
      }
      foreach (KeyValuePair<string, string> alias in result.Aliases) {
        original.AddAlias(alias.Key, alias.Value);
      }
    }

    #region Persistence Methods
    /// <summary>
    /// Extends the XML node of the base class with a "ServerAddress" attribute and, when there are
    /// suspended engines, a "SuspendedEngines" element holding one "Entry" per engine/operation pair.
    /// </summary>
    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
      addressAttribute.Value = ServerAddress;
      node.Attributes.Append(addressAttribute);
      if (suspendedEngines.Count > 0) {
        XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines");
        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
          XmlNode n = document.CreateElement("Entry");
          // first child: the engine, second child: the operation (Populate relies on this order)
          n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects));
          n.AppendChild(PersistenceManager.Persist(p.Value, document, persistedObjects));
          suspendedEnginesNode.AppendChild(n);
        }
        node.AppendChild(suspendedEnginesNode);
      }
      return node;
    }

    /// <summary>
    /// Restores the server address and the suspended engines from a node written by <see cref="GetXmlNode"/>.
    /// A missing "ServerAddress" attribute leaves the current address unchanged.
    /// </summary>
    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      XmlAttribute addressAttribute = node.Attributes["ServerAddress"];
      if (addressAttribute != null) {
        ServerAddress = addressAttribute.Value;
      }
      XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines");
      if (suspendedEnginesNode != null) {
        foreach (XmlNode n in suspendedEnginesNode.ChildNodes) {
          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
            (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
            (AtomicOperation)PersistenceManager.Restore(n.ChildNodes[1], restoredObjects));
          suspendedEngines.Add(p);
        }
      }
    }
    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.