Changeset 5185 for branches/ParallelEngine
- Timestamp:
- 12/31/10 04:29:19 (14 years ago)
- Location:
- branches/ParallelEngine
- Files:
-
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/ParallelEngine/HeuristicLab.Core/3.3/Engine.cs
r4722 r5185 23 23 using System.Collections.Generic; 24 24 using System.Threading; 25 using System.Threading.Tasks; 25 26 using HeuristicLab.Common; 26 27 using HeuristicLab.Persistence.Default.CompositeSerializers.Storable; … … 42 43 } 43 44 44 private bool pausePending, stopPending; 45 #region Variables for communication between threads 46 private CancellationTokenSource cancellationTokenSource; 47 private bool stopRequested; 45 48 private DateTime lastUpdateTime; 46 private System.Timers.Timer timer;49 #endregion 47 50 48 51 [StorableConstructor] 49 protected Engine(bool deserializing) 50 : base(deserializing) { 51 pausePending = stopPending = false; 52 timer = new System.Timers.Timer(100); 53 timer.AutoReset = true; 54 timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed); 55 } 52 protected Engine(bool deserializing) : base(deserializing) { } 56 53 protected Engine(Engine original, Cloner cloner) 57 54 : base(original, cloner) { … … 62 59 for (int i = contexts.Length - 1; i >= 0; i--) 63 60 executionStack.Push(cloner.Clone(contexts[i])); 64 pausePending = original.pausePending;65 stopPending = original.stopPending;66 timer = new System.Timers.Timer(100);67 timer.AutoReset = true;68 timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);69 61 } 70 62 protected Engine() … … 72 64 log = new Log(); 73 65 executionStack = new Stack<IOperation>(); 74 pausePending = stopPending = false;75 timer = new System.Timers.Timer(100);76 timer.AutoReset = true;77 timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);78 66 } 79 67 … … 97 85 public override void Start() { 98 86 base.Start(); 99 ThreadPool.QueueUserWorkItem(new WaitCallback(Run), null); 87 cancellationTokenSource = new CancellationTokenSource(); 88 stopRequested = false; 89 Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token); 90 task.ContinueWith(t => { 91 try { 92 t.Wait(); 93 } 94 catch (AggregateException ex) { 95 ex.Flatten().Handle(x => { 96 if (!(x is OperationCanceledException)) OnExceptionOccurred(x); 97 return true; 98 }); 99 } 100 cancellationTokenSource.Dispose(); 101 cancellationTokenSource = null; 102 if (stopRequested) { 103 executionStack.Clear(); 104 OnStopped(); 105 } else { 106 if (executionStack.Count == 0) OnStopped(); 107 else OnPaused(); 108 } 109 }); 100 110 } 101 111 protected override void OnStarted() { … … 106 116 public override void Pause() { 107 117 base.Pause(); 108 pausePending = true;118 cancellationTokenSource.Cancel(); 109 119 } 110 120 protected override void OnPaused() { … … 115 125 public override void Stop() { 116 126 base.Stop(); 117 stopPending = true;118 127 if (ExecutionState == ExecutionState.Paused) OnStopped(); 128 else { 129 stopRequested = true; 130 cancellationTokenSource.Cancel(); 131 } 119 132 } 120 133 protected override void OnStopped() { … … 129 142 130 143 private void Run(object state) { 144 CancellationToken cancellationToken = (CancellationToken)state; 145 131 146 OnStarted(); 132 pausePending = stopPending = false; 147 lastUpdateTime = DateTime.Now; 148 System.Timers.Timer timer = new System.Timers.Timer(100); 149 timer.AutoReset = true; 150 timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed); 151 timer.Start(); 152 try { 153 Run(cancellationToken); 154 } 155 finally { 156 timer.Stop(); 157 timer.Dispose(); 158 ExecutionTime += DateTime.Now - lastUpdateTime; 159 } 133 160 134 lastUpdateTime = DateTime.Now; 135 timer.Start(); 136 while (!pausePending && !stopPending && (executionStack.Count > 0)) { 137 ProcessNextOperation(); 138 } 139 timer.Stop(); 140 ExecutionTime += DateTime.Now - lastUpdateTime; 141 142 if (pausePending) OnPaused(); 143 else OnStopped(); 161 cancellationToken.ThrowIfCancellationRequested(); 144 162 } 145 146 protected abstract void ProcessNextOperation(); 163 protected abstract void Run(CancellationToken cancellationToken); 147 164 148 165 private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { -
branches/ParallelEngine/HeuristicLab.Core/3.3/Interfaces/IOperator.cs
r5183 r5185 21 21 22 22 using System; 23 using System.Threading; 23 24 24 25 namespace HeuristicLab.Core { … … 29 30 bool Breakpoint { get; set; } 30 31 31 IOperation Execute(IExecutionContext context); 32 void Abort(); 32 IOperation Execute(IExecutionContext context, CancellationToken cancellationToken); 33 33 34 34 event EventHandler BreakpointChanged; -
branches/ParallelEngine/HeuristicLab.Operators/3.3/Operator.cs
r5183 r5185 53 53 } 54 54 } 55 56 private bool canceled; 57 protected bool Canceled { 58 get { return canceled; } 59 private set { 60 if (value != canceled) { 61 canceled = value; 62 OnCanceledChanged(); 63 } 64 } 55 private CancellationToken cancellationToken; 56 protected CancellationToken CancellationToken { 57 get { return cancellationToken; } 65 58 } 66 59 … … 82 75 : base(deserializing) { 83 76 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 84 canceled = false;85 77 } 86 78 protected Operator(Operator original, Cloner cloner) 87 79 : base(original, cloner) { 88 80 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 89 this.canceled = original.canceled;90 81 this.breakpoint = original.breakpoint; 91 82 } … … 93 84 : base() { 94 85 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 95 canceled = false;96 86 breakpoint = false; 97 87 } … … 99 89 : base(name) { 100 90 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 101 canceled = false;102 91 breakpoint = false; 103 92 } … … 105 94 : base(name, parameters) { 106 95 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 107 canceled = false;108 96 breakpoint = false; 109 97 } … … 111 99 : base(name, description) { 112 100 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 113 canceled = false;114 101 breakpoint = false; 115 102 } … … 117 104 : base(name, description, parameters) { 118 105 executionContexts = new Lazy<ThreadLocal<IExecutionContext>>(() => { return new ThreadLocal<IExecutionContext>(); }, LazyThreadSafetyMode.ExecutionAndPublication); 119 canceled = false;120 106 breakpoint = false; 121 107 } 122 108 123 public virtual IOperation Execute(IExecutionContext context ) {109 public virtual IOperation Execute(IExecutionContext context, CancellationToken cancellationToken) { 124 110 try { 125 Canceled = false;126 111 ExecutionContext = context; 112 this.cancellationToken = cancellationToken; 127 113 foreach (IParameter param in Parameters) 128 114 param.ExecutionContext = context; … … 137 123 } 138 124 } 139 public void Abort() {140 Canceled = true;141 }142 125 public abstract IOperation Apply(); 143 126 144 protected virtual void OnCanceledChanged() { }145 127 public event EventHandler BreakpointChanged; 146 128 protected virtual void OnBreakpointChanged() { -
branches/ParallelEngine/HeuristicLab.Optimization/3.3/UserDefinedProblem.cs
r4722 r5185 30 30 using HeuristicLab.Parameters; 31 31 using HeuristicLab.Persistence.Default.CompositeSerializers.Storable; 32 using System.Threading; 32 33 33 34 namespace HeuristicLab.Optimization { … … 248 249 public bool Breakpoint { get; set; } 249 250 250 public IOperation Execute(IExecutionContext context) { 251 throw new InvalidOperationException("Please choose an appropriate evaluation operator."); 252 } 253 254 public void Abort() { 251 public IOperation Execute(IExecutionContext context, CancellationToken cancellationToken) { 255 252 throw new InvalidOperationException("Please choose an appropriate evaluation operator."); 256 253 } -
branches/ParallelEngine/HeuristicLab.ParallelEngine/3.3/ParallelEngine.cs
r5176 r5185 36 36 [Item("Parallel Engine", "Engine for parallel execution of algorithms using multiple threads (suitable for shared memory systems with multiple cores).")] 37 37 public class ParallelEngine : Engine { 38 private CancellationTokenSource cancellationTokenSource;39 38 private CancellationToken cancellationToken; 40 39 … … 48 47 } 49 48 50 protected override void ProcessNextOperation() { 51 using (cancellationTokenSource = new CancellationTokenSource()) { 52 cancellationToken = cancellationTokenSource.Token; 53 try { 54 ProcessNextOperation(ExecutionStack); 55 } 56 catch (Exception ex) { 57 OnExceptionOccurred(ex); 58 Pause(); 59 } 60 } 61 cancellationTokenSource = null; 49 protected override void Run(CancellationToken cancellationToken) { 50 this.cancellationToken = cancellationToken; 51 Run(ExecutionStack); 62 52 } 63 53 64 private void ProcessOperations(object state) {54 private void Run(object state) { 65 55 Stack<IOperation> executionStack = (Stack<IOperation>)state; 66 while ((executionStack.Count > 0) && (!cancellationToken.IsCancellationRequested)) 67 ProcessNextOperation(executionStack); 68 cancellationToken.ThrowIfCancellationRequested(); 69 } 56 IOperation next; 57 OperationCollection coll; 58 IAtomicOperation operation; 70 59 71 private void ProcessNextOperation(Stack<IOperation> executionStack) { 72 IOperation next = executionStack.Pop(); 73 OperationCollection coll = next as OperationCollection; 74 while (coll != null) { 75 if (coll.Parallel) { 76 Task[] tasks = new Task[coll.Count]; 77 Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count]; 78 for (int i = 0; i < coll.Count; i++) { 79 stacks[i] = new Stack<IOperation>(); 80 stacks[i].Push(coll[i]); 81 tasks[i] = Task.Factory.StartNew(new Action<object>(ProcessOperations), stacks[i], cancellationToken); 60 while (executionStack.Count > 0) { 61 cancellationToken.ThrowIfCancellationRequested(); 62 63 next = executionStack.Pop(); 64 if (next is OperationCollection) { 65 coll = (OperationCollection)next; 66 if (coll.Parallel) { 67 Task[] tasks = new Task[coll.Count]; 68 Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count]; 69 for (int i = 0; i < coll.Count; i++) { 70 stacks[i] = new Stack<IOperation>(); 71 stacks[i].Push(coll[i]); 72 tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken); 73 } 74 try { 75 Task.WaitAll(tasks); 76 } 77 catch (AggregateException ex) { 78 OperationCollection remaining = new OperationCollection() { Parallel = true }; 79 for (int i = 0; i < stacks.Length; i++) { 80 if (stacks[i].Count == 1) 81 remaining.Add(stacks[i].Pop()); 82 if (stacks[i].Count > 1) { 83 OperationCollection ops = new OperationCollection(); 84 while (stacks[i].Count > 0) 85 ops.Add(stacks[i].Pop()); 86 remaining.Add(ops); 87 } 88 } 89 if (remaining.Count > 0) executionStack.Push(remaining); 90 throw ex; 91 } 92 } else { 93 for (int i = coll.Count - 1; i >= 0; i--) 94 if (coll[i] != null) ExecutionStack.Push(coll[i]); 82 95 } 96 } else if (next is IAtomicOperation) { 97 operation = (IAtomicOperation)next; 83 98 try { 84 Task.WaitAll(tasks);99 next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken); 85 100 } 86 catch (AggregateException ex) { 87 OperationCollection remaining = new OperationCollection() { Parallel = true }; 88 for (int i = 0; i < stacks.Length; i++) { 89 if (stacks[i].Count > 0) { 90 OperationCollection ops = new OperationCollection(); 91 while (stacks[i].Count > 0) 92 ops.Add(stacks[i].Pop()); 93 remaining.Add(ops); 94 } 95 } 96 executionStack.Push(remaining); 97 ex.Flatten().Handle(x => x is OperationCanceledException); 98 return; 101 catch (Exception ex) { 102 executionStack.Push(operation); 103 throw new OperatorExecutionException(operation.Operator, ex); 99 104 } 100 101 next = executionStack.Count > 0 ? executionStack.Pop() : null; 102 coll = next as OperationCollection; 103 } else { 104 for (int i = coll.Count - 1; i >= 0; i--) 105 executionStack.Push(coll[i]); 106 next = executionStack.Count > 0 ? executionStack.Pop() : null; 107 coll = next as OperationCollection; 108 } 109 } 110 IAtomicOperation operation = next as IAtomicOperation; 111 if (operation != null) { 112 try { 113 executionStack.Push(operation.Operator.Execute((IExecutionContext)operation)); 114 } 115 catch (Exception ex) { 116 executionStack.Push(operation); 117 throw new OperatorExecutionException(operation.Operator, ex); 105 if (next != null) executionStack.Push(next); 118 106 } 119 107 } 120 108 } 121 122 public override void Pause() {123 base.Pause();124 if (cancellationTokenSource != null) cancellationTokenSource.Cancel();125 }126 public override void Stop() {127 base.Stop();128 if (cancellationTokenSource != null) cancellationTokenSource.Cancel();129 }130 109 } 131 110 } -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Alba/Crossovers/AlbaPermutationCrossover.cs
r5178 r5185 54 54 IAtomicOperation op = this.ExecutionContext.CreateOperation( 55 55 InnerCrossoverParameter.ActualValue, this.ExecutionContext.Scope); 56 op.Operator.Execute((IExecutionContext)op );56 op.Operator.Execute((IExecutionContext)op, CancellationToken); 57 57 58 58 string childName = InnerCrossoverParameter.ActualValue.ChildParameter.ActualName; -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Alba/Manipulators/AlbaPermutationManipulator.cs
r5178 r5185 51 51 IAtomicOperation op = this.ExecutionContext.CreateOperation( 52 52 InnerManipulatorParameter.ActualValue, this.ExecutionContext.Scope); 53 op.Operator.Execute((IExecutionContext)op );53 op.Operator.Execute((IExecutionContext)op, CancellationToken); 54 54 } 55 55 } -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Alba/Moves/AlbaPermutationMoveOperator.cs
r5178 r5185 46 46 PermutationMoveOperatorParameter.PermutationParameter.ActualName = VRPToursParameter.ActualName; 47 47 IAtomicOperation op = this.ExecutionContext.CreateChildOperation(PermutationMoveOperatorParameter); 48 op.Operator.Execute((IExecutionContext)op );48 op.Operator.Execute((IExecutionContext)op, CancellationToken); 49 49 50 50 return next; -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Alba/Moves/ThreeOpt/AlbaTranslocationMoveMaker.cs
r5178 r5185 68 68 moveMaker.PermutationParameter.ActualName = VRPToursParameter.ActualName; 69 69 IAtomicOperation op = this.ExecutionContext.CreateChildOperation(moveMaker); 70 op.Operator.Execute((IExecutionContext)op );70 op.Operator.Execute((IExecutionContext)op, CancellationToken); 71 71 72 72 return next; -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Prins/Crossovers/PrinsPermutationCrossover.cs
r5178 r5185 29 29 [Item("PrinsPermutationCrossover", "An operator which crosses two VRP representations using a standard permutation operator. It is implemented as described in Prins, C. (2004). A simple and effective evolutionary algorithm for the vehicle routing problem. Computers & Operations Research, 12:1985-2002.")] 30 30 [StorableClass] 31 public sealed class PrinsPermutationCrossover : PrinsCrossover, IPrinsOperator { 31 public sealed class PrinsPermutationCrossover : PrinsCrossover, IPrinsOperator { 32 32 public IValueLookupParameter<IPermutationCrossover> InnerCrossoverParameter { 33 33 get { return (IValueLookupParameter<IPermutationCrossover>)Parameters["InnerCrossover"]; } … … 52 52 IAtomicOperation op = this.ExecutionContext.CreateOperation( 53 53 InnerCrossoverParameter.ActualValue, this.ExecutionContext.Scope); 54 op.Operator.Execute((IExecutionContext)op );54 op.Operator.Execute((IExecutionContext)op, CancellationToken); 55 55 56 56 string childName = InnerCrossoverParameter.ActualValue.ChildParameter.ActualName; -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Prins/Manipulators/PrinsPermutationManipulator.cs
r5178 r5185 42 42 public PrinsPermutationManipulator() 43 43 : base() { 44 44 Parameters.Add(new ValueLookupParameter<IPermutationManipulator>("InnerManipulator", "The permutation manipulator.", new TranslocationManipulator())); 45 45 } 46 46 … … 50 50 IAtomicOperation op = this.ExecutionContext.CreateOperation( 51 51 InnerManipulatorParameter.ActualValue, this.ExecutionContext.Scope); 52 op.Operator.Execute((IExecutionContext)op );52 op.Operator.Execute((IExecutionContext)op, CancellationToken); 53 53 } 54 54 } -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Zhu/Crossovers/ZhuPermutationCrossover.cs
r5178 r5185 54 54 IAtomicOperation op = this.ExecutionContext.CreateOperation( 55 55 InnerCrossoverParameter.ActualValue, this.ExecutionContext.Scope); 56 op.Operator.Execute((IExecutionContext)op );56 op.Operator.Execute((IExecutionContext)op, CancellationToken); 57 57 58 58 string childName = InnerCrossoverParameter.ActualValue.ChildParameter.ActualName; -
branches/ParallelEngine/HeuristicLab.Problems.VehicleRouting/3.3/Encodings/Zhu/Manipulators/ZhuPermutationManipulator.cs
r5178 r5185 52 52 IAtomicOperation op = this.ExecutionContext.CreateOperation( 53 53 InnerManipulatorParameter.ActualValue, this.ExecutionContext.Scope); 54 op.Operator.Execute((IExecutionContext)op );54 op.Operator.Execute((IExecutionContext)op, CancellationToken); 55 55 } 56 56 } -
branches/ParallelEngine/HeuristicLab.SequentialEngine/3.3/SequentialEngine.cs
r4722 r5185 21 21 22 22 using System; 23 using System.Threading; 23 24 using HeuristicLab.Common; 24 25 using HeuristicLab.Core; … … 27 28 namespace HeuristicLab.SequentialEngine { 28 29 /// <summary> 29 /// Represents an engine that executes its steps sequentially, also if they could be executed 30 /// in parallel. 30 /// Engine for sequential execution of algorithms. 31 31 /// </summary> 32 32 [StorableClass] 33 33 [Item("Sequential Engine", "Engine for sequential execution of algorithms.")] 34 34 public class SequentialEngine : Engine { 35 private IOperator currentOperator;36 37 35 [StorableConstructor] 38 36 protected SequentialEngine(bool deserializing) : base(deserializing) { } … … 44 42 } 45 43 46 /// <summary> 47 /// Deals with the next operation, if it is an <see cref="AtomicOperation"/> it is executed, 48 /// if it is a <see cref="CompositeOperation"/> its single operations are pushed on the execution stack. 49 /// </summary> 50 /// <remarks>If an error occurs during the execution the operation is aborted and the operation 51 /// is pushed on the stack again.<br/> 52 /// If the execution was successful <see cref="EngineBase.OnOperationExecuted"/> is called.</remarks> 53 protected override void ProcessNextOperation() { 54 currentOperator = null; 55 IOperation next = ExecutionStack.Pop(); 56 OperationCollection coll = next as OperationCollection; 57 while (coll != null) { 58 for (int i = coll.Count - 1; i >= 0; i--) 59 ExecutionStack.Push(coll[i]); 60 next = ExecutionStack.Count > 0 ? ExecutionStack.Pop() : null; 61 coll = next as OperationCollection; 62 } 63 IAtomicOperation operation = next as IAtomicOperation; 64 if (operation != null) { 65 try { 66 currentOperator = operation.Operator; 67 ExecutionStack.Push(operation.Operator.Execute((IExecutionContext)operation)); 68 } 69 catch (Exception ex) { 70 ExecutionStack.Push(operation); 71 OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex)); 72 Pause(); 73 } 74 if (operation.Operator.Breakpoint) { 75 Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName)); 76 Pause(); 44 protected override void Run(CancellationToken cancellationToken) { 45 IOperation next; 46 OperationCollection coll; 47 IAtomicOperation operation; 48 49 while (ExecutionStack.Count > 0) { 50 cancellationToken.ThrowIfCancellationRequested(); 51 52 next = ExecutionStack.Pop(); 53 if (next is OperationCollection) { 54 coll = (OperationCollection)next; 55 for (int i = coll.Count - 1; i >= 0; i--) 56 if (coll[i] != null) ExecutionStack.Push(coll[i]); 57 } else if (next is IAtomicOperation) { 58 operation = (IAtomicOperation)next; 59 try { 60 next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken); 61 } 62 catch (Exception ex) { 63 ExecutionStack.Push(operation); 64 throw new OperatorExecutionException(operation.Operator, ex); 65 } 66 if (next != null) ExecutionStack.Push(next); 67 68 if (operation.Operator.Breakpoint) { 69 Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName)); 70 Pause(); 71 } 77 72 } 78 73 } 79 74 } 80 81 public override void Pause() {82 base.Pause();83 if (currentOperator != null) currentOperator.Abort();84 }85 public override void Stop() {86 base.Stop();87 if (currentOperator != null) currentOperator.Abort();88 }89 75 } 90 76 }
Note: See TracChangeset
for help on using the changeset viewer.