Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.0/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs @ 398

Last change on this file since 398 was 300, checked in by gkronber, 16 years ago

merged r229 into HL3 stable branch

File size: 9.3 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
34using System.Diagnostics;
35
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Engine that runs atomic operations and sequential composite operations locally and
  /// hands the operations of a parallel composite operation to a <c>JobManager</c> that
  /// is created for <see cref="ServerAddress"/>.
  /// </summary>
  public class DistributedEngine : EngineBase, IEditable {
    private JobManager jobManager;
    // address the current jobManager was created for; used to detect a changed ServerAddress
    private string jobManagerAddress;
    // NOTE(review): not referenced anywhere in this class; kept to leave the type layout untouched
    private CompositeOperation waitingOperations;
    private string serverAddress;

    /// <summary>
    /// Address that is passed to the <c>JobManager</c>. A changed value takes effect on the
    /// next call of <see cref="Execute"/>.
    /// </summary>
    public string ServerAddress {
      get { return serverAddress; }
      set { serverAddress = value; }
    }

    /// <summary>
    /// Clones the engine via the base implementation and copies the server address to the clone.
    /// </summary>
    /// <param name="clonedObjects">Passed through to the base implementation.</param>
    /// <returns>The cloned engine.</returns>
    public override object Clone(IDictionary<Guid, object> clonedObjects) {
      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
      clone.ServerAddress = serverAddress;
      return clone;
    }

    /// <summary>Creates a new <c>DistributedEngineEditor</c> for this engine.</summary>
    public override IView CreateView() {
      return new DistributedEngineEditor(this);
    }

    /// <summary>Creates a new <c>DistributedEngineEditor</c> for this engine.</summary>
    public virtual IEditor CreateEditor() {
      return new DistributedEngineEditor(this);
    }

    /// <summary>
    /// Makes sure a job manager for the current <see cref="ServerAddress"/> exists, resets it
    /// and then delegates to the base implementation.
    /// </summary>
    public override void Execute() {
      // (re-)create the job manager when none exists yet or when the server address was
      // changed since the last execution; otherwise the new address would be silently ignored
      if(jobManager == null || jobManagerAddress != serverAddress) {
        jobManager = new JobManager(serverAddress);
        jobManagerAddress = serverAddress;
      }
      jobManager.Reset();
      base.Execute();
    }

    /// <summary>Stepwise execution is not supported by this engine.</summary>
    /// <param name="steps">Ignored.</param>
    /// <exception cref="InvalidOperationException">Always thrown.</exception>
    public override void ExecuteSteps(int steps) {
      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
    }

    /// <summary>
    /// Pops the next operation from the execution stack and processes it: atomic operations
    /// are executed directly, sequential composite operations are expanded onto the stack and
    /// parallel composite operations are executed via the job manager.
    /// </summary>
    protected override void ProcessNextOperation() {
      IOperation operation = myExecutionStack.Pop();
      if(operation is AtomicOperation) {
        AtomicOperation atomicOperation = (AtomicOperation)operation;
        IOperation next = null;
        try {
          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
        } catch(Exception ex) {
          // push operation on stack again
          myExecutionStack.Push(atomicOperation);
          Abort();
          // raise the event from a pool thread instead of the thread that processes operations
          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
        }
        if(next != null)
          myExecutionStack.Push(next);
        OnOperationExecuted(atomicOperation);
        if(atomicOperation.Operator.Breakpoint) Abort();
      } else if(operation is CompositeOperation) {
        CompositeOperation compositeOperation = (CompositeOperation)operation;
        if(compositeOperation.ExecuteInParallel) {
          try {
            ExecuteParallelOperations(compositeOperation);
          } catch(Exception e) {
            // push operation on stack again
            myExecutionStack.Push(compositeOperation);
            Abort();
            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
          }
          OnOperationExecuted(compositeOperation);
        } else {
          // push in reverse order so that the first operation ends up on top of the stack
          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
            myExecutionStack.Push(compositeOperation.Operations[i]);
        }
      }
    }

    // Starts one job per operation, waits for all of them and merges the results into the scope-tree.
    private void ExecuteParallelOperations(CompositeOperation compositeOperation) {
      int operationCount = compositeOperation.Operations.Count;
      // nothing to distribute; behaves like an empty sequential composite operation
      if(operationCount == 0) return;

      WaitHandle[] waithandles = new WaitHandle[operationCount];
      StartParallelJobs(compositeOperation, waithandles);

      // wait until all jobs are finished
      // the handles are waited on one by one because WaitHandle.WaitAll works only with
      // maximally 64 waithandles; each handle is closed as soon as it has been signaled
      for(int i = 0; i < waithandles.Length; i++) {
        waithandles[i].WaitOne();
        waithandles[i].Close();
      }

      // retrieve results and merge into scope-tree
      foreach(AtomicOperation parOperation in compositeOperation.Operations) {
        IScope result = jobManager.EndExecuteOperation(parOperation);
        MergeScope(parOperation.Scope, result);
      }
    }

    // Submits every operation as a job together with a reduced scope-tree and stores the
    // returned wait handles; the full scope-tree is restored even when a submission fails.
    private void StartParallelJobs(CompositeOperation compositeOperation, WaitHandle[] waithandles) {
      // HACK: assume that all atomicOperations have the same parent scope.
      // 1) find that parent scope
      // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
      // 3) keep the branches to 'repair' the scope-tree later
      // 4) for each parallel job attach only the sub-scope that this operation uses
      // 5) after starting all parallel jobs restore the whole scope-tree
      IScope parentScope = FindParentScope(GlobalScope, compositeOperation);
      if(parentScope == null)
        throw new InvalidOperationException("Couldn't find the parent scope of the parallel operations in the scope-tree of the engine");

      List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
      foreach(IScope scope in subScopes) {
        parentScope.RemoveSubScope(scope);
      }

      IScope attachedScope = null; // scope that is currently attached for a job submission
      try {
        // start all parallel jobs
        int i = 0;
        foreach(AtomicOperation parOperation in compositeOperation.Operations) {
          IScope jobScope = parOperation.Scope;
          parentScope.AddSubScope(jobScope);
          attachedScope = jobScope;
          waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
          parentScope.RemoveSubScope(jobScope);
          attachedScope = null;
        }
      } finally {
        // detach a scope that is still attached because its submission failed
        if(attachedScope != null) parentScope.RemoveSubScope(attachedScope);
        foreach(IScope scope in subScopes) {
          parentScope.AddSubScope(scope);
        }
        // PruneToParentScope stores the deepest level first, restoring starts at the global scope
        prunedScopes.Reverse();
        RestoreFullTree(GlobalScope, prunedScopes);
      }
    }

    // Re-attaches the saved sub-scopes level by level, starting at currentScope and following
    // the single branch that was left by PruneToParentScope. Empties savedScopes while doing so.
    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
      if(savedScopes.Count == 0) return;
      // the only remaining sub-scope is the branch that leads to the next pruned level
      IScope remainingBranch = currentScope.SubScopes[0];
      currentScope.RemoveSubScope(remainingBranch);
      IList<IScope> savedScopesForCurrent = savedScopes[0];
      foreach(IScope savedScope in savedScopesForCurrent) {
        currentScope.AddSubScope(savedScope);
      }
      savedScopes.RemoveAt(0);
      RestoreFullTree(remainingBranch, savedScopes);
    }

    // Removes all sub-scopes on the path from currentScope to scope except the branch that
    // leads to scope. The original sub-scope lists are appended to prunedScopes (deepest level
    // first). Returns null when scope is not reachable from currentScope.
    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
      if(currentScope == scope) return currentScope;
      if(currentScope.SubScopes.Count == 0) return null;
      IScope foundScope = null;
      // try to find the searched scope in all my sub-scopes
      foreach(IScope subScope in currentScope.SubScopes) {
        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
        if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
      }
      if(foundScope != null) { // when we found the scopes in my sub-scopes
        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
        prunedScopes.Add(subScopes);
        // remove all my sub-scopes
        foreach(IScope subScope in subScopes) {
          currentScope.RemoveSubScope(subScope);
        }
        // add only the branch that leads to the scope that I search for
        currentScope.AddSubScope(foundScope);
        return currentScope; // return that this scope contains the branch that leads to the searched scopes
      } else {
        return null; // otherwise we didn't find the searched scope and we can return null
      }
    }

    // Searches the tree below currentScope (depth first) for the scope that directly contains
    // the scope of the first operation of compositeOperation. Returns null when there is none.
    private IScope FindParentScope(IScope currentScope, CompositeOperation compositeOperation) {
      AtomicOperation currentOperation = (AtomicOperation)compositeOperation.Operations[0];
      if(currentScope.SubScopes.Contains(currentOperation.Scope)) return currentScope;
      foreach(IScope subScope in currentScope.SubScopes) {
        IScope result = FindParentScope(subScope, compositeOperation);
        if(result != null) return result;
      }
      return null;
    }

    // Replaces the content of original by the variables, sub-scopes and aliases of result.
    private void MergeScope(IScope original, IScope result) {
      // merge the results
      original.Clear();
      foreach(IVariable variable in result.Variables) {
        original.AddVariable(variable);
      }
      foreach(IScope subScope in result.SubScopes) {
        original.AddSubScope(subScope);
      }
      foreach(KeyValuePair<string, string> alias in result.Aliases) {
        original.AddAlias(alias.Key, alias.Value);
      }
    }

    #region Persistence Methods
    /// <summary>
    /// Creates the XML node via the base implementation and appends the server address as
    /// attribute "ServerAddress".
    /// </summary>
    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
      addressAttribute.Value = ServerAddress;
      node.Attributes.Append(addressAttribute);
      return node;
    }

    /// <summary>
    /// Restores the engine via the base implementation and reads the server address from the
    /// attribute "ServerAddress". The address is left unchanged when the attribute is missing.
    /// </summary>
    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      // XmlNode.Attributes is null for non-element nodes and the indexer returns null for a missing attribute
      XmlAttribute addressAttribute = node.Attributes != null ? node.Attributes["ServerAddress"] : null;
      if(addressAttribute != null) ServerAddress = addressAttribute.Value;
    }
    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.