Changeset 5185 for branches/ParallelEngine/HeuristicLab.ParallelEngine
- Timestamp:
- 12/31/10 04:29:19 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/ParallelEngine/HeuristicLab.ParallelEngine/3.3/ParallelEngine.cs
r5176 r5185 36 36 [Item("Parallel Engine", "Engine for parallel execution of algorithms using multiple threads (suitable for shared memory systems with multiple cores).")] 37 37 public class ParallelEngine : Engine { 38 private CancellationTokenSource cancellationTokenSource;39 38 private CancellationToken cancellationToken; 40 39 … … 48 47 } 49 48 50 protected override void ProcessNextOperation() { 51 using (cancellationTokenSource = new CancellationTokenSource()) { 52 cancellationToken = cancellationTokenSource.Token; 53 try { 54 ProcessNextOperation(ExecutionStack); 55 } 56 catch (Exception ex) { 57 OnExceptionOccurred(ex); 58 Pause(); 59 } 60 } 61 cancellationTokenSource = null; 49 protected override void Run(CancellationToken cancellationToken) { 50 this.cancellationToken = cancellationToken; 51 Run(ExecutionStack); 62 52 } 63 53 64 private void ProcessOperations(object state) {54 private void Run(object state) { 65 55 Stack<IOperation> executionStack = (Stack<IOperation>)state; 66 while ((executionStack.Count > 0) && (!cancellationToken.IsCancellationRequested)) 67 ProcessNextOperation(executionStack); 68 cancellationToken.ThrowIfCancellationRequested(); 69 } 56 IOperation next; 57 OperationCollection coll; 58 IAtomicOperation operation; 70 59 71 private void ProcessNextOperation(Stack<IOperation> executionStack) { 72 IOperation next = executionStack.Pop(); 73 OperationCollection coll = next as OperationCollection; 74 while (coll != null) { 75 if (coll.Parallel) { 76 Task[] tasks = new Task[coll.Count]; 77 Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count]; 78 for (int i = 0; i < coll.Count; i++) { 79 stacks[i] = new Stack<IOperation>(); 80 stacks[i].Push(coll[i]); 81 tasks[i] = Task.Factory.StartNew(new Action<object>(ProcessOperations), stacks[i], cancellationToken); 60 while (executionStack.Count > 0) { 61 cancellationToken.ThrowIfCancellationRequested(); 62 63 next = executionStack.Pop(); 64 if (next is OperationCollection) { 65 coll = (OperationCollection)next; 66 if (coll.Parallel) { 67 Task[] tasks = new Task[coll.Count]; 68 Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count]; 69 for (int i = 0; i < coll.Count; i++) { 70 stacks[i] = new Stack<IOperation>(); 71 stacks[i].Push(coll[i]); 72 tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken); 73 } 74 try { 75 Task.WaitAll(tasks); 76 } 77 catch (AggregateException ex) { 78 OperationCollection remaining = new OperationCollection() { Parallel = true }; 79 for (int i = 0; i < stacks.Length; i++) { 80 if (stacks[i].Count == 1) 81 remaining.Add(stacks[i].Pop()); 82 if (stacks[i].Count > 1) { 83 OperationCollection ops = new OperationCollection(); 84 while (stacks[i].Count > 0) 85 ops.Add(stacks[i].Pop()); 86 remaining.Add(ops); 87 } 88 } 89 if (remaining.Count > 0) executionStack.Push(remaining); 90 throw ex; 91 } 92 } else { 93 for (int i = coll.Count - 1; i >= 0; i--) 94 if (coll[i] != null) ExecutionStack.Push(coll[i]); 82 95 } 96 } else if (next is IAtomicOperation) { 97 operation = (IAtomicOperation)next; 83 98 try { 84 Task.WaitAll(tasks);99 next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken); 85 100 } 86 catch (AggregateException ex) { 87 OperationCollection remaining = new OperationCollection() { Parallel = true }; 88 for (int i = 0; i < stacks.Length; i++) { 89 if (stacks[i].Count > 0) { 90 OperationCollection ops = new OperationCollection(); 91 while (stacks[i].Count > 0) 92 ops.Add(stacks[i].Pop()); 93 remaining.Add(ops); 94 } 95 } 96 executionStack.Push(remaining); 97 ex.Flatten().Handle(x => x is OperationCanceledException); 98 return; 101 catch (Exception ex) { 102 executionStack.Push(operation); 103 throw new OperatorExecutionException(operation.Operator, ex); 99 104 } 100 101 next = executionStack.Count > 0 ? executionStack.Pop() : null; 102 coll = next as OperationCollection; 103 } else { 104 for (int i = coll.Count - 1; i >= 0; i--) 105 executionStack.Push(coll[i]); 106 next = executionStack.Count > 0 ? executionStack.Pop() : null; 107 coll = next as OperationCollection; 108 } 109 } 110 IAtomicOperation operation = next as IAtomicOperation; 111 if (operation != null) { 112 try { 113 executionStack.Push(operation.Operator.Execute((IExecutionContext)operation)); 114 } 115 catch (Exception ex) { 116 executionStack.Push(operation); 117 throw new OperatorExecutionException(operation.Operator, ex); 105 if (next != null) executionStack.Push(next); 118 106 } 119 107 } 120 108 } 121 122 public override void Pause() {123 base.Pause();124 if (cancellationTokenSource != null) cancellationTokenSource.Cancel();125 }126 public override void Stop() {127 base.Stop();128 if (cancellationTokenSource != null) cancellationTokenSource.Cancel();129 }130 109 } 131 110 }
Note: See TracChangeset
for help on using the changeset viewer.