#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Text;
using System.Xml;
using System.Threading;
using HeuristicLab.Core;

namespace HeuristicLab.ThreadParallelEngine {
  /// <summary>
  /// Engine that runs the sub-operations of composite operations flagged
  /// <c>ExecuteInParallel</c> on thread-pool work items; all other operations
  /// are processed on the calling thread.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the type arguments of <c>Stack</c> and <c>IDictionary</c> appear to be
  /// missing in this copy of the file (only System.Collections.Generic is imported and
  /// <c>Pop()</c> results are assigned to <c>IOperation</c> without a cast). They are left
  /// exactly as found - TODO restore them from the base class signatures.
  /// </remarks>
  public class ThreadParallelEngine : EngineBase, IEditable {
    #region Inner Class TaskList
    /// <summary>
    /// Shared state of one parallel composite operation: one stack per task, the index
    /// of the next unclaimed task and the semaphore workers release when they finish.
    /// </summary>
    private class TaskList {
      public Stack[] tasks;
      public int next;
      public Semaphore semaphore;
    }
    #endregion

    // initial number of slots for currently executed operators; grown on demand
    private const int InitialOperatorSlots = 1000;

    // guards currentOperators (including replacement of the array) and operatorIndex
    private readonly object operatorsLock = new object();

    // currently executed operators
    private IOperator[] currentOperators;
    private int operatorIndex;

    private int myWorkers;
    /// <summary>
    /// Number of work items queued per parallel composite operation.
    /// Raises <see cref="WorkersChanged"/> when the value actually changes.
    /// </summary>
    public int Workers {
      get { return myWorkers; }
      set {
        if (value != myWorkers) {
          myWorkers = value;
          OnWorkersChanged();
        }
      }
    }

    /// <summary>
    /// Creates an engine whose worker count defaults to the number of processors.
    /// </summary>
    public ThreadParallelEngine() {
      myWorkers = Environment.ProcessorCount;
      currentOperators = new IOperator[InitialOperatorSlots];
    }

    /// <summary>
    /// Clones the engine via the base implementation and copies the worker count.
    /// </summary>
    public override object Clone(IDictionary clonedObjects) {
      ThreadParallelEngine clone = (ThreadParallelEngine)base.Clone(clonedObjects);
      clone.myWorkers = Workers;
      return clone;
    }

    /// <summary>Creates a <c>ThreadParallelEngineEditor</c> for this engine.</summary>
    public override IView CreateView() {
      return new ThreadParallelEngineEditor(this);
    }

    /// <summary>Creates a <c>ThreadParallelEngineEditor</c> for this engine.</summary>
    public virtual IEditor CreateEditor() {
      return new ThreadParallelEngineEditor(this);
    }

    /// <summary>Not supported by this engine.</summary>
    /// <exception cref="InvalidOperationException">Always thrown.</exception>
    public override void ExecuteSteps(int steps) {
      throw new InvalidOperationException("ThreadParallelEngine doesn't support stepwise execution");
    }

    /// <summary>
    /// Aborts the engine and every operator currently registered in an operator slot.
    /// </summary>
    public override void Abort() {
      base.Abort();

      // Work on a snapshot so operators are not aborted while holding the lock and
      // so a concurrent growth of the slot array cannot be observed half-way.
      IOperator[] snapshot;
      lock (operatorsLock) {
        snapshot = (IOperator[])currentOperators.Clone();
      }
      for (int i = 0; i < snapshot.Length; i++) {
        if (snapshot[i] != null)
          snapshot[i].Abort();
      }
    }

    /// <summary>
    /// Processes the next operation of the engine's execution stack using operator slot 0;
    /// worker slots are handed out again starting at 1.
    /// </summary>
    protected override void ProcessNextOperation() {
      lock (operatorsLock) {
        operatorIndex = 1;
      }
      ProcessNextOperation(myExecutionStack, 0);
    }

    /// <summary>
    /// Pops one operation from <paramref name="stack"/> and processes it: atomic operations
    /// are executed, parallel composites are distributed over workers, and sequential
    /// composites are expanded onto the stack.
    /// </summary>
    /// <param name="stack">Stack to pop the operation from and push follow-up operations to.</param>
    /// <param name="currentOperatorIndex">Operator slot in which the executing operator is recorded.</param>
    private void ProcessNextOperation(Stack stack, int currentOperatorIndex) {
      IOperation operation = stack.Pop();
      if (operation is AtomicOperation) {
        AtomicOperation atomicOperation = (AtomicOperation)operation;
        IOperation next = null;
        try {
          SetCurrentOperator(currentOperatorIndex, atomicOperation.Operator);
          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
        }
        catch (Exception ex) {
          // push operation on stack again
          stack.Push(atomicOperation);
          Abort();
          // report the exception from a separate work item
          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
        }
        if (next != null)
          stack.Push(next);
        OnOperationExecuted(atomicOperation);
        if (atomicOperation.Operator.Breakpoint)
          Abort();
      } else if (operation is CompositeOperation) {
        CompositeOperation compositeOperation = (CompositeOperation)operation;
        if (compositeOperation.ExecuteInParallel) {
          ProcessParallelComposite(stack, compositeOperation);
        } else {
          // push in reverse order so the first sub-operation is popped first
          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
            stack.Push(compositeOperation.Operations[i]);
        }
      }
    }

