Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.ThreadParallelEngine/ThreadParallelEngine.cs @ 817

Last change on this file since 817 was 2, checked in by swagner, 17 years ago

Added HeuristicLab 3.0 sources from former SVN repository at revision 52

File size: 6.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28
29namespace HeuristicLab.ThreadParallelEngine {
30  public class ThreadParallelEngine : EngineBase, IEditable {
31    #region Inner Class Task
32    private class TaskList {
33      public Stack<IOperation>[] tasks;
34      public int next;
35      public Semaphore semaphore;
36    }
37    #endregion
38
39    // currently executed operators
40    private IOperator[] currentOperators;
41    private int operatorIndex;
42
43    private int myWorkers;
44    public int Workers {
45      get { return myWorkers; }
46      set {
47        if (value != myWorkers) {
48          myWorkers = value;
49          OnWorkersChanged();
50        }
51      }
52    }
53
54
55    public ThreadParallelEngine() {
56      myWorkers = Environment.ProcessorCount;
57      currentOperators = new IOperator[1000];
58    }
59
60
61    public override object Clone(IDictionary<Guid, object> clonedObjects) {
62      ThreadParallelEngine clone = (ThreadParallelEngine)base.Clone(clonedObjects);
63      clone.myWorkers = Workers;
64      return clone;
65    }
66
67    public override IView CreateView() {
68      return new ThreadParallelEngineEditor(this);
69    }
70    public virtual IEditor CreateEditor() {
71      return new ThreadParallelEngineEditor(this);
72    }
73
74    public override void ExecuteSteps(int steps) {
75      throw new InvalidOperationException("ThreadParallelEngine doesn't support stepwise execution");
76    }
77
78    public override void Abort() {
79      base.Abort();
80      for (int i = 0; i < currentOperators.Length; i++) {
81        if (currentOperators[i] != null)
82          currentOperators[i].Abort();
83      }
84    }
85
86    protected override void ProcessNextOperation() {
87      operatorIndex = 1;
88      ProcessNextOperation(myExecutionStack, 0);
89    }
90    private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
91      IOperation operation = stack.Pop();
92      if (operation is AtomicOperation) {
93        AtomicOperation atomicOperation = (AtomicOperation)operation;
94        IOperation next = null;
95        try {
96          currentOperators[currentOperatorIndex] = atomicOperation.Operator;
97          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
98        }
99        catch (Exception ex) {
100          // push operation on stack again
101          stack.Push(atomicOperation);
102          Abort();
103          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
104        }
105        if (next != null)
106          stack.Push(next);
107        OnOperationExecuted(atomicOperation);
108        if (atomicOperation.Operator.Breakpoint) Abort();
109      } else if (operation is CompositeOperation) {
110        CompositeOperation compositeOperation = (CompositeOperation)operation;
111        if (compositeOperation.ExecuteInParallel) {
112          TaskList list = new TaskList();
113          list.tasks = new Stack<IOperation>[compositeOperation.Operations.Count];
114          for (int i = 0; i < list.tasks.Length; i++) {
115            list.tasks[i] = new Stack<IOperation>();
116            list.tasks[i].Push(compositeOperation.Operations[i]);
117          }
118          list.next = 0;
119          list.semaphore = new Semaphore(0, Workers);
120
121          for (int i = 0; i < Workers; i++)
122            ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessParallelOperation), list);
123          for (int i = 0; i < Workers; i++)
124            list.semaphore.WaitOne();
125          list.semaphore.Close();
126
127          if (Canceled) {
128            // write back not finished tasks
129            CompositeOperation remaining = new CompositeOperation();
130            remaining.ExecuteInParallel = true;
131            for (int i = 0; i < list.tasks.Length; i++) {
132              if (list.tasks[i].Count > 0) {
133                CompositeOperation task = new CompositeOperation();
134                while (list.tasks[i].Count > 0)
135                  task.AddOperation(list.tasks[i].Pop());
136                remaining.AddOperation(task);
137              }
138            }
139            if (remaining.Operations.Count > 0)
140              stack.Push(remaining);
141          }
142        } else {
143          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
144            stack.Push(compositeOperation.Operations[i]);
145        }
146      }
147    }
148    private void ProcessParallelOperation(object state) {
149      TaskList list = (TaskList)state;
150      int currentOperatorIndex, next;
151
152      do {
153        lock (currentOperators) {
154          currentOperatorIndex = operatorIndex;
155          operatorIndex++;
156        }
157        lock (list) {
158          next = list.next;
159          list.next++;
160        }
161
162        if (next < list.tasks.Length) {
163          Stack<IOperation> stack = list.tasks[next];
164          while ((!Canceled) && (stack.Count > 0))
165            ProcessNextOperation(stack, currentOperatorIndex);
166        }
167      } while ((!Canceled) && (next < list.tasks.Length));
168      list.semaphore.Release();
169    }
170
171    public event EventHandler WorkersChanged;
172    protected virtual void OnWorkersChanged() {
173      if (WorkersChanged != null)
174        WorkersChanged(this, new EventArgs());
175    }
176
177    #region Persistence Methods
178    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
179      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
180      XmlAttribute workersAttribute = document.CreateAttribute("Workers");
181      workersAttribute.Value = Workers.ToString();
182      node.Attributes.Append(workersAttribute);
183      return node;
184    }
185    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
186      base.Populate(node, restoredObjects);
187      myWorkers = int.Parse(node.Attributes["Workers"].Value);
188    }
189    #endregion
190  }
191}
Note: See TracBrowser for help on using the repository browser.