Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs @ 257

Last change on this file since 257 was 255, checked in by gkronber, 17 years ago

closing wait-handle just to be sure that resources are released (ticket #149)

File size: 5.8 KB
Rev Line
[2]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Grid;
29using System.ServiceModel;
30using System.IO;
31using System.IO.Compression;
[219]32using HeuristicLab.PluginInfrastructure;
33using System.Windows.Forms;
[2]34
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Engine that executes atomic operations in-process and hands the sub-operations of
  /// parallel composite operations to a <see cref="JobManager"/> created for
  /// <see cref="ServerAddress"/>.
  /// </summary>
  public class DistributedEngine : EngineBase, IEditable {
    private JobManager jobManager;
    // Address the current jobManager was created with; lets Execute() notice that
    // ServerAddress has been changed since the manager was built.
    private string jobManagerAddress;
    private string serverAddress;

    /// <summary>
    /// Address that is passed to the <see cref="JobManager"/> constructor.
    /// A changed value takes effect on the next call to <see cref="Execute"/>.
    /// </summary>
    public string ServerAddress {
      get { return serverAddress; }
      set { serverAddress = value; }
    }

    /// <summary>
    /// Clones the engine via the base implementation and copies the server address.
    /// The job manager is not copied here.
    /// </summary>
    public override object Clone(IDictionary<Guid, object> clonedObjects) {
      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
      clone.ServerAddress = serverAddress;
      return clone;
    }

    /// <summary>Creates a <see cref="DistributedEngineEditor"/> for this engine.</summary>
    public override IView CreateView() {
      return new DistributedEngineEditor(this);
    }

    /// <summary>Creates a <see cref="DistributedEngineEditor"/> for this engine.</summary>
    public virtual IEditor CreateEditor() {
      return new DistributedEngineEditor(this);
    }

    /// <summary>
    /// Makes sure a job manager for the current <see cref="ServerAddress"/> exists,
    /// resets it and starts execution through the base class.
    /// </summary>
    public override void Execute() {
      // Build a new manager on first use and whenever the address was changed after the
      // previous manager had been created; otherwise the engine would keep using the
      // manager that was constructed with the old address.
      // NOTE(review): the replaced manager is simply dropped - confirm that JobManager
      // holds nothing that needs an explicit shutdown.
      if(jobManager == null || jobManagerAddress != serverAddress) {
        jobManager = new JobManager(serverAddress);
        jobManagerAddress = serverAddress;
      }
      jobManager.Reset();
      base.Execute();
    }

    /// <summary>Not supported by this engine.</summary>
    /// <exception cref="InvalidOperationException">Always thrown.</exception>
    public override void ExecuteSteps(int steps) {
      throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution");
    }

    /// <summary>
    /// Pops the next operation from the execution stack and processes it: atomic operations
    /// are executed directly, parallel composite operations are passed to the job manager,
    /// and the children of sequential composite operations are pushed onto the stack.
    /// </summary>
    protected override void ProcessNextOperation() {
      IOperation operation = myExecutionStack.Pop();
      if(operation is AtomicOperation) {
        ProcessAtomicOperation((AtomicOperation)operation);
      } else if(operation is CompositeOperation) {
        CompositeOperation compositeOperation = (CompositeOperation)operation;
        if(compositeOperation.ExecuteInParallel) {
          ProcessParallelOperation(compositeOperation);
        } else {
          // push in reverse order so that the first child ends up on top of the stack
          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
            myExecutionStack.Push(compositeOperation.Operations[i]);
        }
      }
    }

    // Executes a single atomic operation in-process and pushes its successor (if any).
    private void ProcessAtomicOperation(AtomicOperation atomicOperation) {
      IOperation next = null;
      try {
        next = atomicOperation.Operator.Execute(atomicOperation.Scope);
      } catch(Exception ex) {
        // push operation on stack again
        myExecutionStack.Push(atomicOperation);
        Abort();
        // raise the event from a thread-pool thread instead of the current thread
        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
      }
      if(next != null)
        myExecutionStack.Push(next);
      // NOTE(review): OnOperationExecuted is also raised when the operator threw;
      // kept unchanged on purpose - confirm this is intended.
      OnOperationExecuted(atomicOperation);
      if(atomicOperation.Operator.Breakpoint) Abort();
    }

    // Starts one job per sub-operation, waits for all of them and merges the results
    // into the scopes of the sub-operations.
    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
      try {
        WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
        int i = 0;
        // start all parallel jobs
        foreach(AtomicOperation parOperation in compositeOperation.Operations) {
          waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation);
        }

        // Wait until all jobs are finished. Waiting handle by handle works for any number
        // of jobs (WaitHandle.WaitAll accepts at most 64 handles and rejects an empty
        // array) and allows every handle to be closed as soon as it has been signaled.
        for(i = 0; i < waithandles.Length; i++) {
          waithandles[i].WaitOne();
          waithandles[i].Close();
        }

        // retrieve results and merge into scope-tree
        foreach(AtomicOperation parOperation in compositeOperation.Operations) {
          IScope result = jobManager.EndExecuteOperation(parOperation);
          MergeScope(parOperation.Scope, result);
        }
      } catch(Exception e) {
        // push the whole composite operation on the stack again and stop the engine
        myExecutionStack.Push(compositeOperation);
        Abort();
        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
      }
    }

    // Replaces the content of 'original' (variables, sub-scopes and aliases) with the
    // content of 'result'.
    private void MergeScope(IScope original, IScope result) {
      // merge the results
      original.Clear();
      foreach(IVariable variable in result.Variables) {
        original.AddVariable(variable);
      }
      foreach(IScope subScope in result.SubScopes) {
        original.AddSubScope(subScope);
      }
      foreach(KeyValuePair<string, string> alias in result.Aliases) {
        original.AddAlias(alias.Key, alias.Value);
      }
    }

    #region Persistence Methods
    /// <summary>
    /// Creates the XML node through the base class and appends the server address
    /// as attribute "ServerAddress".
    /// </summary>
    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
      addressAttribute.Value = ServerAddress;
      node.Attributes.Append(addressAttribute);
      return node;
    }

    /// <summary>
    /// Restores the engine through the base class and reads the server address from the
    /// attribute "ServerAddress"; the address is left unchanged when the attribute is missing.
    /// </summary>
    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      // the indexer returns null for an unknown attribute name
      XmlAttribute addressAttribute = node.Attributes["ServerAddress"];
      if(addressAttribute != null) ServerAddress = addressAttribute.Value;
    }
    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.