Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Operator Architecture Refactoring/HeuristicLab.ThreadParallelEngine/3.3/ThreadParallelEngine.cs @ 2219

Last change on this file since 2219 was 2033, checked in by swagner, 16 years ago

Refactoring of the operator architecture (#95)

File size: 7.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
29
namespace HeuristicLab.ThreadParallelEngine {
  /// <summary>
  /// Implementation of an engine being able to run in parallel with threads.
  /// </summary>
  /// <remarks>
  /// Sub-operations of a <see cref="CompositeOperation"/> whose <c>ExecuteInParallel</c> flag is set
  /// are distributed over thread pool work items (one per worker); every other operation is processed
  /// on the thread that called <see cref="ProcessNextOperation()"/>.
  /// </remarks>
  public class ThreadParallelEngine : EngineBase, IEditable {
    #region Inner Class Task
    /// <summary>
    /// Shared state of one parallel composite operation: one execution stack per task, the index of
    /// the next task nobody has claimed yet, and the semaphore on which finished workers signal.
    /// </summary>
    private class TaskList {
      public Stack<IOperation>[] tasks;
      public int next;
      public Semaphore semaphore;
    }
    #endregion

    // number of slots used to remember the operators that are currently being executed
    private const int OperatorSlots = 1000;

    // currently executed operators; slot 0 is used by the thread driving the engine,
    // slots 1..Length-1 are handed out to worker threads
    private IOperator[] currentOperators;
    // next slot handed out to a worker; modified only inside lock (currentOperators)
    // or while no worker is running
    private int operatorIndex;

    [Storable]
    private int myWorkers;
    /// <summary>
    /// Gets or sets the number of worker threads of the current engine.
    /// </summary>
    /// <remarks>Calls <see cref="OnWorkersChanged"/> in the setter if the value actually changed.</remarks>
    public int Workers {
      get { return myWorkers; }
      set {
        if (value != myWorkers) {
          myWorkers = value;
          OnWorkersChanged();
        }
      }
    }

    /// <summary>
    /// Initializes a new instance of <see cref="ThreadParallelEngine"/> with the number of processors
    /// as number of worker threads.
    /// </summary>
    public ThreadParallelEngine() {
      myWorkers = Environment.ProcessorCount;
      currentOperators = new IOperator[OperatorSlots];
    }


    /// <inheritdoc/>
    /// <returns>The cloned object as <see cref="ThreadParallelEngine"/>.</returns>
    public override object Clone(IDictionary<long, object> clonedObjects) {
      ThreadParallelEngine clone = (ThreadParallelEngine)base.Clone(clonedObjects);
      // assign the field directly so that cloning does not raise WorkersChanged on the clone
      clone.myWorkers = Workers;
      return clone;
    }

    /// <summary>
    /// Creates a new instance of <see cref="ThreadParallelEngineEditor"/> to display the current
    /// instance.
    /// </summary>
    /// <returns>The created instance as <see cref="ThreadParallelEngineEditor"/>.</returns>
    public override IView CreateView() {
      return new ThreadParallelEngineEditor(this);
    }
    /// <summary>
    /// Creates a new instance of <see cref="ThreadParallelEngineEditor"/>.
    /// </summary>
    /// <returns>The created instance as <see cref="ThreadParallelEngineEditor"/>.</returns>
    public virtual IEditor CreateEditor() {
      return new ThreadParallelEngineEditor(this);
    }

    /// <summary>
    /// This execution method is not supported by ThreadParallelEngines.
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown because the current instance of an engine
    /// does not support stepwise execution.</exception>
    /// <param name="steps">The number of steps to execute.</param>
    public override void ExecuteSteps(int steps) {
      throw new InvalidOperationException("ThreadParallelEngine doesn't support stepwise execution");
    }

    /// <inheritdoc/>
    /// <remarks>
    /// After aborting the engine itself, <c>Abort</c> is forwarded to every operator stored in the
    /// tracking slots. Slots are not cleared when an operator finishes, so operators that have
    /// already completed may be notified as well.
    /// </remarks>
    public override void Abort() {
      base.Abort();
      foreach (IOperator op in currentOperators) {
        if (op != null)
          op.Abort();
      }
    }

    /// <summary>
    /// Processes the next operation (if it is a compositeOperation and it can be executed in parallel it is
    /// done).
    /// </summary>
    protected override void ProcessNextOperation() {
      // the calling thread tracks its operator in slot 0; workers start claiming slots at 1
      operatorIndex = 1;
      ProcessNextOperation(myExecutionStack, 0);
    }

    /// <summary>
    /// Pops one operation from <paramref name="stack"/> and processes it: atomic operations are
    /// executed, parallel composite operations are distributed over the workers, and sequential
    /// composite operations are expanded onto the stack.
    /// </summary>
    /// <param name="stack">The execution stack to take the operation from.</param>
    /// <param name="currentOperatorIndex">The slot of <c>currentOperators</c> in which the calling
    /// thread records the operator it executes.</param>
    private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
      IOperation operation = stack.Pop();
      if (operation is AtomicOperation) {
        ProcessAtomicOperation(stack, (AtomicOperation)operation, currentOperatorIndex);
      } else if (operation is CompositeOperation) {
        CompositeOperation compositeOperation = (CompositeOperation)operation;
        if (compositeOperation.ExecuteInParallel) {
          ProcessParallelOperations(stack, compositeOperation);
        } else {
          // push in reverse order so that the first sub-operation ends up on top of the stack
          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
            stack.Push(compositeOperation.Operations[i]);
        }
      }
    }

    /// <summary>
    /// Executes the operator of a single atomic operation and pushes its successor, if any.
    /// </summary>
    /// <param name="stack">The execution stack the operation was taken from.</param>
    /// <param name="atomicOperation">The operation to execute.</param>
    /// <param name="currentOperatorIndex">The tracking slot of the calling thread.</param>
    private void ProcessAtomicOperation(Stack<IOperation> stack, AtomicOperation atomicOperation, int currentOperatorIndex) {
      IOperation next = null;
      try {
        // remember the operator so that Abort() can reach it while it is running
        currentOperators[currentOperatorIndex] = atomicOperation.Operator;
        next = atomicOperation.Operator.Execute(atomicOperation.Scope);
      }
      catch (Exception ex) {
        // push operation on stack again
        stack.Push(atomicOperation);
        Abort();
        // the exception is reported from a thread pool thread, not from the executing thread
        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
      }
      if (next != null)
        stack.Push(next);
      // note: the notification and the breakpoint check below also run after a failed execution
      OnOperationExecuted(atomicOperation);
      if (atomicOperation.Operator.Breakpoint) Abort();
    }

    /// <summary>
    /// Executes the sub-operations of a parallel composite operation on the thread pool and blocks
    /// until all workers have finished. If the engine was canceled in the meantime, the unfinished
    /// work is pushed back onto <paramref name="stack"/>.
    /// </summary>
    /// <param name="stack">The execution stack the composite operation was taken from.</param>
    /// <param name="compositeOperation">The composite operation whose sub-operations are executed.</param>
    private void ProcessParallelOperations(Stack<IOperation> stack, CompositeOperation compositeOperation) {
      // Read Workers exactly once: the semaphore capacity, the number of queued work items and the
      // number of waits must agree even if the property is changed while this method is running.
      // At least one worker is required, as a semaphore cannot be created with a capacity below 1.
      int workers = Math.Max(1, Workers);

      TaskList list = new TaskList();
      list.tasks = new Stack<IOperation>[compositeOperation.Operations.Count];
      for (int i = 0; i < list.tasks.Length; i++) {
        list.tasks[i] = new Stack<IOperation>();
        list.tasks[i].Push(compositeOperation.Operations[i]);
      }
      list.next = 0;
      list.semaphore = new Semaphore(0, workers);

      for (int i = 0; i < workers; i++)
        ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessParallelOperation), list);
      // every worker releases the semaphore exactly once when it is done
      for (int i = 0; i < workers; i++)
        list.semaphore.WaitOne();
      list.semaphore.Close();

      if (Canceled)
        PushRemainingTasks(stack, list);
    }

    /// <summary>
    /// Writes the tasks that were not finished back onto <paramref name="stack"/> as one parallel
    /// composite operation containing one sequential composite operation per unfinished task.
    /// </summary>
    /// <param name="stack">The execution stack that receives the remaining work.</param>
    /// <param name="list">The task list of the interrupted parallel operation.</param>
    private void PushRemainingTasks(Stack<IOperation> stack, TaskList list) {
      CompositeOperation remaining = new CompositeOperation();
      remaining.ExecuteInParallel = true;
      for (int i = 0; i < list.tasks.Length; i++) {
        if (list.tasks[i].Count > 0) {
          CompositeOperation task = new CompositeOperation();
          // popping yields the operations in the order in which they would have been executed
          while (list.tasks[i].Count > 0)
            task.AddOperation(list.tasks[i].Pop());
          remaining.AddOperation(task);
        }
      }
      if (remaining.Operations.Count > 0)
        stack.Push(remaining);
    }

    /// <summary>
    /// Work item of a single worker: repeatedly claims the next unclaimed task of the given
    /// <see cref="TaskList"/> and processes its stack until it is empty or the engine is canceled,
    /// then signals the semaphore of the task list.
    /// </summary>
    /// <param name="state">The <see cref="TaskList"/> to work on.</param>
    private void ProcessParallelOperation(object state) {
      TaskList list = (TaskList)state;
      try {
        int currentOperatorIndex, next;

        do {
          lock (currentOperators) {
            currentOperatorIndex = operatorIndex;
            operatorIndex++;
            // The number of slots is fixed while the number of claimed tasks is not: hand the
            // slots out round-robin instead of running past the end of the array. Slot 0 stays
            // reserved for the thread driving the engine.
            if (operatorIndex >= currentOperators.Length)
              operatorIndex = 1;
          }
          lock (list) {
            next = list.next;
            list.next++;
          }

          if (next < list.tasks.Length) {
            Stack<IOperation> stack = list.tasks[next];
            while ((!Canceled) && (stack.Count > 0))
              ProcessNextOperation(stack, currentOperatorIndex);
          }
        } while ((!Canceled) && (next < list.tasks.Length));
      }
      finally {
        // always signal, so that the thread waiting in ProcessParallelOperations is not left
        // blocked when processing a task ends with an exception
        list.semaphore.Release();
      }
    }

    /// <summary>
    /// Occurs when the number of workers has been changed.
    /// </summary>
    public event EventHandler WorkersChanged;
    /// <summary>
    /// Fires a new <c>WorkersChanged</c> event.
    /// </summary>
    protected virtual void OnWorkersChanged() {
      // copy the delegate first: a handler may unsubscribe between the null check and the call
      EventHandler handler = WorkersChanged;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.