Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.ThreadParallelEngine/3.3/ThreadParallelEngine.cs @ 2636

Last change on this file since 2636 was 2546, checked in by swagner, 15 years ago

Continued work on Optimizer and on adapting all views to the new MainForm concept (#770)

File size: 7.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Xml;
26using System.Threading;
27using HeuristicLab.Core;
28using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
29
30namespace HeuristicLab.ThreadParallelEngine {
31  /// <summary>
32  /// Implementation of an engine being able to run in parallel with threads.
33  /// </summary>
34  [Creatable("Engines")]
35  [Item("Parallel Engine", "Engine for parallel execution of algorithms using multiple threads.")]
36  public class ThreadParallelEngine : EngineBase {
37    #region Inner Class Task
38    private class TaskList {
39      public Stack<IOperation>[] tasks;
40      public int next;
41      public Semaphore semaphore;
42    }
43    #endregion
44
45    // currently executed operators
46    private IOperator[] currentOperators;
47    private int operatorIndex;
48
49    [Storable]
50    private int myWorkers;
51    /// <summary>
52    /// Gets or sets the number of worker threads of the current engine.
53    /// </summary>
54    /// <remarks>Calls <see cref="OnWorkersChanged"/> in the setter.</remarks>
55    public int Workers {
56      get { return myWorkers; }
57      set {
58        if (value != myWorkers) {
59          myWorkers = value;
60          OnWorkersChanged();
61        }
62      }
63    }
64
65    /// <summary>
66    /// Initializes a new instance of <see cref="ThreadParallelEngine"/> with the number of processors
67    /// as number of worker threads.
68    /// </summary>
69    public ThreadParallelEngine() {
70      myWorkers = Environment.ProcessorCount;
71      currentOperators = new IOperator[1000];
72    }
73
74
75    /// <inheritdoc/>
76    /// <returns>The cloned object as <see cref="ThreadParallelEngine"/>.</returns>
77    public override IItem Clone(ICloner cloner) {
78      ThreadParallelEngine clone = (ThreadParallelEngine)base.Clone(cloner);
79      clone.myWorkers = Workers;
80      return clone;
81    }
82
83    /// <summary>
84    /// This execution method is not supported by ThreadParallelEngines.
85    /// </summary>
86    /// <exception cref="InvalidOperationException">Thrown because the current instance of an engine
87    /// does not support stepwise execution.</exception>
88    /// <param name="steps">The number of steps to execute.</param>
89    public override void ExecuteSteps(int steps) {
90      throw new InvalidOperationException("ThreadParallelEngine doesn't support stepwise execution");
91    }
92
93    /// <inheritdoc/>
94    public override void Abort() {
95      base.Abort();
96      for (int i = 0; i < currentOperators.Length; i++) {
97        if (currentOperators[i] != null)
98          currentOperators[i].Abort();
99      }
100    }
101    /// <summary>
102    /// Processes the next operation (if it is a compositeOperation and it can be executed in parallel it is
103    /// done).
104    /// </summary>
105    protected override void ProcessNextOperation() {
106      operatorIndex = 1;
107      ProcessNextOperation(myExecutionStack, 0);
108    }
109    private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
110      IOperation operation = stack.Pop();
111      if (operation is AtomicOperation) {
112        AtomicOperation atomicOperation = (AtomicOperation)operation;
113        IOperation next = null;
114        try {
115          currentOperators[currentOperatorIndex] = atomicOperation.Operator;
116          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
117        }
118        catch (Exception ex) {
119          // push operation on stack again
120          stack.Push(atomicOperation);
121          Abort();
122          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
123        }
124        if (next != null)
125          stack.Push(next);
126        OnOperationExecuted(atomicOperation);
127        if (atomicOperation.Operator.Breakpoint) Abort();
128      } else if (operation is CompositeOperation) {
129        CompositeOperation compositeOperation = (CompositeOperation)operation;
130        if (compositeOperation.ExecuteInParallel) {
131          TaskList list = new TaskList();
132          list.tasks = new Stack<IOperation>[compositeOperation.Operations.Count];
133          for (int i = 0; i < list.tasks.Length; i++) {
134            list.tasks[i] = new Stack<IOperation>();
135            list.tasks[i].Push(compositeOperation.Operations[i]);
136          }
137          list.next = 0;
138          list.semaphore = new Semaphore(0, Workers);
139
140          for (int i = 0; i < Workers; i++)
141            ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessParallelOperation), list);
142          for (int i = 0; i < Workers; i++)
143            list.semaphore.WaitOne();
144          list.semaphore.Close();
145
146          if (Canceled) {
147            // write back not finished tasks
148            CompositeOperation remaining = new CompositeOperation();
149            remaining.ExecuteInParallel = true;
150            for (int i = 0; i < list.tasks.Length; i++) {
151              if (list.tasks[i].Count > 0) {
152                CompositeOperation task = new CompositeOperation();
153                while (list.tasks[i].Count > 0)
154                  task.AddOperation(list.tasks[i].Pop());
155                remaining.AddOperation(task);
156              }
157            }
158            if (remaining.Operations.Count > 0)
159              stack.Push(remaining);
160          }
161        } else {
162          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
163            stack.Push(compositeOperation.Operations[i]);
164        }
165      }
166    }
167    private void ProcessParallelOperation(object state) {
168      TaskList list = (TaskList)state;
169      int currentOperatorIndex, next;
170
171      do {
172        lock (currentOperators) {
173          currentOperatorIndex = operatorIndex;
174          operatorIndex++;
175        }
176        lock (list) {
177          next = list.next;
178          list.next++;
179        }
180
181        if (next < list.tasks.Length) {
182          Stack<IOperation> stack = list.tasks[next];
183          while ((!Canceled) && (stack.Count > 0))
184            ProcessNextOperation(stack, currentOperatorIndex);
185        }
186      } while ((!Canceled) && (next < list.tasks.Length));
187      list.semaphore.Release();
188    }
189
190    /// <summary>
191    /// Occurs when the number of workers has been changed.
192    /// </summary>
193    public event EventHandler WorkersChanged;
194    /// <summary>
195    /// Fires a new <c>WorkersChanged</c> event.
196    /// </summary>
197    protected virtual void OnWorkersChanged() {
198      if (WorkersChanged != null)
199        WorkersChanged(this, new EventArgs());
200    }
201  }
202}
Note: See TracBrowser for help on using the repository browser.