Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.ThreadParallelEngine/3.3/ThreadParallelEngine.cs @ 2520

Last change on this file since 2520 was 2520, checked in by swagner, 14 years ago

Implemented first draft of MainForm support in HeuristicLab.Core/HeuristicLab.Core.Views and all other depending plugins (#770)

File size: 7.2 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
29
namespace HeuristicLab.ThreadParallelEngine {
  /// <summary>
  /// Implementation of an engine being able to run in parallel with threads.
  /// </summary>
  /// <remarks>
  /// Composite operations flagged with <c>ExecuteInParallel</c> are split into one task per
  /// child operation; the tasks are processed by work items queued on the <see cref="ThreadPool"/>.
  /// The thread that encounters the parallel operation blocks until all queued work items
  /// have signalled completion.
  /// NOTE(review): when that blocking thread is itself a pool thread (nested parallel
  /// operations), many blocked threads could starve the pool - confirm whether nesting
  /// occurs in practice.
  /// </remarks>
  public class ThreadParallelEngine : EngineBase {
    #region Inner Class TaskList
    /// <summary>
    /// Shared state of one parallel composite operation.
    /// </summary>
    private class TaskList {
      /// <summary>One private execution stack per parallel task.</summary>
      public Stack<IOperation>[] tasks;
      /// <summary>Index of the next task that has not been claimed by a worker yet.</summary>
      public int next;
      /// <summary>Released once by every worker when it has finished.</summary>
      public Semaphore semaphore;
    }
    #endregion

    // initial number of slots for currently executed operators; the array grows on demand
    private const int InitialOperatorSlots = 1000;

    // guards currentOperators (including its replacement on growth) and operatorIndex
    private readonly object operatorsLock = new object();

    // currently executed operators (slot 0 belongs to the thread driving the main execution stack,
    // every parallel worker gets its own slot)
    private IOperator[] currentOperators;
    // next free slot in currentOperators
    private int operatorIndex;

    [Storable]
    private int myWorkers;
    /// <summary>
    /// Gets or sets the number of worker threads of the current engine.
    /// </summary>
    /// <remarks>Calls <see cref="OnWorkersChanged"/> in the setter.</remarks>
    public int Workers {
      get { return myWorkers; }
      set {
        if (value != myWorkers) {
          myWorkers = value;
          OnWorkersChanged();
        }
      }
    }

    /// <summary>
    /// Initializes a new instance of <see cref="ThreadParallelEngine"/> with the number of processors
    /// as number of worker threads.
    /// </summary>
    public ThreadParallelEngine() {
      myWorkers = Environment.ProcessorCount;
      currentOperators = new IOperator[InitialOperatorSlots];
    }


    /// <inheritdoc/>
    /// <returns>The cloned object as <see cref="ThreadParallelEngine"/>.</returns>
    public override object Clone(IDictionary<Guid, object> clonedObjects) {
      ThreadParallelEngine clone = (ThreadParallelEngine)base.Clone(clonedObjects);
      clone.myWorkers = Workers;
      return clone;
    }

    /// <summary>
    /// This execution method is not supported by ThreadParallelEngines.
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown because the current instance of an engine
    /// does not support stepwise execution.</exception>
    /// <param name="steps">The number of steps to execute.</param>
    public override void ExecuteSteps(int steps) {
      throw new InvalidOperationException("ThreadParallelEngine doesn't support stepwise execution");
    }

    /// <inheritdoc/>
    /// <remarks>Additionally aborts every operator that is registered as currently executed.</remarks>
    public override void Abort() {
      base.Abort();

      // Work on a snapshot so that the operators' Abort methods are not called while holding the
      // lock and so that a concurrent growth of the slot array cannot be observed half-way.
      IOperator[] snapshot;
      lock (operatorsLock) {
        snapshot = (IOperator[])currentOperators.Clone();
      }
      for (int i = 0; i < snapshot.Length; i++) {
        if (snapshot[i] != null)
          snapshot[i].Abort();
      }
    }

    /// <summary>
    /// Processes the next operation (if it is a compositeOperation and it can be executed in parallel it is
    /// done).
    /// </summary>
    protected override void ProcessNextOperation() {
      // slot 0 is used by this thread, parallel workers take their slots from 1 upwards
      lock (operatorsLock) {
        operatorIndex = 1;
      }
      ProcessNextOperation(myExecutionStack, 0);
    }

    /// <summary>
    /// Pops the top operation of <paramref name="stack"/> and processes it.
    /// </summary>
    /// <param name="stack">The execution stack to take the operation from and to push follow-up
    /// operations onto.</param>
    /// <param name="currentOperatorIndex">The slot in which the calling thread registers the operator
    /// it is executing.</param>
    private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
      IOperation operation = stack.Pop();
      if (operation is AtomicOperation) {
        ExecuteAtomicOperation((AtomicOperation)operation, stack, currentOperatorIndex);
      } else if (operation is CompositeOperation) {
        CompositeOperation compositeOperation = (CompositeOperation)operation;
        if (compositeOperation.ExecuteInParallel) {
          ExecuteParallelOperation(compositeOperation, stack);
        } else {
          // push in reverse order so that the first child operation ends up on top of the stack
          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
            stack.Push(compositeOperation.Operations[i]);
        }
      }
    }

    /// <summary>
    /// Executes the operator of an atomic operation and pushes the resulting operation (if any).
    /// </summary>
    /// <remarks>
    /// If the operator throws, the operation is pushed back onto <paramref name="stack"/>, the engine
    /// is aborted and the exception is reported through <c>OnExceptionOccurred</c> on a pool thread.
    /// NOTE(review): <c>OnOperationExecuted</c> is raised even when the operator failed; this is kept
    /// as is - confirm that listeners expect it.
    /// </remarks>
    private void ExecuteAtomicOperation(AtomicOperation atomicOperation, Stack<IOperation> stack, int currentOperatorIndex) {
      IOperation next = null;
      try {
        SetCurrentOperator(currentOperatorIndex, atomicOperation.Operator);
        next = atomicOperation.Operator.Execute(atomicOperation.Scope);
      }
      catch (Exception ex) {
        // push operation on stack again
        stack.Push(atomicOperation);
        Abort();
        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
      }
      if (next != null)
        stack.Push(next);
      OnOperationExecuted(atomicOperation);
      if (atomicOperation.Operator.Breakpoint) Abort();
    }

    /// <summary>
    /// Executes the child operations of a parallel composite operation on pool threads and blocks
    /// until all workers have finished.
    /// </summary>
    /// <remarks>
    /// If the engine has been canceled in the meantime, all operations that were not finished are
    /// written back to <paramref name="stack"/> as a new parallel composite operation.
    /// </remarks>
    private void ExecuteParallelOperation(CompositeOperation compositeOperation, Stack<IOperation> stack) {
      int taskCount = compositeOperation.Operations.Count;
      if (taskCount == 0) return;  // nothing to execute and nothing to write back

      TaskList list = new TaskList();
      list.tasks = new Stack<IOperation>[taskCount];
      for (int i = 0; i < taskCount; i++) {
        list.tasks[i] = new Stack<IOperation>();
        list.tasks[i].Push(compositeOperation.Operations[i]);
      }
      list.next = 0;

      // Read Workers exactly once: the property may be changed from another thread and the number
      // of queued workers must match the number of waits below. Never use more workers than tasks
      // and always at least one (the semaphore requires a maximum count >= 1).
      int workers = Math.Max(1, Math.Min(Workers, taskCount));
      list.semaphore = new Semaphore(0, workers);

      for (int i = 0; i < workers; i++)
        ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessParallelOperation), list);
      for (int i = 0; i < workers; i++)
        list.semaphore.WaitOne();
      list.semaphore.Close();

      if (Canceled) {
        // write back not finished tasks
        CompositeOperation remaining = new CompositeOperation();
        remaining.ExecuteInParallel = true;
        for (int i = 0; i < list.tasks.Length; i++) {
          if (list.tasks[i].Count > 0) {
            CompositeOperation task = new CompositeOperation();
            while (list.tasks[i].Count > 0)
              task.AddOperation(list.tasks[i].Pop());
            remaining.AddOperation(task);
          }
        }
        if (remaining.Operations.Count > 0)
          stack.Push(remaining);
      }
    }

    /// <summary>
    /// Worker routine: claims tasks of the given <see cref="TaskList"/> one after another and
    /// processes each of them until its stack is empty or the engine is canceled.
    /// </summary>
    /// <param name="state">The <see cref="TaskList"/> shared by all workers of one parallel operation.</param>
    private void ProcessParallelOperation(object state) {
      TaskList list = (TaskList)state;
      try {
        // one operator slot per worker is sufficient as a worker processes its tasks sequentially
        int currentOperatorIndex;
        lock (operatorsLock) {
          currentOperatorIndex = operatorIndex;
          operatorIndex++;
        }

        while (!Canceled) {
          // atomically claim the next unprocessed task
          int next = Interlocked.Increment(ref list.next) - 1;
          if (next >= list.tasks.Length) break;

          Stack<IOperation> stack = list.tasks[next];
          while ((!Canceled) && (stack.Count > 0))
            ProcessNextOperation(stack, currentOperatorIndex);
        }
      }
      finally {
        // always signal completion, otherwise the thread waiting for the workers blocks forever
        list.semaphore.Release();
      }
    }

    /// <summary>
    /// Registers <paramref name="op"/> as the operator currently executed in slot
    /// <paramref name="index"/>, growing the slot array if necessary.
    /// </summary>
    private void SetCurrentOperator(int index, IOperator op) {
      lock (operatorsLock) {
        if (index >= currentOperators.Length)
          Array.Resize(ref currentOperators, Math.Max(index + 1, currentOperators.Length * 2));
        currentOperators[index] = op;
      }
    }

    /// <summary>
    /// Occurs when the number of workers has been changed.
    /// </summary>
    public event EventHandler WorkersChanged;
    /// <summary>
    /// Fires a new <c>WorkersChanged</c> event.
    /// </summary>
    protected virtual void OnWorkersChanged() {
      // copy to a local so that a concurrent unsubscribe cannot null the delegate between check and call
      EventHandler handler = WorkersChanged;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.