Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/3.2/DistributedEngine.cs @ 2211

Last change on this file since 2211 was 2058, checked in by gkronber, 16 years ago

Refactored JobManager and added a plugin that contains a bridge between grid and hive. The bridge allows to use the execution engine service of Hive as a grid server. This way CEDMA job execution and DistributedEngine job execution can either use Hive or Grid as backend. #642 (Hive backend for CEDMA)

File size: 15.4 KB
RevLine 
[2]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
[219]32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
[268]34using System.Diagnostics;
[2]35
36namespace HeuristicLab.DistributedEngine {
37  public class DistributedEngine : EngineBase, IEditable {
[414]38    private List<KeyValuePair<ProcessingEngine, AtomicOperation>> suspendedEngines = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
[219]39    private JobManager jobManager;
[2]40    private string serverAddress;
41    public string ServerAddress {
42      get { return serverAddress; }
43      set {
[2055]44        if (value != serverAddress) {
[2]45          serverAddress = value;
46        }
47      }
48    }
49    public override object Clone(IDictionary<Guid, object> clonedObjects) {
50      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
51      clone.ServerAddress = serverAddress;
52      return clone;
53    }
54
55    public override IView CreateView() {
56      return new DistributedEngineEditor(this);
57    }
58    public virtual IEditor CreateEditor() {
59      return new DistributedEngineEditor(this);
60    }
61
[414]62    public override bool Terminated {
63      get {
64        return base.Terminated && suspendedEngines.Count == 0;
65      }
66    }
67
68    public override void Reset() {
69      suspendedEngines.Clear();
70      base.Reset();
71    }
72
[2]73    public override void Execute() {
[2058]74      if (jobManager == null) this.jobManager = new JobManager(new GridServerProxy(serverAddress));
[219]75      jobManager.Reset();
[2]76      base.Execute();
77    }
78
79    public override void ExecuteSteps(int steps) {
80      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
81    }
82
83    protected override void ProcessNextOperation() {
[2055]84      if (suspendedEngines.Count > 0) {
[414]85        ProcessSuspendedEngines();
86      } else {
87        IOperation operation = myExecutionStack.Pop();
88        ProcessOperation(operation);
89      }
90    }
91
92    private void ProcessSuspendedEngines() {
[2055]93      AsyncGridResult[] asyncResults = new AsyncGridResult[suspendedEngines.Count];
[414]94      int i = 0;
[2055]95      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
96        asyncResults[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
[414]97      }
[2055]98      WaitForAll(asyncResults);
[414]99      // collect results
100      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
101      try {
[2055]102        int resultIndex = 0;
103        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
[414]104          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
[2055]105              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[resultIndex++]),
[414]106              suspendedPair.Value);
107          results.Add(p);
108        }
[2055]109      }
110      catch (Exception e) {
[414]111        // this exception means there was a problem with the underlying communication infrastructure
112        // -> show message dialog and abort engine
113        Abort();
114        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
115        return;
116      }
117      // got all result engines without an exception -> merge results
118      ProcessResults(results);
119    }
120
121    private void ProcessOperation(IOperation operation) {
[2055]122      if (operation is AtomicOperation) {
[219]123        AtomicOperation atomicOperation = (AtomicOperation)operation;
124        IOperation next = null;
125        try {
126          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
[2055]127        }
128        catch (Exception ex) {
[219]129          // push operation on stack again
130          myExecutionStack.Push(atomicOperation);
131          Abort();
132          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
[35]133        }
[2055]134        if (next != null)
[219]135          myExecutionStack.Push(next);
136        OnOperationExecuted(atomicOperation);
[2055]137        if (atomicOperation.Operator.Breakpoint) Abort();
138      } else if (operation is CompositeOperation) {
[219]139        CompositeOperation compositeOperation = (CompositeOperation)operation;
[2055]140        if (compositeOperation.ExecuteInParallel) {
[414]141          ProcessParallelOperation(compositeOperation);
[299]142          OnOperationExecuted(compositeOperation);
[219]143        } else {
[2055]144          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
[219]145            myExecutionStack.Push(compositeOperation.Operations[i]);
[2]146        }
147      }
148    }
149
[414]150    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
151      // send operations to grid
[2055]152      AsyncGridResult[] asyncResults = BeginExecuteOperations(compositeOperation);
153      WaitForAll(asyncResults);
[414]154      // collect results
155      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
156      try {
[2055]157        int i = 0;
158        foreach (AtomicOperation parOperation in compositeOperation.Operations) {
159          results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(
160              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i++]), parOperation));
[414]161        }
[2055]162      }
163      catch (Exception e) {
[414]164        // this exception means there was a problem with the underlying communication infrastructure
165        // -> show message dialog, abort engine, requeue the whole composite operation again and return
166        myExecutionStack.Push(compositeOperation);
167        Abort();
168        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
169        return;
170      }
171      // got all result engines without an exception -> merge results
172      ProcessResults(results);
173    }
174
[2055]175    private AsyncGridResult[] BeginExecuteOperations(CompositeOperation compositeOperation) {
176      AsyncGridResult[] asyncResults = new AsyncGridResult[compositeOperation.Operations.Count];
[414]177      int i = 0;
178      // HACK: assume that all atomicOperations have the same parent scope.
179      // 1) find that parent scope
180      // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
181      // 3) keep the branches to 'repair' the scope-tree later
182      // 4) for each parallel job attach only the sub-scope that this operation uses
183      // 5) after starting all parallel jobs restore the whole scope-tree
184      IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
185      List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
186      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
187      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
[2055]188      foreach (IScope scope in subScopes) {
[414]189        parentScope.RemoveSubScope(scope);
190      }
191      // start all parallel jobs
[2055]192      foreach (AtomicOperation parOperation in compositeOperation.Operations) {
[414]193        parentScope.AddSubScope(parOperation.Scope);
[2055]194        asyncResults[i++] = jobManager.BeginExecuteEngine(new ProcessingEngine(GlobalScope, parOperation));
[414]195        parentScope.RemoveSubScope(parOperation.Scope);
196      }
[2055]197      foreach (IScope scope in subScopes) {
[414]198        parentScope.AddSubScope(scope);
199      }
200      prunedScopes.Reverse();
201      RestoreFullTree(GlobalScope, prunedScopes);
202
[2055]203      return asyncResults;
[414]204    }
205
[2055]206    private void WaitForAll(AsyncGridResult[] asyncResults) {
[414]207      // wait until all jobs are finished
208      // WaitAll works only with maximally 64 waithandles
[2055]209      if (asyncResults.Length <= 64) {
210        WaitHandle[] waitHandles = new WaitHandle[asyncResults.Length];
211        for (int i = 0; i < asyncResults.Length; i++) {
212          waitHandles[i] = asyncResults[i].WaitHandle;
213        }
214        WaitHandle.WaitAll(waitHandles);
[414]215      } else {
216        int i;
[2055]217        for (i = 0; i < asyncResults.Length; i++) {
218          asyncResults[i].WaitHandle.WaitOne();
[414]219        }
220      }
221    }
222
223    private void ProcessResults(List<KeyValuePair<ProcessingEngine, AtomicOperation>> results) {
224      // create a new compositeOperation to hold canceled operations that should be restarted
225      CompositeOperation canceledOperations = new CompositeOperation();
226      canceledOperations.ExecuteInParallel = true;
227
228      suspendedEngines.Clear();
229      // retrieve results and merge into scope-tree
[2055]230      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
[414]231        ProcessingEngine resultEngine = p.Key;
232        AtomicOperation parOperation = p.Value;
[2055]233        if (resultEngine.Canceled && !resultEngine.Suspended) {
[414]234          // when an engine was canceled but not suspended this means there was a problem
235          // show error message and queue the operation for restart (again parallel)
236          // but don't merge the results of the aborted engine
237          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
238          canceledOperations.AddOperation(parOperation);
[2055]239        } else if (resultEngine.Suspended) {
[414]240          // when the engine was suspended it means it was stopped because of a breakpoint
241          // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel)
242          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
243          resultEngine.InitialOperation = parOperation;
244          suspendedEngines.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(resultEngine, parOperation));
245        } else {
246          // engine is finished ->
247          // simply merge the results into our local scope-tree
248          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
249        }
250      }
251      // if there were exceptions -> abort
[2055]252      if (canceledOperations.Operations.Count > 0) {
[414]253        // requeue the aborted operations
254        myExecutionStack.Push(canceledOperations);
255        Abort();
256      }
257      // if there were breakpoints -> abort
[2055]258      if (suspendedEngines.Count > 0) {
[414]259        Abort();
260      }
261    }
262
[268]263    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
[2055]264      if (savedScopes.Count == 0) return;
[268]265      IScope remainingBranch = currentScope.SubScopes[0];
266      currentScope.RemoveSubScope(remainingBranch);
267      IList<IScope> savedScopesForCurrent = savedScopes[0];
[2055]268      foreach (IScope savedScope in savedScopesForCurrent) {
[268]269        currentScope.AddSubScope(savedScope);
270      }
271      savedScopes.RemoveAt(0);
272      RestoreFullTree(remainingBranch, savedScopes);
273    }
274
275    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
[2055]276      if (currentScope == scope) return currentScope;
277      if (currentScope.SubScopes.Count == 0) return null;
[268]278      IScope foundScope = null;
279      // try to find the searched scope in all my sub-scopes
[2055]280      foreach (IScope subScope in currentScope.SubScopes) {
[268]281        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
[2055]282        if (foundScope != null) break; // we can stop as soon as we find the scope in a branch
[268]283      }
[2055]284      if (foundScope != null) { // when we found the scopes in my sub-scopes
[268]285        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
286        prunedScopes.Add(subScopes);
287        // remove all my sub-scopes
[2055]288        foreach (IScope subScope in subScopes) {
[268]289          currentScope.RemoveSubScope(subScope);
290        }
291        // add only the branch that leads to the scope that I search for
292        currentScope.AddSubScope(foundScope);
293        return currentScope; // return that this scope contains the branch that leads to the searched scopes
294      } else {
295        return null; // otherwise we didn't find the searched scope and we can return null
296      }
297    }
298
[281]299    private IScope FindParentScope(IScope currentScope, IScope childScope) {
[2055]300      if (currentScope.SubScopes.Contains(childScope)) return currentScope;
301      foreach (IScope subScope in currentScope.SubScopes) {
[281]302        IScope result = FindParentScope(subScope, childScope);
[2055]303        if (result != null) return result;
[268]304      }
305      return null;
306    }
307
[248]308    private void MergeScope(IScope original, IScope result) {
309      // merge the results
310      original.Clear();
[2055]311      foreach (IVariable variable in result.Variables) {
[248]312        original.AddVariable(variable);
313      }
[2055]314      foreach (IScope subScope in result.SubScopes) {
[248]315        original.AddSubScope(subScope);
316      }
[2055]317      foreach (KeyValuePair<string, string> alias in result.Aliases) {
[248]318        original.AddAlias(alias.Key, alias.Value);
319      }
320    }
321
[2]322    #region Persistence Methods
323    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
324      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
325      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
326      addressAttribute.Value = ServerAddress;
[438]327      node.Attributes.Append(addressAttribute);
[2055]328      if (suspendedEngines.Count > 0) {
[414]329        XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines");
[2055]330        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
[414]331          XmlNode n = document.CreateElement("Entry");
332          n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects));
333          n.AppendChild(PersistenceManager.Persist(p.Value, document, persistedObjects));
334          suspendedEnginesNode.AppendChild(n);
335        }
336        node.AppendChild(suspendedEnginesNode);
337      }
[2]338      return node;
339    }
340    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
341      base.Populate(node, restoredObjects);
342      ServerAddress = node.Attributes["ServerAddress"].Value;
[414]343      XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines");
[2055]344      if (suspendedEnginesNode != null) {
345        foreach (XmlNode n in suspendedEnginesNode.ChildNodes) {
[414]346          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
347            (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
348            (AtomicOperation)PersistenceManager.Restore(n.ChildNodes[1], restoredObjects));
349          suspendedEngines.Add(p);
350        }
351      }
[2]352    }
353    #endregion
354  }
[268]355}
Note: See TracBrowser for help on using the repository browser.