Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
12/31/10 04:29:19 (13 years ago)
Author:
swagner
Message:

Worked on cancellation and refactored code (#1333)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/ParallelEngine/HeuristicLab.ParallelEngine/3.3/ParallelEngine.cs

    r5176 r5185  
    3636  [Item("Parallel Engine", "Engine for parallel execution of algorithms using multiple threads (suitable for shared memory systems with multiple cores).")]
    3737  public class ParallelEngine : Engine {
    38     private CancellationTokenSource cancellationTokenSource;
    3938    private CancellationToken cancellationToken;
    4039
     
    4847    }
    4948
    50     protected override void ProcessNextOperation() {
    51       using (cancellationTokenSource = new CancellationTokenSource()) {
    52         cancellationToken = cancellationTokenSource.Token;
    53         try {
    54           ProcessNextOperation(ExecutionStack);
    55         }
    56         catch (Exception ex) {
    57           OnExceptionOccurred(ex);
    58           Pause();
    59         }
    60       }
    61       cancellationTokenSource = null;
     49    protected override void Run(CancellationToken cancellationToken) {
     50      this.cancellationToken = cancellationToken;
     51      Run(ExecutionStack);
    6252    }
    6353
    64     private void ProcessOperations(object state) {
     54    private void Run(object state) {
    6555      Stack<IOperation> executionStack = (Stack<IOperation>)state;
    66       while ((executionStack.Count > 0) && (!cancellationToken.IsCancellationRequested))
    67         ProcessNextOperation(executionStack);
    68       cancellationToken.ThrowIfCancellationRequested();
    69     }
     56      IOperation next;
     57      OperationCollection coll;
     58      IAtomicOperation operation;
    7059
    71     private void ProcessNextOperation(Stack<IOperation> executionStack) {
    72       IOperation next = executionStack.Pop();
    73       OperationCollection coll = next as OperationCollection;
    74       while (coll != null) {
    75         if (coll.Parallel) {
    76           Task[] tasks = new Task[coll.Count];
    77           Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count];
    78           for (int i = 0; i < coll.Count; i++) {
    79             stacks[i] = new Stack<IOperation>();
    80             stacks[i].Push(coll[i]);
    81             tasks[i] = Task.Factory.StartNew(new Action<object>(ProcessOperations), stacks[i], cancellationToken);
     60      while (executionStack.Count > 0) {
     61        cancellationToken.ThrowIfCancellationRequested();
     62
     63        next = executionStack.Pop();
     64        if (next is OperationCollection) {
     65          coll = (OperationCollection)next;
     66          if (coll.Parallel) {
     67            Task[] tasks = new Task[coll.Count];
     68            Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count];
     69            for (int i = 0; i < coll.Count; i++) {
     70              stacks[i] = new Stack<IOperation>();
     71              stacks[i].Push(coll[i]);
     72              tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken);
     73            }
     74            try {
     75              Task.WaitAll(tasks);
     76            }
     77            catch (AggregateException ex) {
     78              OperationCollection remaining = new OperationCollection() { Parallel = true };
     79              for (int i = 0; i < stacks.Length; i++) {
     80                if (stacks[i].Count == 1)
     81                  remaining.Add(stacks[i].Pop());
     82                if (stacks[i].Count > 1) {
     83                  OperationCollection ops = new OperationCollection();
     84                  while (stacks[i].Count > 0)
     85                    ops.Add(stacks[i].Pop());
     86                  remaining.Add(ops);
     87                }
     88              }
     89              if (remaining.Count > 0) executionStack.Push(remaining);
     90              throw ex;
     91            }
     92          } else {
     93            for (int i = coll.Count - 1; i >= 0; i--)
     94              if (coll[i] != null) ExecutionStack.Push(coll[i]);
    8295          }
     96        } else if (next is IAtomicOperation) {
     97          operation = (IAtomicOperation)next;
    8398          try {
    84             Task.WaitAll(tasks);
     99            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
    85100          }
    86           catch (AggregateException ex) {
    87             OperationCollection remaining = new OperationCollection() { Parallel = true };
    88             for (int i = 0; i < stacks.Length; i++) {
    89               if (stacks[i].Count > 0) {
    90                 OperationCollection ops = new OperationCollection();
    91                 while (stacks[i].Count > 0)
    92                   ops.Add(stacks[i].Pop());
    93                 remaining.Add(ops);
    94               }
    95             }
    96             executionStack.Push(remaining);
    97             ex.Flatten().Handle(x => x is OperationCanceledException);
    98             return;
     101          catch (Exception ex) {
     102            executionStack.Push(operation);
     103            throw new OperatorExecutionException(operation.Operator, ex);
    99104          }
    100 
    101           next = executionStack.Count > 0 ? executionStack.Pop() : null;
    102           coll = next as OperationCollection;
    103         } else {
    104           for (int i = coll.Count - 1; i >= 0; i--)
    105             executionStack.Push(coll[i]);
    106           next = executionStack.Count > 0 ? executionStack.Pop() : null;
    107           coll = next as OperationCollection;
    108         }
    109       }
    110       IAtomicOperation operation = next as IAtomicOperation;
    111       if (operation != null) {
    112         try {
    113           executionStack.Push(operation.Operator.Execute((IExecutionContext)operation));
    114         }
    115         catch (Exception ex) {
    116           executionStack.Push(operation);
    117           throw new OperatorExecutionException(operation.Operator, ex);
     105          if (next != null) executionStack.Push(next);
    118106        }
    119107      }
    120108    }
    121 
    122     public override void Pause() {
    123       base.Pause();
    124       if (cancellationTokenSource != null) cancellationTokenSource.Cancel();
    125     }
    126     public override void Stop() {
    127       base.Stop();
    128       if (cancellationTokenSource != null) cancellationTokenSource.Cancel();
    129     }
    130109  }
    131110}
Note: See TracChangeset for help on using the changeset viewer.