    /// <summary>
    /// Runs the sub-operations of a parallel composite on thread-pool work items, waits for
    /// all of them and, if the engine was canceled, pushes the unfinished work back onto
    /// <paramref name="stack"/>.
    /// </summary>
    private void ProcessParallelComposite(Stack stack, CompositeOperation compositeOperation) {
      TaskList list = new TaskList();
      list.tasks = new Stack[compositeOperation.Operations.Count];
      for (int i = 0; i < list.tasks.Length; i++) {
        list.tasks[i] = new Stack();
        list.tasks[i].Push(compositeOperation.Operations[i]);
      }
      list.next = 0;

      // Read Workers exactly once: the semaphore capacity, the number of queued work
      // items and the number of waits must agree even if the property is changed
      // concurrently. A semaphore needs a maximum count of at least 1.
      int workerCount = Math.Max(1, Workers);
      list.semaphore = new Semaphore(0, workerCount);
      for (int i = 0; i < workerCount; i++)
        ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessParallelOperation), list);
      for (int i = 0; i < workerCount; i++)
        list.semaphore.WaitOne();
      list.semaphore.Close();

      if (Canceled) {
        // write back not finished tasks
        CompositeOperation remaining = new CompositeOperation();
        remaining.ExecuteInParallel = true;
        for (int i = 0; i < list.tasks.Length; i++) {
          if (list.tasks[i].Count > 0) {
            CompositeOperation task = new CompositeOperation();
            while (list.tasks[i].Count > 0)
              task.AddOperation(list.tasks[i].Pop());
            remaining.AddOperation(task);
          }
        }
        if (remaining.Operations.Count > 0)
          stack.Push(remaining);
      }
    }

    /// <summary>
    /// Work item body: repeatedly claims the next unprocessed task of the given
    /// <c>TaskList</c> and processes its stack until no task is left or the engine is
    /// canceled, then releases the list's semaphore once.
    /// </summary>
    /// <param name="state">The <c>TaskList</c> shared by all workers of one parallel composite.</param>
    private void ProcessParallelOperation(object state) {
      TaskList list = (TaskList)state;
      try {
        int currentOperatorIndex, next;
        do {
          currentOperatorIndex = AcquireOperatorIndex();
          lock (list) {
            next = list.next;
            list.next++;
          }
          if (next < list.tasks.Length) {
            Stack stack = list.tasks[next];
            while ((!Canceled) && (stack.Count > 0))
              ProcessNextOperation(stack, currentOperatorIndex);
          }
        } while ((!Canceled) && (next < list.tasks.Length));
      }
      finally {
        // always signal completion, otherwise the coordinating thread waits forever
        list.semaphore.Release();
      }
    }

    /// <summary>Hands out the next free operator slot index.</summary>
    private int AcquireOperatorIndex() {
      lock (operatorsLock) {
        int index = operatorIndex;
        operatorIndex++;
        return index;
      }
    }

    /// <summary>
    /// Records <paramref name="op"/> as the operator executing in slot <paramref name="index"/>,
    /// growing the slot array when more slots were handed out than it currently holds.
    /// </summary>
    private void SetCurrentOperator(int index, IOperator op) {
      lock (operatorsLock) {
        if (index >= currentOperators.Length)
          Array.Resize(ref currentOperators, Math.Max(index + 1, currentOperators.Length * 2));
        currentOperators[index] = op;
      }
    }

    /// <summary>Raised after <see cref="Workers"/> has been set to a different value.</summary>
    public event EventHandler WorkersChanged;

    /// <summary>Raises the <see cref="WorkersChanged"/> event.</summary>
    protected virtual void OnWorkersChanged() {
      // copy to a local so a handler removed concurrently cannot cause a null invocation
      EventHandler handler = WorkersChanged;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    #region Persistence Methods
    /// <summary>
    /// Creates the XML node via the base implementation and appends the worker count
    /// as attribute "Workers".
    /// </summary>
    public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute workersAttribute = document.CreateAttribute("Workers");
      workersAttribute.Value = Workers.ToString(System.Globalization.CultureInfo.InvariantCulture);
      node.Attributes.Append(workersAttribute);
      return node;
    }

    /// <summary>
    /// Restores the engine via the base implementation and reads the worker count from
    /// attribute "Workers" (without raising <see cref="WorkersChanged"/>).
    /// </summary>
    public override void Populate(XmlNode node, IDictionary restoredObjects) {
      base.Populate(node, restoredObjects);
      myWorkers = int.Parse(node.Attributes["Workers"].Value, System.Globalization.CultureInfo.InvariantCulture);
    }
    #endregion
  }
}