Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Collections/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs @ 846

Last change on this file since 846 was 384, checked in by gkronber, 16 years ago

merged changesets r382 and r383 (fix references and compiler warnings) into the "collections" branch

File size: 10.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
34using System.Diagnostics;
35
36namespace HeuristicLab.DistributedEngine {
37  public class DistributedEngine : EngineBase, IEditable {
38    private JobManager jobManager;
39    private string serverAddress;
40    public string ServerAddress {
41      get { return serverAddress; }
42      set {
43        if(value != serverAddress) {
44          serverAddress = value;
45        }
46      }
47    }
48    public override object Clone(IDictionary<Guid, object> clonedObjects) {
49      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
50      clone.ServerAddress = serverAddress;
51      return clone;
52    }
53
54    public override IView CreateView() {
55      return new DistributedEngineEditor(this);
56    }
57    public virtual IEditor CreateEditor() {
58      return new DistributedEngineEditor(this);
59    }
60
61    public override void Execute() {
62      if(jobManager == null) this.jobManager = new JobManager(serverAddress);
63      jobManager.Reset();
64      base.Execute();
65    }
66
67    public override void ExecuteSteps(int steps) {
68      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
69    }
70
71    protected override void ProcessNextOperation() {
72      IOperation operation = myExecutionStack.Pop();
73      if(operation is AtomicOperation) {
74        AtomicOperation atomicOperation = (AtomicOperation)operation;
75        IOperation next = null;
76        try {
77          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
78        } catch(Exception ex) {
79          // push operation on stack again
80          myExecutionStack.Push(atomicOperation);
81          Abort();
82          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
83        }
84        if(next != null)
85          myExecutionStack.Push(next);
86        OnOperationExecuted(atomicOperation);
87        if(atomicOperation.Operator.Breakpoint) Abort();
88      } else if(operation is CompositeOperation) {
89        CompositeOperation compositeOperation = (CompositeOperation)operation;
90        if(compositeOperation.ExecuteInParallel) {
91          try {
92            WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
93            int i = 0;
94            // HACK: assume that all atomicOperations have the same parent scope.
95            // 1) find that parent scope
96            // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
97            // 3) keep the branches to 'repair' the scope-tree later
98            // 4) for each parallel job attach only the sub-scope that this operation uses
99            // 5) after starting all parallel jobs restore the whole scope-tree
100            IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
101            List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
102            PruneToParentScope(GlobalScope, parentScope, prunedScopes);
103            List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
104            foreach(IScope scope in subScopes) {
105              parentScope.RemoveSubScope(scope);
106            }
107            // start all parallel jobs
108            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
109              parentScope.AddSubScope(parOperation.Scope);
110              waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
111              parentScope.RemoveSubScope(parOperation.Scope);
112            }
113            foreach(IScope scope in subScopes) {
114              parentScope.AddSubScope(scope);
115            }
116            prunedScopes.Reverse();
117            RestoreFullTree(GlobalScope, prunedScopes);
118
119            // wait until all jobs are finished
120            // WaitAll works only with maximally 64 waithandles
121            if(waithandles.Length <= 64) {
122              WaitHandle.WaitAll(waithandles);
123            } else {
124              for(i = 0; i < waithandles.Length; i++) {
125                waithandles[i].WaitOne();
126                waithandles[i].Close();
127              }
128            }
129            // retrieve results and merge into scope-tree
130            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
131              ProcessingEngine resultEngine = jobManager.EndExecuteOperation(parOperation);
132              if(resultEngine.ExecutionStack.Count > 0) {
133                // when there are operations left in the execution stack it means that the engine has been aborted
134                // for unkown reason. Probably there was a problem at the client, so we can try to execute the steps locally.
135                // If they also fail the (local) distributued-engine will be aborted and we will see an error-message.
136                // Solution: We could push all waiting operations in the execution stack of the result engine into our own
137                // execution stack, but this is not easy because we have to change the operations to point to the
138                // original scopes instead of the new scopes (created while deserializing the processing engine).
139                // Instead just push the original parallel operation back on the stack to force local execution.
140                ExecutionStack.Push(parOperation);
141              } else {
142                // if everything went fine we can merge the results into our local scope-tree
143                MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
144              }
145            }
146          } catch(Exception e) {
147            myExecutionStack.Push(compositeOperation);
148            Abort();
149            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
150          }
151          OnOperationExecuted(compositeOperation);
152        } else {
153          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
154            myExecutionStack.Push(compositeOperation.Operations[i]);
155        }
156      }
157    }
158
159    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
160      if(savedScopes.Count == 0) return;
161      IScope remainingBranch = currentScope.SubScopes[0];
162      currentScope.RemoveSubScope(remainingBranch);
163      IList<IScope> savedScopesForCurrent = savedScopes[0];
164      foreach(IScope savedScope in savedScopesForCurrent) {
165        currentScope.AddSubScope(savedScope);
166      }
167      savedScopes.RemoveAt(0);
168      RestoreFullTree(remainingBranch, savedScopes);
169    }
170
171    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
172      if(currentScope == scope) return currentScope;
173      if(currentScope.SubScopes.Count == 0) return null;
174      IScope foundScope = null;
175      // try to find the searched scope in all my sub-scopes
176      foreach(IScope subScope in currentScope.SubScopes) {
177        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
178        if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
179      }
180      if(foundScope != null) { // when we found the scopes in my sub-scopes
181        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
182        prunedScopes.Add(subScopes);
183        // remove all my sub-scopes
184        foreach(IScope subScope in subScopes) {
185          currentScope.RemoveSubScope(subScope);
186        }
187        // add only the branch that leads to the scope that I search for
188        currentScope.AddSubScope(foundScope);
189        return currentScope; // return that this scope contains the branch that leads to the searched scopes
190      } else {
191        return null; // otherwise we didn't find the searched scope and we can return null
192      }
193    }
194
195    private IScope FindParentScope(IScope currentScope, IScope childScope) {
196      if(currentScope.SubScopes.Contains(childScope)) return currentScope;
197      foreach(IScope subScope in currentScope.SubScopes) {
198        IScope result = FindParentScope(subScope, childScope);
199        if(result != null) return result;
200      }
201      return null;
202    }
203
204    private void MergeScope(IScope original, IScope result) {
205      // merge the results
206      original.Clear();
207      foreach(IVariable variable in result.Variables) {
208        original.AddVariable(variable);
209      }
210      foreach(IScope subScope in result.SubScopes) {
211        original.AddSubScope(subScope);
212      }
213      foreach(KeyValuePair<string, string> alias in result.Aliases) {
214        original.AddAlias(alias.Key, alias.Value);
215      }
216    }
217
218    #region Persistence Methods
219    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
220      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
221      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
222      addressAttribute.Value = ServerAddress;
223      node.Attributes.Append(addressAttribute);
224      return node;
225    }
226    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
227      base.Populate(node, restoredObjects);
228      ServerAddress = node.Attributes["ServerAddress"].Value;
229    }
230    #endregion
231  }
232}
Note: See TracBrowser for help on using the repository browser.