Changeset 15065 for branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4
- Timestamp:
- 06/26/17 09:45:36 (7 years ago)
- Location:
- branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4/CrossValidation.cs
r13354 r15065 40 40 [StorableClass] 41 41 public sealed class CrossValidation : ParameterizedNamedItem, IAlgorithm, IStorableContent { 42 private readonly ManualResetEvent signaler = new ManualResetEvent(true);43 private CancellationToken cancellationToken;42 private SemaphoreSlim ticket; 43 private ManualResetEventSlim signal; 44 44 45 45 public CrossValidation() … … 269 269 Prepare(); 270 270 } 271 271 272 public void Start() { 272 StartAsync().Wait(); 273 } 274 public async Task StartAsync() { 275 await StartAsync(CancellationToken.None); 276 } 273 Start(CancellationToken.None); 274 } 275 public void Start(CancellationToken cancellationToken) { 276 if ((ExecutionState != ExecutionState.Prepared) && (ExecutionState != ExecutionState.Paused)) 277 throw new InvalidOperationException(string.Format("Start not allowed in execution state \"{0}\".", ExecutionState)); 278 279 if (Algorithm == null) return; 280 //create cloned algorithms 281 if (clonedAlgorithms.Count == 0) { 282 int testSamplesCount = (SamplesEnd.Value - SamplesStart.Value) / Folds.Value; 283 284 for (int i = 0; i < Folds.Value; i++) { 285 IAlgorithm clonedAlgorithm = (IAlgorithm)algorithm.Clone(); 286 clonedAlgorithm.Name = algorithm.Name + " Fold " + i; 287 IDataAnalysisProblem problem = clonedAlgorithm.Problem as IDataAnalysisProblem; 288 ISymbolicDataAnalysisProblem symbolicProblem = problem as ISymbolicDataAnalysisProblem; 289 290 int testStart = (i * testSamplesCount) + SamplesStart.Value; 291 int testEnd = (i + 1) == Folds.Value ? SamplesEnd.Value : (i + 1) * testSamplesCount + SamplesStart.Value; 292 293 problem.ProblemData.TrainingPartition.Start = SamplesStart.Value; 294 problem.ProblemData.TrainingPartition.End = SamplesEnd.Value; 295 problem.ProblemData.TestPartition.Start = testStart; 296 problem.ProblemData.TestPartition.End = testEnd; 297 DataAnalysisProblemData problemData = problem.ProblemData as DataAnalysisProblemData; 298 if (problemData != null) { 299 problemData.TrainingPartitionParameter.Hidden = false; 300 problemData.TestPartitionParameter.Hidden = false; 301 } 302 303 if (symbolicProblem != null) { 304 symbolicProblem.FitnessCalculationPartition.Start = SamplesStart.Value; 305 symbolicProblem.FitnessCalculationPartition.End = SamplesEnd.Value; 306 } 307 clonedAlgorithm.Prepare(); 308 clonedAlgorithms.Add(clonedAlgorithm); 309 } 310 } 311 312 OnStarted(); 313 ticket = new SemaphoreSlim(NumberOfWorkers.Value); 314 signal = new ManualResetEventSlim(false); 315 316 //start prepared or paused cloned algorithms 317 foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) { 318 if (pausePending || stopPending) break; 319 if (clonedAlgorithm.ExecutionState == ExecutionState.Prepared || 320 clonedAlgorithm.ExecutionState == ExecutionState.Paused) { 321 ticket.Wait(); 322 clonedAlgorithm.StartAsync(cancellationToken); 323 } 324 } 325 326 signal.Wait(); 327 if (pausePending) OnPaused(); 328 else OnStopped(); 329 } 330 331 public async Task StartAsync() { await StartAsync(CancellationToken.None); } 277 332 public async Task StartAsync(CancellationToken cancellationToken) { 278 this.cancellationToken = cancellationToken; 279 signaler.Reset(); 280 await Task.Run(() => { 281 if ((ExecutionState != ExecutionState.Prepared) && (ExecutionState != ExecutionState.Paused)) 282 throw new InvalidOperationException(string.Format("Start not allowed in execution state \"{0}\".", ExecutionState)); 283 284 if (Algorithm != null) { 285 //create cloned algorithms 286 if (clonedAlgorithms.Count == 0) { 287 int testSamplesCount = (SamplesEnd.Value - SamplesStart.Value) / Folds.Value; 288 289 for (int i = 0; i < Folds.Value; i++) { 290 IAlgorithm clonedAlgorithm = (IAlgorithm)algorithm.Clone(); 291 clonedAlgorithm.Name = algorithm.Name + " Fold " + i; 292 IDataAnalysisProblem problem = clonedAlgorithm.Problem as IDataAnalysisProblem; 293 ISymbolicDataAnalysisProblem symbolicProblem = problem as ISymbolicDataAnalysisProblem; 294 295 int testStart = (i * testSamplesCount) + SamplesStart.Value; 296 int testEnd = (i + 1) == Folds.Value ? SamplesEnd.Value : (i + 1) * testSamplesCount + SamplesStart.Value; 297 298 problem.ProblemData.TrainingPartition.Start = SamplesStart.Value; 299 problem.ProblemData.TrainingPartition.End = SamplesEnd.Value; 300 problem.ProblemData.TestPartition.Start = testStart; 301 problem.ProblemData.TestPartition.End = testEnd; 302 DataAnalysisProblemData problemData = problem.ProblemData as DataAnalysisProblemData; 303 if (problemData != null) { 304 problemData.TrainingPartitionParameter.Hidden = false; 305 problemData.TestPartitionParameter.Hidden = false; 306 } 307 308 if (symbolicProblem != null) { 309 symbolicProblem.FitnessCalculationPartition.Start = SamplesStart.Value; 310 symbolicProblem.FitnessCalculationPartition.End = SamplesEnd.Value; 311 } 312 clonedAlgorithm.Prepare(); 313 clonedAlgorithms.Add(clonedAlgorithm); 314 } 315 } 316 317 //start prepared or paused cloned algorithms 318 int startedAlgorithms = 0; 319 foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) { 320 if (startedAlgorithms < NumberOfWorkers.Value) { 321 if (clonedAlgorithm.ExecutionState == ExecutionState.Prepared || 322 clonedAlgorithm.ExecutionState == ExecutionState.Paused) { 323 324 // start and wait until the alg is started 325 using (var signal = new ManualResetEvent(false)) { 326 EventHandler signalSetter = (sender, args) => { signal.Set(); }; 327 clonedAlgorithm.Started += signalSetter; 328 clonedAlgorithm.StartAsync(cancellationToken); 329 signal.WaitOne(); 330 clonedAlgorithm.Started -= signalSetter; 331 332 startedAlgorithms++; 333 } 334 } 335 } 336 } 337 OnStarted(); 338 } 339 }, cancellationToken); 333 await Task.Factory.StartNew((ct) => Start((CancellationToken)ct), cancellationToken, cancellationToken); 340 334 } 341 335 … … 346 340 if (!pausePending) { 347 341 pausePending = true; 348 PauseAllClonedAlgorithms(); 349 } 350 } 351 private void PauseAllClonedAlgorithms() { 352 foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) { 353 if (clonedAlgorithm.ExecutionState == ExecutionState.Started) 354 clonedAlgorithm.Pause(); 342 var toPause = clonedAlgorithms.Where(x => x.ExecutionState == ExecutionState.Started); 343 if (toPause.Any()) { 344 foreach (var optimizer in toPause) { 345 // a race-condition may occur when the optimizer has changed the state by itself in the meantime 346 try { optimizer.Pause(); } catch (InvalidOperationException) { } 347 } 348 } 349 if (ExecutionState != ExecutionState.Paused) OnPaused(); 355 350 } 356 351 } … … 363 358 if (!stopPending) { 364 359 stopPending = true; 365 StopAllClonedAlgorithms();366 }367 }368 private void StopAllClonedAlgorithms() {369 foreach (IAlgorithm clonedAlgorithm in clonedAlgorithms) {370 if (clonedAlgorithm.ExecutionState == ExecutionState.Started ||371 clonedAlgorithm.ExecutionState == ExecutionState.Paused)372 clonedAlgorithm.Stop();360 var toStop = clonedAlgorithms.Where(x => x.ExecutionState == ExecutionState.Started || x.ExecutionState == ExecutionState.Paused); 361 if (toStop.Any()) { 362 foreach (var optimizer in toStop) { 363 // a race-condition may occur when the optimizer has changed the state by itself in the meantime 364 try { optimizer.Stop(); } catch (InvalidOperationException) { } 365 } 366 } 367 if (ExecutionState != ExecutionState.Stopped) OnStopped(); 373 368 } 374 369 } … … 602 597 private void Algorithm_ExecutionStateChanged(object sender, EventArgs e) { 603 598 switch (Algorithm.ExecutionState) { 604 case ExecutionState.Prepared: OnPrepared(); 599 case ExecutionState.Prepared: 600 OnPrepared(); 605 601 break; 606 602 case ExecutionState.Started: throw new InvalidOperationException("Algorithm template can not be started."); 607 603 case ExecutionState.Paused: throw new InvalidOperationException("Algorithm template can not be paused."); 608 case ExecutionState.Stopped: OnStopped(); 604 case ExecutionState.Stopped: 605 OnStopped(); 609 606 break; 610 607 } … … 674 671 private void ClonedAlgorithm_Paused(object sender, EventArgs e) { 675 672 lock (locker) { 676 if (pausePending && clonedAlgorithms.All(alg => alg.ExecutionState != ExecutionState.Started)) 677 OnPaused(); 673 if (ExecutionState != ExecutionState.Paused) { 674 if (clonedAlgorithms.All(alg => alg.ExecutionState != ExecutionState.Started)) { 675 pausePending = true; 676 signal.Set(); 677 ticket.Release(); 678 } 679 } 678 680 } 679 681 } … … 681 683 private void ClonedAlgorithm_Stopped(object sender, EventArgs e) { 682 684 lock (locker) { 683 if (!stopPending && ExecutionState == ExecutionState.Started) {684 IAlgorithm preparedAlgorithm = clonedAlgorithms.FirstOrDefault(alg => alg.ExecutionState == ExecutionState.Prepared ||685 alg.ExecutionState == ExecutionState.Paused);686 if (preparedAlgorithm != null) preparedAlgorithm.StartAsync(cancellationToken);687 }688 685 if (ExecutionState != ExecutionState.Stopped) { 689 if (clonedAlgorithms.All(alg => alg.ExecutionState == ExecutionState.Stopped)) 690 OnStopped(); 691 else if (stopPending && 692 clonedAlgorithms.All( 693 alg => alg.ExecutionState == ExecutionState.Prepared || alg.ExecutionState == ExecutionState.Stopped)) 694 OnStopped(); 686 if (clonedAlgorithms.All(alg => alg.ExecutionState == ExecutionState.Stopped || stopPending && alg.ExecutionState == ExecutionState.Prepared)) { 687 stopPending = true; 688 signal.Set(); 689 } 690 ticket.Release(); 695 691 } 696 692 } … … 727 723 pausePending = false; 728 724 ExecutionState = ExecutionState.Paused; 729 signaler.Set();730 725 EventHandler handler = Paused; 731 726 if (handler != null) handler(this, EventArgs.Empty); … … 740 735 runs.Add(new Run(string.Format("{0} Run {1}", Name, runsCounter), this)); 741 736 ExecutionState = ExecutionState.Stopped; 742 signaler.Set();743 737 EventHandler handler = Stopped; 744 738 if (handler != null) handler(this, EventArgs.Empty); -
branches/Async/HeuristicLab.Algorithms.DataAnalysis/3.4/FixedDataAnalysisAlgorithm.cs
r13349 r15065 22 22 using System; 23 23 using System.Threading; 24 using System.Threading.Tasks;25 24 using HeuristicLab.Common; 26 25 using HeuristicLab.Optimization; … … 70 69 } 71 70 72 public override async Task StartAsync(CancellationToken cancellationToken) {73 await base.StartAsync(cancellationToken);74 var cancellationTokenSource = new CancellationTokenSource();71 public override void Start(CancellationToken cancellationToken) { 72 base.Start(cancellationToken); 73 OnStarted(); 75 74 76 OnStarted(); 77 using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, cancellationToken)) { 78 Task task = Task.Factory.StartNew(Run, cts.Token, cts.Token); 79 await task.ContinueWith(t => { 80 try { 81 t.Wait(); 82 } 83 catch (AggregateException ex) { 84 try { 85 ex.Flatten().Handle(x => x is OperationCanceledException); 86 } 87 catch (AggregateException remaining) { 88 if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]); 89 else OnExceptionOccurred(remaining); 90 } 91 } 92 cancellationTokenSource.Dispose(); 93 cancellationTokenSource = null; 94 OnStopped(); 95 }); 75 try { 76 Run(cancellationToken); 77 } catch (OperationCanceledException) { 78 } catch (AggregateException ae) { 79 if (ae.InnerExceptions.Count == 1) OnExceptionOccurred(ae.InnerExceptions[0]); 80 else OnExceptionOccurred(ae); 81 } catch (Exception e) { 82 OnExceptionOccurred(e); 96 83 } 84 85 OnStopped(); 97 86 } 98 87 private void Run(object state) {
Note: See TracChangeset
for help on using the changeset viewer.