Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs @ 406

Last change on this file since 406 was 383, checked in by gkronber, 16 years ago

changed code to remove compiler warnings

File size: 10.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
34using System.Diagnostics;
35
36namespace HeuristicLab.DistributedEngine {
37  public class DistributedEngine : EngineBase, IEditable {
38    private JobManager jobManager;
39    private string serverAddress;
40    public string ServerAddress {
41      get { return serverAddress; }
42      set {
43        if(value != serverAddress) {
44          serverAddress = value;
45        }
46      }
47    }
48    public override object Clone(IDictionary<Guid, object> clonedObjects) {
49      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
50      clone.ServerAddress = serverAddress;
51      return clone;
52    }
53
54    public override IView CreateView() {
55      return new DistributedEngineEditor(this);
56    }
57    public virtual IEditor CreateEditor() {
58      return new DistributedEngineEditor(this);
59    }
60
61    public override void Execute() {
62      if(jobManager == null) this.jobManager = new JobManager(serverAddress);
63      jobManager.Reset();
64      base.Execute();
65    }
66
67    public override void ExecuteSteps(int steps) {
68      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
69    }
70
71    protected override void ProcessNextOperation() {
72      IOperation operation = myExecutionStack.Pop();
73      if(operation is AtomicOperation) {
74        AtomicOperation atomicOperation = (AtomicOperation)operation;
75        IOperation next = null;
76        try {
77          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
78        } catch(Exception ex) {
79          // push operation on stack again
80          myExecutionStack.Push(atomicOperation);
81          Abort();
82          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
83        }
84        if(next != null)
85          myExecutionStack.Push(next);
86        OnOperationExecuted(atomicOperation);
87        if(atomicOperation.Operator.Breakpoint) Abort();
88      } else if(operation is CompositeOperation) {
89        CompositeOperation compositeOperation = (CompositeOperation)operation;
90        if(compositeOperation.ExecuteInParallel) {
91          try {
92            WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
93            int i = 0;
94            // HACK: assume that all atomicOperations have the same parent scope.
95            // 1) find that parent scope
96            // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
97            // 3) keep the branches to 'repair' the scope-tree later
98            // 4) for each parallel job attach only the sub-scope that this operation uses
99            // 5) after starting all parallel jobs restore the whole scope-tree
100            IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
101            List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
102            PruneToParentScope(GlobalScope, parentScope, prunedScopes);
103            List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
104            foreach(IScope scope in subScopes) {
105              parentScope.RemoveSubScope(scope);
106            }
107            // start all parallel jobs
108            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
109              parentScope.AddSubScope(parOperation.Scope);
110              waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
111              parentScope.RemoveSubScope(parOperation.Scope);
112            }
113            foreach(IScope scope in subScopes) {
114              parentScope.AddSubScope(scope);
115            }
116            prunedScopes.Reverse();
117            RestoreFullTree(GlobalScope, prunedScopes);
118
119            // wait until all jobs are finished
120            // WaitAll works only with maximally 64 waithandles
121            if(waithandles.Length <= 64) {
122              WaitHandle.WaitAll(waithandles);
123            } else {
124              for(i = 0; i < waithandles.Length; i++) {
125                waithandles[i].WaitOne();
126                waithandles[i].Close();
127              }
128            }
129            // retrieve results and merge into scope-tree
130            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
131              ProcessingEngine resultEngine = jobManager.EndExecuteOperation(parOperation);
132              if(resultEngine.ExecutionStack.Count > 0) {
133                // when there are operations left in the execution stack it means that the engine has been aborted
134                // for unkown reason. Probably there was a problem at the client, so we can try to execute the steps locally.
135                // If they also fail the (local) distributued-engine will be aborted and we will see an error-message.
136                // Solution: We could push all waiting operations in the execution stack of the result engine into our own
137                // execution stack, but this is not easy because we have to change the operations to point to the
138                // original scopes instead of the new scopes (created while deserializing the processing engine).
139                // Instead just push the original parallel operation back on the stack to force local execution.
140                ExecutionStack.Push(parOperation);
141              } else {
142                // if everything went fine we can merge the results into our local scope-tree
143                MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
144              }
145            }
146          } catch(Exception e) {
147            myExecutionStack.Push(compositeOperation);
148            Abort();
149            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
150          }
151          OnOperationExecuted(compositeOperation);
152        } else {
153          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
154            myExecutionStack.Push(compositeOperation.Operations[i]);
155        }
156      }
157    }
158
159    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
160      if(savedScopes.Count == 0) return;
161      IScope remainingBranch = currentScope.SubScopes[0];
162      currentScope.RemoveSubScope(remainingBranch);
163      IList<IScope> savedScopesForCurrent = savedScopes[0];
164      foreach(IScope savedScope in savedScopesForCurrent) {
165        currentScope.AddSubScope(savedScope);
166      }
167      savedScopes.RemoveAt(0);
168      RestoreFullTree(remainingBranch, savedScopes);
169    }
170
171    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
172      if(currentScope == scope) return currentScope;
173      if(currentScope.SubScopes.Count == 0) return null;
174      IScope foundScope = null;
175      // try to find the searched scope in all my sub-scopes
176      foreach(IScope subScope in currentScope.SubScopes) {
177        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
178        if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
179      }
180      if(foundScope != null) { // when we found the scopes in my sub-scopes
181        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
182        prunedScopes.Add(subScopes);
183        // remove all my sub-scopes
184        foreach(IScope subScope in subScopes) {
185          currentScope.RemoveSubScope(subScope);
186        }
187        // add only the branch that leads to the scope that I search for
188        currentScope.AddSubScope(foundScope);
189        return currentScope; // return that this scope contains the branch that leads to the searched scopes
190      } else {
191        return null; // otherwise we didn't find the searched scope and we can return null
192      }
193    }
194
195    private IScope FindParentScope(IScope currentScope, IScope childScope) {
196      if(currentScope.SubScopes.Contains(childScope)) return currentScope;
197      foreach(IScope subScope in currentScope.SubScopes) {
198        IScope result = FindParentScope(subScope, childScope);
199        if(result != null) return result;
200      }
201      return null;
202    }
203
204    private void MergeScope(IScope original, IScope result) {
205      // merge the results
206      original.Clear();
207      foreach(IVariable variable in result.Variables) {
208        original.AddVariable(variable);
209      }
210      foreach(IScope subScope in result.SubScopes) {
211        original.AddSubScope(subScope);
212      }
213      foreach(KeyValuePair<string, string> alias in result.Aliases) {
214        original.AddAlias(alias.Key, alias.Value);
215      }
216    }
217
218    #region Persistence Methods
219    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
220      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
221      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
222      addressAttribute.Value = ServerAddress;
223      node.Attributes.Append(addressAttribute);
224      return node;
225    }
226    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
227      base.Populate(node, restoredObjects);
228      ServerAddress = node.Attributes["ServerAddress"].Value;
229    }
230    #endregion
231  }
232}
Note: See TracBrowser for help on using the repository browser.