Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/26/17 09:45:36 (7 years ago)
Author:
jkarder
Message:

#2258: refactored async methods

  • synchronously called IExecutables are now executed in the caller's thread
  • removed old synchronization code from unit tests
Location:
branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4/CrossValidation.cs

    r13354 r15065  
    4040  [StorableClass]
    4141  public sealed class CrossValidation : ParameterizedNamedItem, IAlgorithm, IStorableContent {
    42     private readonly ManualResetEvent signaler = new ManualResetEvent(true);
    43     private CancellationToken cancellationToken;
     42    private SemaphoreSlim ticket;
     43    private ManualResetEventSlim signal;
    4444
    4545    public CrossValidation()
     
    269269      Prepare();
    270270    }
     271
    271272    public void Start() {
    272       StartAsync().Wait();
    273     }
    274     public async Task StartAsync() {
    275       await StartAsync(CancellationToken.None);
    276     }
     273      Start(CancellationToken.None);
     274    }
     275    public void Start(CancellationToken cancellationToken) {
     276      if ((ExecutionState != ExecutionState.Prepared) && (ExecutionState != ExecutionState.Paused))
     277        throw new InvalidOperationException(string.Format("Start not allowed in execution state \"{0}\".", ExecutionState));
     278
     279      if (Algorithm == null) return;
     280      //create cloned algorithms
     281      if (clonedAlgorithms.Count == 0) {
     282        int testSamplesCount = (SamplesEnd.Value - SamplesStart.Value) / Folds.Value;
     283
     284        for (int i = 0; i < Folds.Value; i++) {
     285          IAlgorithm clonedAlgorithm = (IAlgorithm)algorithm.Clone();
     286          clonedAlgorithm.Name = algorithm.Name + " Fold " + i;
     287          IDataAnalysisProblem problem = clonedAlgorithm.Problem as IDataAnalysisProblem;
     288          ISymbolicDataAnalysisProblem symbolicProblem = problem as ISymbolicDataAnalysisProblem;
     289
     290          int testStart = (i * testSamplesCount) + SamplesStart.Value;
     291          int testEnd = (i + 1) == Folds.Value ? SamplesEnd.Value : (i + 1) * testSamplesCount + SamplesStart.Value;
     292
     293          problem.ProblemData.TrainingPartition.Start = SamplesStart.Value;
     294          problem.ProblemData.TrainingPartition.End = SamplesEnd.Value;
     295          problem.ProblemData.TestPartition.Start = testStart;
     296          problem.ProblemData.TestPartition.End = testEnd;
     297          DataAnalysisProblemData problemData = problem.ProblemData as DataAnalysisProblemData;
     298          if (problemData != null) {
     299            problemData.TrainingPartitionParameter.Hidden = false;
     300            problemData.TestPartitionParameter.Hidden = false;
     301          }
     302
     303          if (symbolicProblem != null) {
     304            symbolicProblem.FitnessCalculationPartition.Start = SamplesStart.Value;
     305            symbolicProblem.FitnessCalculationPartition.End = SamplesEnd.Value;
     306          }
     307          clonedAlgorithm.Prepare();
     308          clonedAlgorithms.Add(clonedAlgorithm);
     309        }
     310      }
     311
     312      OnStarted();
     313      ticket = new SemaphoreSlim(NumberOfWorkers.Value);
     314      signal = new ManualResetEventSlim(false);
     315
     316      //start prepared or paused cloned algorithms
     317      foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) {
     318        if (pausePending || stopPending) break;
     319        if (clonedAlgorithm.ExecutionState == ExecutionState.Prepared ||
     320            clonedAlgorithm.ExecutionState == ExecutionState.Paused) {
     321          ticket.Wait();
     322          clonedAlgorithm.StartAsync(cancellationToken);
     323        }
     324      }
     325
     326      signal.Wait();
     327      if (pausePending) OnPaused();
     328      else OnStopped();
     329    }
     330
     331    public async Task StartAsync() { await StartAsync(CancellationToken.None); }
    277332    public async Task StartAsync(CancellationToken cancellationToken) {
    278       this.cancellationToken = cancellationToken;
    279       signaler.Reset();
    280       await Task.Run(() => {
    281         if ((ExecutionState != ExecutionState.Prepared) && (ExecutionState != ExecutionState.Paused))
    282           throw new InvalidOperationException(string.Format("Start not allowed in execution state \"{0}\".", ExecutionState));
    283 
    284         if (Algorithm != null) {
    285           //create cloned algorithms
    286           if (clonedAlgorithms.Count == 0) {
    287             int testSamplesCount = (SamplesEnd.Value - SamplesStart.Value) / Folds.Value;
    288 
    289             for (int i = 0; i < Folds.Value; i++) {
    290               IAlgorithm clonedAlgorithm = (IAlgorithm)algorithm.Clone();
    291               clonedAlgorithm.Name = algorithm.Name + " Fold " + i;
    292               IDataAnalysisProblem problem = clonedAlgorithm.Problem as IDataAnalysisProblem;
    293               ISymbolicDataAnalysisProblem symbolicProblem = problem as ISymbolicDataAnalysisProblem;
    294 
    295               int testStart = (i * testSamplesCount) + SamplesStart.Value;
    296               int testEnd = (i + 1) == Folds.Value ? SamplesEnd.Value : (i + 1) * testSamplesCount + SamplesStart.Value;
    297 
    298               problem.ProblemData.TrainingPartition.Start = SamplesStart.Value;
    299               problem.ProblemData.TrainingPartition.End = SamplesEnd.Value;
    300               problem.ProblemData.TestPartition.Start = testStart;
    301               problem.ProblemData.TestPartition.End = testEnd;
    302               DataAnalysisProblemData problemData = problem.ProblemData as DataAnalysisProblemData;
    303               if (problemData != null) {
    304                 problemData.TrainingPartitionParameter.Hidden = false;
    305                 problemData.TestPartitionParameter.Hidden = false;
    306               }
    307 
    308               if (symbolicProblem != null) {
    309                 symbolicProblem.FitnessCalculationPartition.Start = SamplesStart.Value;
    310                 symbolicProblem.FitnessCalculationPartition.End = SamplesEnd.Value;
    311               }
    312               clonedAlgorithm.Prepare();
    313               clonedAlgorithms.Add(clonedAlgorithm);
    314             }
    315           }
    316 
    317           //start prepared or paused cloned algorithms
    318           int startedAlgorithms = 0;
    319           foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) {
    320             if (startedAlgorithms < NumberOfWorkers.Value) {
    321               if (clonedAlgorithm.ExecutionState == ExecutionState.Prepared ||
    322                   clonedAlgorithm.ExecutionState == ExecutionState.Paused) {
    323 
    324                 // start and wait until the alg is started
    325                 using (var signal = new ManualResetEvent(false)) {
    326                   EventHandler signalSetter = (sender, args) => { signal.Set(); };
    327                   clonedAlgorithm.Started += signalSetter;
    328                   clonedAlgorithm.StartAsync(cancellationToken);
    329                   signal.WaitOne();
    330                   clonedAlgorithm.Started -= signalSetter;
    331 
    332                   startedAlgorithms++;
    333                 }
    334               }
    335             }
    336           }
    337           OnStarted();
    338         }
    339       }, cancellationToken);
     333      await Task.Factory.StartNew((ct) => Start((CancellationToken)ct), cancellationToken, cancellationToken);
    340334    }
    341335
     
    346340      if (!pausePending) {
    347341        pausePending = true;
    348         PauseAllClonedAlgorithms();
    349       }
    350     }
    351     private void PauseAllClonedAlgorithms() {
    352       foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) {
    353         if (clonedAlgorithm.ExecutionState == ExecutionState.Started)
    354           clonedAlgorithm.Pause();
     342        var toPause = clonedAlgorithms.Where(x => x.ExecutionState == ExecutionState.Started);
     343        if (toPause.Any()) {
     344          foreach (var optimizer in toPause) {
     345            // a race-condition may occur when the optimizer has changed the state by itself in the meantime
     346            try { optimizer.Pause(); } catch (InvalidOperationException) { }
     347          }
     348        }
     349        if (ExecutionState != ExecutionState.Paused) OnPaused();
    355350      }
    356351    }
     
    363358      if (!stopPending) {
    364359        stopPending = true;
    365         StopAllClonedAlgorithms();
    366       }
    367     }
    368     private void StopAllClonedAlgorithms() {
    369       foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) {
    370         if (clonedAlgorithm.ExecutionState == ExecutionState.Started ||
    371             clonedAlgorithm.ExecutionState == ExecutionState.Paused)
    372           clonedAlgorithm.Stop();
     360        var toStop = clonedAlgorithms.Where(x => x.ExecutionState == ExecutionState.Started || x.ExecutionState == ExecutionState.Paused);
     361        if (toStop.Any()) {
     362          foreach (var optimizer in toStop) {
     363            // a race-condition may occur when the optimizer has changed the state by itself in the meantime
     364            try { optimizer.Stop(); } catch (InvalidOperationException) { }
     365          }
     366        }
     367        if (ExecutionState != ExecutionState.Stopped) OnStopped();
    373368      }
    374369    }
     
    602597    private void Algorithm_ExecutionStateChanged(object sender, EventArgs e) {
    603598      switch (Algorithm.ExecutionState) {
    604         case ExecutionState.Prepared: OnPrepared();
     599        case ExecutionState.Prepared:
     600          OnPrepared();
    605601          break;
    606602        case ExecutionState.Started: throw new InvalidOperationException("Algorithm template can not be started.");
    607603        case ExecutionState.Paused: throw new InvalidOperationException("Algorithm template can not be paused.");
    608         case ExecutionState.Stopped: OnStopped();
     604        case ExecutionState.Stopped:
     605          OnStopped();
    609606          break;
    610607      }
     
    674671    private void ClonedAlgorithm_Paused(object sender, EventArgs e) {
    675672      lock (locker) {
    676         if (pausePending && clonedAlgorithms.All(alg => alg.ExecutionState != ExecutionState.Started))
    677           OnPaused();
     673        if (ExecutionState != ExecutionState.Paused) {
     674          if (clonedAlgorithms.All(alg => alg.ExecutionState != ExecutionState.Started)) {
     675            pausePending = true;
     676            signal.Set();
     677            ticket.Release();
     678          }
     679        }
    678680      }
    679681    }
     
    681683    private void ClonedAlgorithm_Stopped(object sender, EventArgs e) {
    682684      lock (locker) {
    683         if (!stopPending && ExecutionState == ExecutionState.Started) {
    684           IAlgorithm preparedAlgorithm = clonedAlgorithms.FirstOrDefault(alg => alg.ExecutionState == ExecutionState.Prepared ||
    685                                                                                 alg.ExecutionState == ExecutionState.Paused);
    686           if (preparedAlgorithm != null) preparedAlgorithm.StartAsync(cancellationToken);
    687         }
    688685        if (ExecutionState != ExecutionState.Stopped) {
    689           if (clonedAlgorithms.All(alg => alg.ExecutionState == ExecutionState.Stopped))
    690             OnStopped();
    691           else if (stopPending &&
    692                    clonedAlgorithms.All(
    693                      alg => alg.ExecutionState == ExecutionState.Prepared || alg.ExecutionState == ExecutionState.Stopped))
    694             OnStopped();
     686          if (clonedAlgorithms.All(alg => alg.ExecutionState == ExecutionState.Stopped || stopPending && alg.ExecutionState == ExecutionState.Prepared)) {
     687            stopPending = true;
     688            signal.Set();
     689          }
     690          ticket.Release();
    695691        }
    696692      }
     
    727723      pausePending = false;
    728724      ExecutionState = ExecutionState.Paused;
    729       signaler.Set();
    730725      EventHandler handler = Paused;
    731726      if (handler != null) handler(this, EventArgs.Empty);
     
    740735      runs.Add(new Run(string.Format("{0} Run {1}", Name, runsCounter), this));
    741736      ExecutionState = ExecutionState.Stopped;
    742       signaler.Set();
    743737      EventHandler handler = Stopped;
    744738      if (handler != null) handler(this, EventArgs.Empty);
  • branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4/FixedDataAnalysisAlgorithm.cs

    r13349 r15065  
    2222using System;
    2323using System.Threading;
    24 using System.Threading.Tasks;
    2524using HeuristicLab.Common;
    2625using HeuristicLab.Optimization;
     
    7069    }
    7170
    72     public override async Task StartAsync(CancellationToken cancellationToken) {
    73       await base.StartAsync(cancellationToken);
    74       var cancellationTokenSource = new CancellationTokenSource();
     71    public override void Start(CancellationToken cancellationToken) {
     72      base.Start(cancellationToken);
     73      OnStarted();
    7574
    76       OnStarted();
    77       using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, cancellationToken)) {
    78         Task task = Task.Factory.StartNew(Run, cts.Token, cts.Token);
    79         await task.ContinueWith(t => {
    80           try {
    81             t.Wait();
    82           }
    83           catch (AggregateException ex) {
    84             try {
    85               ex.Flatten().Handle(x => x is OperationCanceledException);
    86             }
    87             catch (AggregateException remaining) {
    88               if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
    89               else OnExceptionOccurred(remaining);
    90             }
    91           }
    92           cancellationTokenSource.Dispose();
    93           cancellationTokenSource = null;
    94           OnStopped();
    95         });
     75      try {
     76        Run(cancellationToken);
     77      } catch (OperationCanceledException) {
     78      } catch (AggregateException ae) {
     79        if (ae.InnerExceptions.Count == 1) OnExceptionOccurred(ae.InnerExceptions[0]);
     80        else OnExceptionOccurred(ae);
     81      } catch (Exception e) {
     82        OnExceptionOccurred(e);
    9683      }
     84
     85      OnStopped();
    9786    }
    9887    private void Run(object state) {
Note: See TracChangeset for help on using the changeset viewer.