Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
01/03/11 00:46:55 (14 years ago)
Author:
swagner
Message:

Merged ParallelEngine branch back into trunk (#1333)

Location:
trunk/sources
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources

  • trunk/sources/HeuristicLab.DebugEngine/3.3/DebugEngine.cs

    r5117 r5193  
    2323using System.Linq;
    2424using System.Threading;
     25using System.Threading.Tasks;
    2526using HeuristicLab.Common;
    2627using HeuristicLab.Core;
     
    3839    protected DebugEngine(bool deserializing)
    3940      : base(deserializing) {
    40       pausePending = stopPending = false;
    4141      InitializeTimer();
    4242    }
     
    4848      ExecutionStack = cloner.Clone(original.ExecutionStack);
    4949      OperatorTrace = cloner.Clone(original.OperatorTrace);
    50       pausePending = original.pausePending;
    51       stopPending = original.stopPending;
    5250      InitializeTimer();
    5351      currentOperation = cloner.Clone(original.currentOperation);
    54       currentOperator = cloner.Clone(original.currentOperator);
    5552    }
    5653    public DebugEngine()
     
    5956      ExecutionStack = new ExecutionStack();
    6057      OperatorTrace = new OperatorTrace();
    61       pausePending = stopPending = false;
    6258      InitializeTimer();
    6359    }
     
    8682    public OperatorTrace OperatorTrace { get; private set; }
    8783
    88     private bool pausePending, stopPending;
     84    private CancellationTokenSource cancellationTokenSource;
     85    private bool stopPending;
    8986    private DateTime lastUpdateTime;
    9087    private System.Timers.Timer timer;
    91 
    92     [Storable]
    93     private IOperator currentOperator;
    9488
    9589    [Storable]
     
    159153    public virtual void Step(bool skipStackOperations) {
    160154      OnStarted();
     155      cancellationTokenSource = new CancellationTokenSource();
     156      stopPending = false;
    161157      lastUpdateTime = DateTime.Now;
    162158      timer.Start();
    163       ProcessNextOperation(true);
    164       while (skipStackOperations && !(CurrentOperation is IAtomicOperation) && CanContinue)
    165         ProcessNextOperation(true);
     159      try {
     160        ProcessNextOperation(true, cancellationTokenSource.Token);
     161        while (skipStackOperations && !(CurrentOperation is IAtomicOperation) && CanContinue)
     162          ProcessNextOperation(true, cancellationTokenSource.Token);
     163      }
     164      catch (Exception ex) {
     165        OnExceptionOccurred(ex);
     166      }
    166167      timer.Stop();
    167168      ExecutionTime += DateTime.Now - lastUpdateTime;
    168       OnPaused();
     169      cancellationTokenSource.Dispose();
     170      cancellationTokenSource = null;
     171      if (stopPending) ExecutionStack.Clear();
     172      if (stopPending || !CanContinue) OnStopped();
     173      else OnPaused();
    169174    }
    170175
    171176    public override void Start() {
    172177      base.Start();
    173       ThreadPool.QueueUserWorkItem(new WaitCallback(Run), null);
     178      cancellationTokenSource = new CancellationTokenSource();
     179      stopPending = false;
     180      Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token);
     181      task.ContinueWith(t => {
     182        try {
     183          t.Wait();
     184        }
     185        catch (AggregateException ex) {
     186          try {
     187            ex.Flatten().Handle(x => x is OperationCanceledException);
     188          }
     189          catch (AggregateException remaining) {
     190            if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
     191            else OnExceptionOccurred(remaining);
     192          }
     193        }
     194        cancellationTokenSource.Dispose();
     195        cancellationTokenSource = null;
     196
     197        if (stopPending) ExecutionStack.Clear();
     198        if (stopPending || !CanContinue) OnStopped();
     199        else OnPaused();
     200      });
    174201    }
    175202
     
    181208    public override void Pause() {
    182209      base.Pause();
    183       pausePending = true;
    184       if (currentOperator != null) currentOperator.Abort();
     210      cancellationTokenSource.Cancel();
    185211    }
    186212
     
    193219      CurrentOperation = null;
    194220      base.Stop();
    195       stopPending = true;
    196       if (currentOperator != null) currentOperator.Abort();
    197       if (ExecutionState == ExecutionState.Paused) OnStopped();
     221      if (ExecutionState == ExecutionState.Paused) {
     222        ExecutionStack.Clear();
     223        OnStopped();
     224      } else {
     225        stopPending = true;
     226        cancellationTokenSource.Cancel();
     227      }
    198228    }
    199229
     
    209239
    210240    private void Run(object state) {
     241      CancellationToken cancellationToken = (CancellationToken)state;
     242
    211243      OnStarted();
    212       pausePending = stopPending = false;
    213 
    214244      lastUpdateTime = DateTime.Now;
    215245      timer.Start();
    216       if (!pausePending && !stopPending && CanContinue)
    217         ProcessNextOperation(false);
    218       while (!pausePending && !stopPending && CanContinue && !IsAtBreakpoint)
    219         ProcessNextOperation(false);
    220       timer.Stop();
    221       ExecutionTime += DateTime.Now - lastUpdateTime;
    222 
    223       if (IsAtBreakpoint)
    224         Log.LogMessage(string.Format("Breaking before: {0}", CurrentAtomicOperation.Operator.Name));
    225       if (pausePending || IsAtBreakpoint)
    226         OnPaused();
    227       else
    228         OnStopped();
     246      try {
     247        if (!cancellationToken.IsCancellationRequested && CanContinue)
     248          ProcessNextOperation(false, cancellationToken);
     249        while (!cancellationToken.IsCancellationRequested && CanContinue && !IsAtBreakpoint)
     250          ProcessNextOperation(false, cancellationToken);
     251        cancellationToken.ThrowIfCancellationRequested();
     252      }
     253      finally {
     254        timer.Stop();
     255        ExecutionTime += DateTime.Now - lastUpdateTime;
     256
     257        if (IsAtBreakpoint)
     258          Log.LogMessage(string.Format("Breaking before: {0}", CurrentAtomicOperation.Operator.Name));
     259      }
    229260    }
    230261
     
    247278    /// is pushed on the stack again.<br/>
    248279    /// If the execution was successful <see cref="EngineBase.OnOperationExecuted"/> is called.</remarks>
    249     protected virtual void ProcessNextOperation(bool logOperations) {
    250       try {
    251         IAtomicOperation atomicOperation = CurrentOperation as IAtomicOperation;
    252         OperationCollection operations = CurrentOperation as OperationCollection;
    253         if (atomicOperation != null && operations != null)
    254           throw new InvalidOperationException("Current operation is both atomic and an operation collection");
    255 
    256         if (atomicOperation != null) {
    257           if (logOperations)
    258             Log.LogMessage(string.Format("Performing atomic operation {0}", Utils.Name(atomicOperation)));
    259           PerformAtomicOperation(atomicOperation);
    260         } else if (operations != null) {
    261           if (logOperations)
    262             Log.LogMessage("Expanding operation collection");
    263           ExecutionStack.AddRange(operations.Reverse());
    264           CurrentOperation = null;
    265         } else if (ExecutionStack.Count > 0) {
    266           if (logOperations)
    267             Log.LogMessage("Popping execution stack");
    268           CurrentOperation = ExecutionStack.Last();
    269           ExecutionStack.RemoveAt(ExecutionStack.Count - 1);
    270         } else {
    271           if (logOperations)
    272             Log.LogMessage("Nothing to do");
    273         }
    274         OperatorTrace.Regenerate(CurrentAtomicOperation);
    275       } catch (Exception x) {
    276         OnExceptionOccurred(x);
    277       }
    278     }
    279 
    280     protected virtual void PerformAtomicOperation(IAtomicOperation operation) {
     280    protected virtual void ProcessNextOperation(bool logOperations, CancellationToken cancellationToken) {
     281      IAtomicOperation atomicOperation = CurrentOperation as IAtomicOperation;
     282      OperationCollection operations = CurrentOperation as OperationCollection;
     283      if (atomicOperation != null && operations != null)
     284        throw new InvalidOperationException("Current operation is both atomic and an operation collection");
     285
     286      if (atomicOperation != null) {
     287        if (logOperations)
     288          Log.LogMessage(string.Format("Performing atomic operation {0}", Utils.Name(atomicOperation)));
     289        PerformAtomicOperation(atomicOperation, cancellationToken);
     290      } else if (operations != null) {
     291        if (logOperations)
     292          Log.LogMessage("Expanding operation collection");
     293        ExecutionStack.AddRange(operations.Reverse());
     294        CurrentOperation = null;
     295      } else if (ExecutionStack.Count > 0) {
     296        if (logOperations)
     297          Log.LogMessage("Popping execution stack");
     298        CurrentOperation = ExecutionStack.Last();
     299        ExecutionStack.RemoveAt(ExecutionStack.Count - 1);
     300      } else {
     301        if (logOperations)
     302          Log.LogMessage("Nothing to do");
     303      }
     304      OperatorTrace.Regenerate(CurrentAtomicOperation);
     305    }
     306
     307    protected virtual void PerformAtomicOperation(IAtomicOperation operation, CancellationToken cancellationToken) {
    281308      if (operation != null) {
    282309        try {
    283           currentOperator = operation.Operator;
    284           IOperation successor = operation.Operator.Execute((IExecutionContext)operation);
     310          IOperation successor = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
    285311          if (successor != null) {
    286312            OperatorTrace.RegisterParenthood(operation, successor);
    287313            ExecutionStack.Add(successor);
    288314          }
    289           currentOperator = null;
    290315          CurrentOperation = null;
    291         } catch (Exception ex) {
    292           OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex));
    293           Pause();
     316        }
     317        catch (Exception ex) {
     318          if (ex is OperationCanceledException) throw ex;
     319          else throw new OperatorExecutionException(operation.Operator, ex);
    294320        }
    295321      }
Note: See TracChangeset for help on using the changeset viewer.