Changeset 5188
- Timestamp:
- 01/02/11 05:32:59 (14 years ago)
- Location:
- branches/ParallelEngine
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/ParallelEngine/HeuristicLab.Core/3.3/Engine.cs
r5187 r5188 45 45 #region Variables for communication between threads 46 46 private CancellationTokenSource cancellationTokenSource; 47 private bool stop Requested;47 private bool stopPending; 48 48 private DateTime lastUpdateTime; 49 49 #endregion … … 86 86 base.Start(); 87 87 cancellationTokenSource = new CancellationTokenSource(); 88 stop Requested= false;88 stopPending = false; 89 89 Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token); 90 90 task.ContinueWith(t => { … … 103 103 cancellationTokenSource.Dispose(); 104 104 cancellationTokenSource = null; 105 if (stop Requested) executionStack.Clear();105 if (stopPending) executionStack.Clear(); 106 106 if (executionStack.Count == 0) OnStopped(); 107 107 else OnPaused(); … … 124 124 public override void Stop() { 125 125 base.Stop(); 126 if (ExecutionState == ExecutionState.Paused) OnStopped(); 127 else { 128 stopRequested = true; 126 if (ExecutionState == ExecutionState.Paused) { 127 executionStack.Clear(); 128 OnStopped(); 129 } else { 130 stopPending = true; 129 131 cancellationTokenSource.Cancel(); 130 132 } -
branches/ParallelEngine/HeuristicLab.DebugEngine/3.3/DebugEngine.cs
r5117 r5188 23 23 using System.Linq; 24 24 using System.Threading; 25 using System.Threading.Tasks; 25 26 using HeuristicLab.Common; 26 27 using HeuristicLab.Core; … … 38 39 protected DebugEngine(bool deserializing) 39 40 : base(deserializing) { 40 pausePending = stopPending = false;41 41 InitializeTimer(); 42 42 } … … 48 48 ExecutionStack = cloner.Clone(original.ExecutionStack); 49 49 OperatorTrace = cloner.Clone(original.OperatorTrace); 50 pausePending = original.pausePending;51 stopPending = original.stopPending;52 50 InitializeTimer(); 53 51 currentOperation = cloner.Clone(original.currentOperation); 54 currentOperator = cloner.Clone(original.currentOperator);55 52 } 56 53 public DebugEngine() … … 59 56 ExecutionStack = new ExecutionStack(); 60 57 OperatorTrace = new OperatorTrace(); 61 pausePending = stopPending = false;62 58 InitializeTimer(); 63 59 } … … 86 82 public OperatorTrace OperatorTrace { get; private set; } 87 83 88 private bool pausePending, stopPending; 84 private CancellationTokenSource cancellationTokenSource; 85 private bool stopPending; 89 86 private DateTime lastUpdateTime; 90 87 private System.Timers.Timer timer; 91 92 [Storable]93 private IOperator currentOperator;94 88 95 89 [Storable] … … 159 153 public virtual void Step(bool skipStackOperations) { 160 154 OnStarted(); 155 cancellationTokenSource = new CancellationTokenSource(); 156 stopPending = false; 161 157 lastUpdateTime = DateTime.Now; 162 158 timer.Start(); 163 ProcessNextOperation(true); 164 while (skipStackOperations && !(CurrentOperation is IAtomicOperation) && CanContinue) 165 ProcessNextOperation(true); 159 try { 160 ProcessNextOperation(true, cancellationTokenSource.Token); 161 while (skipStackOperations && !(CurrentOperation is IAtomicOperation) && CanContinue) 162 ProcessNextOperation(true, cancellationTokenSource.Token); 163 } 164 catch (Exception ex) { 165 OnExceptionOccurred(ex); 166 } 166 167 timer.Stop(); 167 168 ExecutionTime += DateTime.Now - lastUpdateTime; 168 OnPaused(); 169 cancellationTokenSource.Dispose(); 170 cancellationTokenSource = null; 171 if (stopPending) ExecutionStack.Clear(); 172 if (stopPending || !CanContinue) OnStopped(); 173 else OnPaused(); 169 174 } 170 175 171 176 public override void Start() { 172 177 base.Start(); 173 ThreadPool.QueueUserWorkItem(new WaitCallback(Run), null); 178 cancellationTokenSource = new CancellationTokenSource(); 179 stopPending = false; 180 Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token); 181 task.ContinueWith(t => { 182 try { 183 t.Wait(); 184 } 185 catch (AggregateException ex) { 186 try { 187 ex.Flatten().Handle(x => x is OperationCanceledException); 188 } 189 catch (AggregateException remaining) { 190 if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]); 191 else OnExceptionOccurred(remaining); 192 } 193 } 194 cancellationTokenSource.Dispose(); 195 cancellationTokenSource = null; 196 197 if (stopPending) ExecutionStack.Clear(); 198 if (stopPending || !CanContinue) OnStopped(); 199 else OnPaused(); 200 }); 174 201 } 175 202 … … 181 208 public override void Pause() { 182 209 base.Pause(); 183 pausePending = true; 184 if (currentOperator != null) currentOperator.Abort(); 210 cancellationTokenSource.Cancel(); 185 211 } 186 212 … … 193 219 CurrentOperation = null; 194 220 base.Stop(); 195 stopPending = true; 196 if (currentOperator != null) currentOperator.Abort(); 197 if (ExecutionState == ExecutionState.Paused) OnStopped(); 221 if (ExecutionState == ExecutionState.Paused) { 222 ExecutionStack.Clear(); 223 OnStopped(); 224 } else { 225 stopPending = true; 226 cancellationTokenSource.Cancel(); 227 } 198 228 } 199 229 … … 209 239 210 240 private void Run(object state) { 241 CancellationToken cancellationToken = (CancellationToken)state; 242 211 243 OnStarted(); 212 pausePending = stopPending = false;213 214 244 lastUpdateTime = DateTime.Now; 215 245 timer.Start(); 216 if (!pausePending && !stopPending && CanContinue) 217 ProcessNextOperation(false); 218 while (!pausePending && !stopPending && CanContinue && !IsAtBreakpoint) 219 ProcessNextOperation(false); 220 timer.Stop(); 221 ExecutionTime += DateTime.Now - lastUpdateTime; 222 223 if (IsAtBreakpoint) 224 Log.LogMessage(string.Format("Breaking before: {0}", CurrentAtomicOperation.Operator.Name)); 225 if (pausePending || IsAtBreakpoint) 226 OnPaused(); 227 else 228 OnStopped(); 246 try { 247 if (!cancellationToken.IsCancellationRequested && CanContinue) 248 ProcessNextOperation(false, cancellationToken); 249 while (!cancellationToken.IsCancellationRequested && CanContinue && !IsAtBreakpoint) 250 ProcessNextOperation(false, cancellationToken); 251 cancellationToken.ThrowIfCancellationRequested(); 252 } 253 finally { 254 timer.Stop(); 255 ExecutionTime += DateTime.Now - lastUpdateTime; 256 257 if (IsAtBreakpoint) 258 Log.LogMessage(string.Format("Breaking before: {0}", CurrentAtomicOperation.Operator.Name)); 259 } 229 260 } 230 261 … … 247 278 /// is pushed on the stack again.<br/> 248 279 /// If the execution was successful <see cref="EngineBase.OnOperationExecuted"/> is called.</remarks> 249 protected virtual void ProcessNextOperation(bool logOperations) { 250 try { 251 IAtomicOperation atomicOperation = CurrentOperation as IAtomicOperation; 252 OperationCollection operations = CurrentOperation as OperationCollection; 253 if (atomicOperation != null && operations != null) 254 throw new InvalidOperationException("Current operation is both atomic and an operation collection"); 255 256 if (atomicOperation != null) { 257 if (logOperations) 258 Log.LogMessage(string.Format("Performing atomic operation {0}", Utils.Name(atomicOperation))); 259 PerformAtomicOperation(atomicOperation); 260 } else if (operations != null) { 261 if (logOperations) 262 Log.LogMessage("Expanding operation collection"); 263 ExecutionStack.AddRange(operations.Reverse()); 264 CurrentOperation = null; 265 } else if (ExecutionStack.Count > 0) { 266 if (logOperations) 267 Log.LogMessage("Popping execution stack"); 268 CurrentOperation = ExecutionStack.Last(); 269 ExecutionStack.RemoveAt(ExecutionStack.Count - 1); 270 } else { 271 if (logOperations) 272 Log.LogMessage("Nothing to do"); 273 } 274 OperatorTrace.Regenerate(CurrentAtomicOperation); 275 } catch (Exception x) { 276 OnExceptionOccurred(x); 277 } 278 } 279 280 protected virtual void PerformAtomicOperation(IAtomicOperation operation) { 280 protected virtual void ProcessNextOperation(bool logOperations, CancellationToken cancellationToken) { 281 IAtomicOperation atomicOperation = CurrentOperation as IAtomicOperation; 282 OperationCollection operations = CurrentOperation as OperationCollection; 283 if (atomicOperation != null && operations != null) 284 throw new InvalidOperationException("Current operation is both atomic and an operation collection"); 285 286 if (atomicOperation != null) { 287 if (logOperations) 288 Log.LogMessage(string.Format("Performing atomic operation {0}", Utils.Name(atomicOperation))); 289 PerformAtomicOperation(atomicOperation, cancellationToken); 290 } else if (operations != null) { 291 if (logOperations) 292 Log.LogMessage("Expanding operation collection"); 293 ExecutionStack.AddRange(operations.Reverse()); 294 CurrentOperation = null; 295 } else if (ExecutionStack.Count > 0) { 296 if (logOperations) 297 Log.LogMessage("Popping execution stack"); 298 CurrentOperation = ExecutionStack.Last(); 299 ExecutionStack.RemoveAt(ExecutionStack.Count - 1); 300 } else { 301 if (logOperations) 302 Log.LogMessage("Nothing to do"); 303 } 304 OperatorTrace.Regenerate(CurrentAtomicOperation); 305 } 306 307 protected virtual void PerformAtomicOperation(IAtomicOperation operation, CancellationToken cancellationToken) { 281 308 if (operation != null) { 282 309 try { 283 currentOperator = operation.Operator; 284 IOperation successor = operation.Operator.Execute((IExecutionContext)operation); 310 IOperation successor = operation.Operator.Execute((IExecutionContext)operation, cancellationToken); 285 311 if (successor != null) { 286 312 OperatorTrace.RegisterParenthood(operation, successor); 287 313 ExecutionStack.Add(successor); 288 314 } 289 currentOperator = null;290 315 CurrentOperation = null; 291 } catch (Exception ex) { 292 OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex)); 293 Pause(); 316 } 317 catch (Exception ex) { 318 if (ex is OperationCanceledException) throw ex; 319 else throw new OperatorExecutionException(operation.Operator, ex); 294 320 } 295 321 }
Note: See TracChangeset
for help on using the changeset viewer.