Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Operator Architecture Refactoring/HeuristicLab.ThreadParallelEngine/3.2/ThreadParallelEngine.cs @ 2219

Last change on this file since 2219 was 2029, checked in by swagner, 16 years ago

Refactoring of the Operator Architecture (#95)

File size: 9.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28
29namespace HeuristicLab.ThreadParallelEngine {
30  /// <summary>
31  /// Implementation of an engine being able to run in parallel with threads.
32  /// </summary>
33  public class ThreadParallelEngine : EngineBase, IEditable {
34    #region Inner Class Task
35    private class TaskList {
36      public Stack<IOperation>[] tasks;
37      public int next;
38      public Semaphore semaphore;
39    }
40    #endregion
41
42    // currently executed operators
43    private IOperator[] currentOperators;
44    private int operatorIndex;
45
46    private int myWorkers;
47    /// <summary>
48    /// Gets or sets the number of worker threads of the current engine.
49    /// </summary>
50    /// <remarks>Calls <see cref="OnWorkersChanged"/> in the setter.</remarks>
51    public int Workers {
52      get { return myWorkers; }
53      set {
54        if (value != myWorkers) {
55          myWorkers = value;
56          OnWorkersChanged();
57        }
58      }
59    }
60
61    /// <summary>
62    /// Initializes a new instance of <see cref="ThreadParallelEngine"/> with the number of processors
63    /// as number of worker threads.
64    /// </summary>
65    public ThreadParallelEngine() {
66      myWorkers = System.Environment.ProcessorCount;
67      currentOperators = new IOperator[1000];
68    }
69
70
71    /// <inheritdoc/>
72    /// <returns>The cloned object as <see cref="ThreadParallelEngine"/>.</returns>
73    public override object Clone(IDictionary<Guid, object> clonedObjects) {
74      ThreadParallelEngine clone = (ThreadParallelEngine)base.Clone(clonedObjects);
75      clone.myWorkers = Workers;
76      return clone;
77    }
78
79    /// <summary>
80    /// Creates a new instance of <see cref="ThreadParallelEngineEditor"/> to display the current
81    /// instance.
82    /// </summary>
83    /// <returns>The created instance as <see cref="ThreadParallelEngineEditor"/>.</returns>
84    public override IView CreateView() {
85      return new ThreadParallelEngineEditor(this);
86    }
87    /// <summary>
88    /// Creates a new instance of <see cref="ThreadParallelEngineEditor"/>.
89    /// </summary>
90    /// <returns>The created instance as <see cref="ThreadParallelEngineEditor"/>.</returns>
91    public virtual IEditor CreateEditor() {
92      return new ThreadParallelEngineEditor(this);
93    }
94
95    /// <summary>
96    /// This execution method is not supported by ThreadParallelEngines.
97    /// </summary>
98    /// <exception cref="InvalidOperationException">Thrown because the current instance of an engine
99    /// does not support stepwise execution.</exception>
100    /// <param name="steps">The number of steps to execute.</param>
101    public override void ExecuteSteps(int steps) {
102      throw new InvalidOperationException("ThreadParallelEngine doesn't support stepwise execution");
103    }
104
105    /// <inheritdoc/>
106    public override void Abort() {
107      base.Abort();
108      for (int i = 0; i < currentOperators.Length; i++) {
109        if (currentOperators[i] != null)
110          currentOperators[i].Abort();
111      }
112    }
113    /// <summary>
114    /// Processes the next operation (if it is a compositeOperation and it can be executed in parallel it is
115    /// done).
116    /// </summary>
117    protected override void ProcessNextOperation() {
118      operatorIndex = 1;
119      ProcessNextOperation(myExecutionStack, 0);
120    }
121    private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
122      IOperation operation = stack.Pop();
123      if (operation is AtomicOperation) {
124        AtomicOperation atomicOperation = (AtomicOperation)operation;
125        IOperation next = null;
126        try {
127          currentOperators[currentOperatorIndex] = atomicOperation.Operator;
128          next = atomicOperation.Operator.Execute(atomicOperation.Environment, atomicOperation.Scope);
129        }
130        catch (Exception ex) {
131          // push operation on stack again
132          stack.Push(atomicOperation);
133          Abort();
134          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
135        }
136        if (next != null)
137          stack.Push(next);
138        OnOperationExecuted(atomicOperation);
139        if (atomicOperation.Operator.Breakpoint) Abort();
140      } else if (operation is CompositeOperation) {
141        CompositeOperation compositeOperation = (CompositeOperation)operation;
142        if (compositeOperation.ExecuteInParallel) {
143          TaskList list = new TaskList();
144          list.tasks = new Stack<IOperation>[compositeOperation.Operations.Count];
145          for (int i = 0; i < list.tasks.Length; i++) {
146            list.tasks[i] = new Stack<IOperation>();
147            list.tasks[i].Push(compositeOperation.Operations[i]);
148          }
149          list.next = 0;
150          list.semaphore = new Semaphore(0, Workers);
151
152          for (int i = 0; i < Workers; i++)
153            ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessParallelOperation), list);
154          for (int i = 0; i < Workers; i++)
155            list.semaphore.WaitOne();
156          list.semaphore.Close();
157
158          if (Canceled) {
159            // write back not finished tasks
160            CompositeOperation remaining = new CompositeOperation();
161            remaining.ExecuteInParallel = true;
162            for (int i = 0; i < list.tasks.Length; i++) {
163              if (list.tasks[i].Count > 0) {
164                CompositeOperation task = new CompositeOperation();
165                while (list.tasks[i].Count > 0)
166                  task.AddOperation(list.tasks[i].Pop());
167                remaining.AddOperation(task);
168              }
169            }
170            if (remaining.Operations.Count > 0)
171              stack.Push(remaining);
172          }
173        } else {
174          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
175            stack.Push(compositeOperation.Operations[i]);
176        }
177      }
178    }
179    private void ProcessParallelOperation(object state) {
180      TaskList list = (TaskList)state;
181      int currentOperatorIndex, next;
182
183      do {
184        lock (currentOperators) {
185          currentOperatorIndex = operatorIndex;
186          operatorIndex++;
187        }
188        lock (list) {
189          next = list.next;
190          list.next++;
191        }
192
193        if (next < list.tasks.Length) {
194          Stack<IOperation> stack = list.tasks[next];
195          while ((!Canceled) && (stack.Count > 0))
196            ProcessNextOperation(stack, currentOperatorIndex);
197        }
198      } while ((!Canceled) && (next < list.tasks.Length));
199      list.semaphore.Release();
200    }
201
202    /// <summary>
203    /// Occurs when the number of workers has been changed.
204    /// </summary>
205    public event EventHandler WorkersChanged;
206    /// <summary>
207    /// Fires a new <c>WorkersChanged</c> event.
208    /// </summary>
209    protected virtual void OnWorkersChanged() {
210      if (WorkersChanged != null)
211        WorkersChanged(this, new EventArgs());
212    }
213
214    #region Persistence Methods
215    /// <summary>
216    /// Saves the current instance as <see cref="XmlNode"/> in the specified <paramref name="document"/>.
217    /// </summary>
218    /// <remarks>The number of workers is saved as <see cref="XmlAttribute"/> with attribute name
219    /// <c>Workers</c>.</remarks>
220    /// <param name="name">The (tag)name of the <see cref="XmlNode"/>.</param>
221    /// <param name="document">The <see cref="XmlDocument"/> where the data is saved.</param>
222    /// <param name="persistedObjects">The dictionary of all already persisted objects.
223    /// (Needed to avoid cycles.)</param>
224    /// <returns>The saved <see cref="XmlNode"/>.</returns>
225    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
226      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
227      XmlAttribute workersAttribute = document.CreateAttribute("Workers");
228      workersAttribute.Value = Workers.ToString();
229      node.Attributes.Append(workersAttribute);
230      return node;
231    }
232    /// <summary>
233    /// Loads the persisted engine from the specified <paramref name="node"/>.
234    /// </summary>
235    /// <remarks>The number of workers must be saved in a specific way, see <see cref="GetXmlNode"/>
236    /// for further information.</remarks>
237    /// <param name="node">The <see cref="XmlNode"/> where the instance is saved.</param>
238    /// <param name="restoredObjects">The dictionary of all already restored objects.
239    /// (Needed to avoid cycles.)</param>
240    public override void Populate(XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
241      base.Populate(node, restoredObjects);
242      myWorkers = int.Parse(node.Attributes["Workers"].Value);
243    }
244    #endregion
245  }
246}
Note: See TracBrowser for help on using the repository browser.