Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Collections/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs @ 381

Last change on this file since 381 was 299, checked in by gkronber, 17 years ago

fixed #164

File size: 10.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
34using System.Diagnostics;
35
36namespace HeuristicLab.DistributedEngine {
37  public class DistributedEngine : EngineBase, IEditable {
38    private JobManager jobManager;
39    private CompositeOperation waitingOperations;
40    private string serverAddress;
/// <summary>
/// Gets or sets the server address. <c>Execute</c> passes the stored address to the
/// <c>JobManager</c> when it creates one.
/// </summary>
public string ServerAddress {
  get { return serverAddress; }
  set {
    // nothing to store when the address is unchanged
    if(value == serverAddress) return;
    serverAddress = value;
  }
}
/// <summary>
/// Clones the engine through the base implementation and copies the server address to the clone.
/// </summary>
/// <param name="clonedObjects">Passed on unchanged to the base implementation.</param>
/// <returns>The cloned <c>DistributedEngine</c>.</returns>
public override object Clone(IDictionary<Guid, object> clonedObjects) {
  object cloned = base.Clone(clonedObjects);
  DistributedEngine engineClone = (DistributedEngine)cloned;
  engineClone.ServerAddress = this.serverAddress;
  return engineClone;
}
54
/// <summary>Creates a new <c>DistributedEngineEditor</c> for this engine and returns it as the view.</summary>
public override IView CreateView() {
  DistributedEngineEditor view = new DistributedEngineEditor(this);
  return view;
}
/// <summary>Creates a new <c>DistributedEngineEditor</c> for this engine and returns it as the editor.</summary>
public virtual IEditor CreateEditor() {
  DistributedEngineEditor editor = new DistributedEngineEditor(this);
  return editor;
}
61
/// <summary>
/// Starts execution: creates the job manager on first use, resets it and then
/// delegates to the base implementation.
/// </summary>
public override void Execute() {
  // NOTE(review): the job manager is created only once, with the address stored at that time;
  // a later change of ServerAddress does not appear to reach an existing job manager - confirm this is intended.
  if(jobManager == null) {
    jobManager = new JobManager(serverAddress);
  }
  jobManager.Reset();
  base.Execute();
}
67
/// <summary>Stepwise execution is not supported by this engine.</summary>
/// <param name="steps">Not used.</param>
/// <exception cref="InvalidOperationException">Always thrown.</exception>
public override void ExecuteSteps(int steps) {
  const string message = "DistributedEngine doesn't support stepwise execution";
  throw new InvalidOperationException(message);
}
71
/// <summary>
/// Pops the next operation from the execution stack and processes it.
/// Atomic operations are executed locally. Composite operations flagged with
/// <c>ExecuteInParallel</c> are handed to the job manager, one job per contained operation.
/// All other composite operations are unfolded onto the execution stack.
/// </summary>
protected override void ProcessNextOperation() {
  IOperation operation = myExecutionStack.Pop();
  if(operation is AtomicOperation) {
    ProcessAtomicOperation((AtomicOperation)operation);
  } else if(operation is CompositeOperation) {
    CompositeOperation compositeOperation = (CompositeOperation)operation;
    if(compositeOperation.ExecuteInParallel) {
      ProcessParallelOperation(compositeOperation);
    } else {
      // push in reverse order so that Operations[0] is pushed last
      for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
        myExecutionStack.Push(compositeOperation.Operations[i]);
    }
  }
}

// Executes one atomic operation locally and pushes its successor (if any) on the execution stack.
private void ProcessAtomicOperation(AtomicOperation atomicOperation) {
  IOperation next = null;
  try {
    next = atomicOperation.Operator.Execute(atomicOperation.Scope);
  } catch(Exception ex) {
    // push operation on stack again
    myExecutionStack.Push(atomicOperation);
    Abort();
    // report the exception from a thread-pool thread instead of the calling thread
    ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
  }
  if(next != null)
    myExecutionStack.Push(next);
  OnOperationExecuted(atomicOperation);
  if(atomicOperation.Operator.Breakpoint) Abort();
}

// Runs all operations of a parallel composite operation as jobs, waits for them and merges the results.
// On any failure the composite operation is pushed back on the stack and the engine is aborted.
private void ProcessParallelOperation(CompositeOperation compositeOperation) {
  try {
    WaitHandle[] waithandles = StartParallelJobs(compositeOperation);
    WaitForJobs(waithandles);
    // retrieve results and merge into scope-tree
    foreach(AtomicOperation parOperation in compositeOperation.Operations) {
      ProcessingEngine resultEngine = jobManager.EndExecuteOperation(parOperation);
      if(resultEngine.ExecutionStack.Count > 0) {
        // when there are operations left in the execution stack it means that the engine has been aborted
        // for unknown reason. Probably there was a problem at the client, so we can try to execute the steps locally.
        // If they also fail the (local) distributed-engine will be aborted and we will see an error-message.
        // Solution: We could push all waiting operations in the execution stack of the result engine into our own
        // execution stack, but this is not easy because we have to change the operations to point to the
        // original scopes instead of the new scopes (created while deserializing the processing engine).
        // Instead just push the original parallel operation back on the stack to force local execution.
        ExecutionStack.Push(parOperation);
      } else {
        // if everything went fine we can merge the results into our local scope-tree
        MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
      }
    }
  } catch(Exception e) {
    myExecutionStack.Push(compositeOperation);
    Abort();
    ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
  }
  OnOperationExecuted(compositeOperation);
}

// Starts one job per contained operation and returns the wait handles of the started jobs.
// The scope-tree is temporarily reduced for each job and is restored before this method
// returns, also when starting a job fails.
private WaitHandle[] StartParallelJobs(CompositeOperation compositeOperation) {
  WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
  int i = 0;
  // HACK: assume that all atomicOperations have the same parent scope.
  // 1) find that parent scope
  // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
  // 3) keep the branches to 'repair' the scope-tree later
  // 4) for each parallel job attach only the sub-scope that this operation uses
  // 5) after starting all parallel jobs restore the whole scope-tree
  IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
  List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
  PruneToParentScope(GlobalScope, parentScope, prunedScopes);
  try {
    List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
    foreach(IScope scope in subScopes) {
      parentScope.RemoveSubScope(scope);
    }
    try {
      // start all parallel jobs
      foreach(AtomicOperation parOperation in compositeOperation.Operations) {
        parentScope.AddSubScope(parOperation.Scope);
        try {
          waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
        } finally {
          // detach the job's scope again even when starting the job failed
          parentScope.RemoveSubScope(parOperation.Scope);
        }
      }
    } finally {
      // re-attach the original sub-scopes of the parent scope
      foreach(IScope scope in subScopes) {
        parentScope.AddSubScope(scope);
      }
    }
  } finally {
    // PruneToParentScope stores the deepest level first; RestoreFullTree consumes the list top-down
    prunedScopes.Reverse();
    RestoreFullTree(GlobalScope, prunedScopes);
  }
  return waithandles;
}

// Blocks until all given wait handles are signaled and closes every handle afterwards.
private static void WaitForJobs(WaitHandle[] waithandles) {
  // WaitAll works only with maximally 64 waithandles
  const int maxWaitAllHandles = 64;
  if(waithandles.Length <= maxWaitAllHandles) {
    WaitHandle.WaitAll(waithandles);
    // release the handles here as well; previously only the branch below closed them
    foreach(WaitHandle waithandle in waithandles) {
      waithandle.Close();
    }
  } else {
    for(int i = 0; i < waithandles.Length; i++) {
      waithandles[i].WaitOne();
      waithandles[i].Close();
    }
  }
}
159
/// <summary>
/// Re-attaches previously saved sub-scope lists level by level, starting at
/// <paramref name="currentScope"/> and descending along the first sub-scope of each level.
/// </summary>
/// <param name="currentScope">Scope at which the first saved list is restored.</param>
/// <param name="savedScopes">Saved sub-scope lists, one per level; entries are removed as they are consumed.</param>
private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
  IScope cursor = currentScope;
  while(savedScopes.Count > 0) {
    // the first sub-scope is the branch to descend into on the next level
    IScope branch = cursor.SubScopes[0];
    cursor.RemoveSubScope(branch);
    IList<IScope> levelScopes = savedScopes[0];
    foreach(IScope saved in levelScopes) {
      cursor.AddSubScope(saved);
    }
    savedScopes.RemoveAt(0);
    cursor = branch;
  }
}
171
/// <summary>
/// Searches <paramref name="scope"/> below <paramref name="currentScope"/> and, on every scope along
/// the path to it, replaces the sub-scopes by the single branch that leads to the searched scope.
/// </summary>
/// <param name="currentScope">Scope at which the search starts.</param>
/// <param name="scope">Scope that is searched for.</param>
/// <param name="prunedScopes">Receives the original sub-scope list of every pruned scope (deepest level first).</param>
/// <returns><paramref name="currentScope"/> when it is the searched scope or contains it, otherwise null.</returns>
private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
  if(currentScope == scope) return currentScope;
  if(currentScope.SubScopes.Count == 0) return null;

  // look for the searched scope in each branch; the first hit ends the search
  IScope branch = null;
  foreach(IScope child in currentScope.SubScopes) {
    branch = PruneToParentScope(child, scope, prunedScopes);
    if(branch != null) break;
  }
  // none of the branches contains the searched scope
  if(branch == null) return null;

  // remember all sub-scopes so that they can be restored later
  List<IScope> formerChildren = new List<IScope>(currentScope.SubScopes);
  prunedScopes.Add(formerChildren);
  foreach(IScope formerChild in formerChildren) {
    currentScope.RemoveSubScope(formerChild);
  }
  // keep only the branch that leads to the searched scope
  currentScope.AddSubScope(branch);
  return currentScope;
}
195
/// <summary>
/// Searches the scope-tree below <paramref name="currentScope"/> for the scope whose
/// sub-scopes contain <paramref name="childScope"/>.
/// </summary>
/// <param name="currentScope">Scope at which the search starts.</param>
/// <param name="childScope">Scope whose parent is searched.</param>
/// <returns>The parent of <paramref name="childScope"/>, or null when it was not found.</returns>
private IScope FindParentScope(IScope currentScope, IScope childScope) {
  if(currentScope.SubScopes.Contains(childScope)) return currentScope;
  IScope parent = null;
  foreach(IScope candidate in currentScope.SubScopes) {
    parent = FindParentScope(candidate, childScope);
    if(parent != null) break; // found in this branch
  }
  return parent;
}
204
/// <summary>
/// Replaces the content of <paramref name="original"/> with the variables, sub-scopes
/// and aliases of <paramref name="result"/>.
/// </summary>
/// <param name="original">Scope that is cleared and filled.</param>
/// <param name="result">Scope whose content is added to <paramref name="original"/>.</param>
private void MergeScope(IScope original, IScope result) {
  // drop the old content first
  original.Clear();
  // take over variables, then sub-scopes, then aliases
  foreach(IVariable resultVariable in result.Variables) {
    original.AddVariable(resultVariable);
  }
  foreach(IScope resultSubScope in result.SubScopes) {
    original.AddSubScope(resultSubScope);
  }
  foreach(KeyValuePair<string, string> resultAlias in result.Aliases) {
    string aliasKey = resultAlias.Key;
    string aliasValue = resultAlias.Value;
    original.AddAlias(aliasKey, aliasValue);
  }
}
218
219    #region Persistence Methods
/// <summary>
/// Creates the XML node through the base implementation and appends the server address
/// as attribute "ServerAddress".
/// </summary>
/// <param name="name">Passed on unchanged to the base implementation.</param>
/// <param name="document">Document used to create the attribute; also passed to the base implementation.</param>
/// <param name="persistedObjects">Passed on unchanged to the base implementation.</param>
/// <returns>The node returned by the base implementation, extended by the address attribute.</returns>
public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
  XmlNode engineNode = base.GetXmlNode(name, document, persistedObjects);
  XmlAttribute serverAddressAttribute = document.CreateAttribute("ServerAddress");
  serverAddressAttribute.Value = this.ServerAddress;
  engineNode.Attributes.Append(serverAddressAttribute);
  return engineNode;
}
/// <summary>
/// Restores the engine through the base implementation and reads the server address
/// from the attribute "ServerAddress" of <paramref name="node"/>.
/// </summary>
/// <param name="node">Node that carries the "ServerAddress" attribute.</param>
/// <param name="restoredObjects">Passed on unchanged to the base implementation.</param>
public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
  base.Populate(node, restoredObjects);
  XmlAttribute serverAddressAttribute = node.Attributes["ServerAddress"];
  this.ServerAddress = serverAddressAttribute.Value;
}
231    #endregion
232  }
233}
Note: See TracBrowser for help on using the repository browser.