Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Operator Architecture Refactoring/HeuristicLab.DistributedEngine/DistributedEngine.cs @ 630

Last change on this file since 630 was 438, checked in by gkronber, 16 years ago

fixed #229 (Loading a persisted distributed-engine throws Exception)

File size: 15.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
34using System.Diagnostics;
35
36namespace HeuristicLab.DistributedEngine {
37  public class DistributedEngine : EngineBase, IEditable {
38    private List<KeyValuePair<ProcessingEngine, AtomicOperation>> suspendedEngines = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
39    private JobManager jobManager;
40    private string serverAddress;
41    public string ServerAddress {
42      get { return serverAddress; }
43      set {
44        if(value != serverAddress) {
45          serverAddress = value;
46        }
47      }
48    }
49    public override object Clone(IDictionary<Guid, object> clonedObjects) {
50      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
51      clone.ServerAddress = serverAddress;
52      return clone;
53    }
54
55    public override IView CreateView() {
56      return new DistributedEngineEditor(this);
57    }
58    public virtual IEditor CreateEditor() {
59      return new DistributedEngineEditor(this);
60    }
61
62    public override bool Terminated {
63      get {
64        return base.Terminated && suspendedEngines.Count == 0;
65      }
66    }
67
68    public override void Reset() {
69      suspendedEngines.Clear();
70      base.Reset();
71    }
72
73    public override void Execute() {
74      if(jobManager == null) this.jobManager = new JobManager(serverAddress);
75      jobManager.Reset();
76      base.Execute();
77    }
78
79    public override void ExecuteSteps(int steps) {
80      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
81    }
82
83    protected override void ProcessNextOperation() {
84      if(suspendedEngines.Count > 0) {
85        ProcessSuspendedEngines();
86      } else {
87        IOperation operation = myExecutionStack.Pop();
88        ProcessOperation(operation);
89      }
90    }
91
92    private void ProcessSuspendedEngines() {
93      WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count];
94      int i = 0;
95      foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
96        waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
97      }
98      WaitForAll(waitHandles);
99      // collect results
100      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
101      try {
102        foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
103          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
104              jobManager.EndExecuteOperation(suspendedPair.Value),
105              suspendedPair.Value);
106          results.Add(p);
107        }
108      } catch(Exception e) {
109        // this exception means there was a problem with the underlying communication infrastructure
110        // -> show message dialog and abort engine
111        Abort();
112        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
113        return;
114      }
115      // got all result engines without an exception -> merge results
116      ProcessResults(results);
117    }
118
119    private void ProcessOperation(IOperation operation) {
120      if(operation is AtomicOperation) {
121        AtomicOperation atomicOperation = (AtomicOperation)operation;
122        IOperation next = null;
123        try {
124          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
125        } catch(Exception ex) {
126          // push operation on stack again
127          myExecutionStack.Push(atomicOperation);
128          Abort();
129          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
130        }
131        if(next != null)
132          myExecutionStack.Push(next);
133        OnOperationExecuted(atomicOperation);
134        if(atomicOperation.Operator.Breakpoint) Abort();
135      } else if(operation is CompositeOperation) {
136        CompositeOperation compositeOperation = (CompositeOperation)operation;
137        if(compositeOperation.ExecuteInParallel) {
138          ProcessParallelOperation(compositeOperation);
139          OnOperationExecuted(compositeOperation);
140        } else {
141          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
142            myExecutionStack.Push(compositeOperation.Operations[i]);
143        }
144      }
145    }
146
147    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
148      // send operations to grid
149      WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation);
150      WaitForAll(waithandles);
151      // collect results
152      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
153      try {
154        foreach(AtomicOperation parOperation in compositeOperation.Operations) {
155          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
156            jobManager.EndExecuteOperation(parOperation), parOperation);
157          results.Add(p);
158        }
159      } catch(Exception e) {
160        // this exception means there was a problem with the underlying communication infrastructure
161        // -> show message dialog, abort engine, requeue the whole composite operation again and return
162        myExecutionStack.Push(compositeOperation);
163        Abort();
164        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
165        return;
166      }
167      // got all result engines without an exception -> merge results
168      ProcessResults(results);
169    }
170
171    private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) {
172      WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
173      int i = 0;
174      // HACK: assume that all atomicOperations have the same parent scope.
175      // 1) find that parent scope
176      // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
177      // 3) keep the branches to 'repair' the scope-tree later
178      // 4) for each parallel job attach only the sub-scope that this operation uses
179      // 5) after starting all parallel jobs restore the whole scope-tree
180      IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
181      List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
182      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
183      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
184      foreach(IScope scope in subScopes) {
185        parentScope.RemoveSubScope(scope);
186      }
187      // start all parallel jobs
188      foreach(AtomicOperation parOperation in compositeOperation.Operations) {
189        parentScope.AddSubScope(parOperation.Scope);
190        waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
191        parentScope.RemoveSubScope(parOperation.Scope);
192      }
193      foreach(IScope scope in subScopes) {
194        parentScope.AddSubScope(scope);
195      }
196      prunedScopes.Reverse();
197      RestoreFullTree(GlobalScope, prunedScopes);
198
199      return waithandles;
200    }
201
202    private void WaitForAll(WaitHandle[] waithandles) {
203      // wait until all jobs are finished
204      // WaitAll works only with maximally 64 waithandles
205      if(waithandles.Length <= 64) {
206        WaitHandle.WaitAll(waithandles);
207      } else {
208        int i;
209        for(i = 0; i < waithandles.Length; i++) {
210          waithandles[i].WaitOne();
211          waithandles[i].Close();
212        }
213      }
214    }
215
216    private void ProcessResults(List<KeyValuePair<ProcessingEngine, AtomicOperation>> results) {
217      // create a new compositeOperation to hold canceled operations that should be restarted
218      CompositeOperation canceledOperations = new CompositeOperation();
219      canceledOperations.ExecuteInParallel = true;
220
221      suspendedEngines.Clear();
222      // retrieve results and merge into scope-tree
223      foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
224        ProcessingEngine resultEngine = p.Key;
225        AtomicOperation parOperation = p.Value;
226        if(resultEngine.Canceled && !resultEngine.Suspended) {
227          // when an engine was canceled but not suspended this means there was a problem
228          // show error message and queue the operation for restart (again parallel)
229          // but don't merge the results of the aborted engine
230          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
231          canceledOperations.AddOperation(parOperation);
232        } else if(resultEngine.Suspended) {
233          // when the engine was suspended it means it was stopped because of a breakpoint
234          // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel)
235          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
236          resultEngine.InitialOperation = parOperation;
237          suspendedEngines.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(resultEngine, parOperation));
238        } else {
239          // engine is finished ->
240          // simply merge the results into our local scope-tree
241          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
242        }
243      }
244      // if there were exceptions -> abort
245      if(canceledOperations.Operations.Count > 0) {
246        // requeue the aborted operations
247        myExecutionStack.Push(canceledOperations);
248        Abort();
249      }
250      // if there were breakpoints -> abort
251      if(suspendedEngines.Count > 0) {
252        Abort();
253      }
254    }
255
256    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
257      if(savedScopes.Count == 0) return;
258      IScope remainingBranch = currentScope.SubScopes[0];
259      currentScope.RemoveSubScope(remainingBranch);
260      IList<IScope> savedScopesForCurrent = savedScopes[0];
261      foreach(IScope savedScope in savedScopesForCurrent) {
262        currentScope.AddSubScope(savedScope);
263      }
264      savedScopes.RemoveAt(0);
265      RestoreFullTree(remainingBranch, savedScopes);
266    }
267
268    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
269      if(currentScope == scope) return currentScope;
270      if(currentScope.SubScopes.Count == 0) return null;
271      IScope foundScope = null;
272      // try to find the searched scope in all my sub-scopes
273      foreach(IScope subScope in currentScope.SubScopes) {
274        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
275        if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
276      }
277      if(foundScope != null) { // when we found the scopes in my sub-scopes
278        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
279        prunedScopes.Add(subScopes);
280        // remove all my sub-scopes
281        foreach(IScope subScope in subScopes) {
282          currentScope.RemoveSubScope(subScope);
283        }
284        // add only the branch that leads to the scope that I search for
285        currentScope.AddSubScope(foundScope);
286        return currentScope; // return that this scope contains the branch that leads to the searched scopes
287      } else {
288        return null; // otherwise we didn't find the searched scope and we can return null
289      }
290    }
291
292    private IScope FindParentScope(IScope currentScope, IScope childScope) {
293      if(currentScope.SubScopes.Contains(childScope)) return currentScope;
294      foreach(IScope subScope in currentScope.SubScopes) {
295        IScope result = FindParentScope(subScope, childScope);
296        if(result != null) return result;
297      }
298      return null;
299    }
300
301    private void MergeScope(IScope original, IScope result) {
302      // merge the results
303      original.Clear();
304      foreach(IVariable variable in result.Variables) {
305        original.AddVariable(variable);
306      }
307      foreach(IScope subScope in result.SubScopes) {
308        original.AddSubScope(subScope);
309      }
310      foreach(KeyValuePair<string, string> alias in result.Aliases) {
311        original.AddAlias(alias.Key, alias.Value);
312      }
313    }
314
315    #region Persistence Methods
316    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
317      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
318      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
319      addressAttribute.Value = ServerAddress;
320      node.Attributes.Append(addressAttribute);
321      if(suspendedEngines.Count > 0) {
322        XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines");
323        foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
324          XmlNode n = document.CreateElement("Entry");
325          n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects));
326          n.AppendChild(PersistenceManager.Persist(p.Value, document, persistedObjects));
327          suspendedEnginesNode.AppendChild(n);
328        }
329        node.AppendChild(suspendedEnginesNode);
330      }
331      return node;
332    }
333    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
334      base.Populate(node, restoredObjects);
335      ServerAddress = node.Attributes["ServerAddress"].Value;
336      XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines");
337      if(suspendedEnginesNode != null) {
338        foreach(XmlNode n in suspendedEnginesNode.ChildNodes) {
339          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
340            (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
341            (AtomicOperation)PersistenceManager.Restore(n.ChildNodes[1], restoredObjects));
342          suspendedEngines.Add(p);
343        }
344      }
345    }
346    #endregion
347  }
348}
Note: See TracBrowser for help on using the repository browser.