Changeset 15373


Ignore:
Timestamp:
09/19/17 15:58:21 (2 years ago)
Author:
pfleck
Message:

#2822:

  • Merged recent trunk-changes into branch.
  • Using task-continuations for semaphore-releasing to avoid manually started optimizers to mess with the semaphore.
Location:
branches/ParallelExperiment
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • branches/ParallelExperiment/HeuristicLab.Optimization

  • branches/ParallelExperiment/HeuristicLab.Optimization.Views

  • branches/ParallelExperiment/HeuristicLab.Optimization.Views/3.3/IOptimizerView.cs

    r15287 r15373  
    136136
    137137    #region Control events
    138     protected virtual void startButton_Click(object sender, EventArgs e) {
    139       Content.StartAsync();
     138    protected virtual async void startButton_Click(object sender, EventArgs e) {
     139      await Content.StartAsync();
    140140    }
    141141    protected virtual void pauseButton_Click(object sender, EventArgs e) {
  • branches/ParallelExperiment/HeuristicLab.Optimization/3.3/Algorithms/BasicAlgorithm.cs

    r15337 r15373  
    8282      } catch (OperationCanceledException) {
    8383      } catch (AggregateException ae) {
    84         OnExceptionOccurred(ae.InnerExceptions.SingleOrDefault() ?? ae);
     84        ae.FlattenAndHandle(new[] { typeof(OperationCanceledException) }, e => OnExceptionOccurred(e));
    8585      } catch (Exception e) {
    8686        OnExceptionOccurred(e);
  • branches/ParallelExperiment/HeuristicLab.Optimization/3.3/MetaOptimizers/Experiment.cs

    r15337 r15373  
    122122    private bool experimentStopped = false;
    123123
    124     private ManualResetEventSlim allOptimizerFinished; // this indicates that all started optimizers have been paused or stopped
    125     private SemaphoreSlim availableWorkers; // limits the number of concurrent optimizer executions
     124    private readonly ManualResetEventSlim allOptimizerFinished = new ManualResetEventSlim(false); // this indicates that all started optimizers have been paused or stopped
    126125
    127126    public Experiment()
     
    202201      Start(CancellationToken.None);
    203202    }
    204 
    205203    public void Start(CancellationToken cancellationToken) {
    206204      if ((ExecutionState != ExecutionState.Prepared) && (ExecutionState != ExecutionState.Paused))
    207205        throw new InvalidOperationException(string.Format("Start not allowed in execution state \"{0}\".", ExecutionState));
    208       if (Optimizers.Count == 0) return;
     206      if (!Optimizers.Any(x => x.ExecutionState == ExecutionState.Prepared || x.ExecutionState == ExecutionState.Paused)) return;
    209207
    210208      experimentStarted = true;
    211209      experimentStopped = false;
    212       allOptimizerFinished = new ManualResetEventSlim(false);
    213       availableWorkers = new SemaphoreSlim(NumberOfWorkers, NumberOfWorkers);
    214 
    215       var runnableOptimizers = Optimizers.Where(o => o.ExecutionState == ExecutionState.Prepared || o.ExecutionState == ExecutionState.Paused).ToList();
    216       if (!runnableOptimizers.Any()) return;
    217 
    218       while (runnableOptimizers.Any()) {
    219         try {
    220           availableWorkers.Wait(cancellationToken); // an optimizer was pause/stopped previously
    221           if (experimentStopped || !experimentStarted) break;
    222           // some optimizers might be started manually somewhere else
    223           runnableOptimizers.RemoveAll(o => !(o.ExecutionState == ExecutionState.Prepared || o.ExecutionState == ExecutionState.Paused));
    224           var optimizer = runnableOptimizers.FirstOrDefault();
    225           runnableOptimizers.Remove(optimizer);
    226           optimizer?.StartAsync(cancellationToken);
    227         } catch (InvalidOperationException) { } catch (OperationCanceledException) { }
    228       }
    229 
    230       allOptimizerFinished.Wait();
     210
     211      var startedTasks = new List<Task>();
     212      using (var availableWorkers = new SemaphoreSlim(NumberOfWorkers, NumberOfWorkers)) {
     213        var runnableOptimizers = Optimizers.Where(o => o.ExecutionState == ExecutionState.Prepared || o.ExecutionState == ExecutionState.Paused).ToList();
     214        if (!runnableOptimizers.Any()) return;
     215
     216        while (runnableOptimizers.Any()) {
     217          try {
     218            availableWorkers.Wait(cancellationToken);
     219            if (experimentStopped || !experimentStarted) break;
     220            // some optimizers might be started manually somewhere else
     221            runnableOptimizers.RemoveAll(o => !(o.ExecutionState == ExecutionState.Prepared || o.ExecutionState == ExecutionState.Paused));
     222            if (runnableOptimizers.Any()) {
     223              var optimizer = runnableOptimizers.First();
     224              runnableOptimizers.Remove(optimizer);
     225              var startedTask = optimizer.StartAsync(cancellationToken).ContinueWith(x => {
     226                availableWorkers.Release(); // is not disposed yet because Task.WaitAll at the end of the using is blocking
     227              });
     228              startedTasks.Add(startedTask);
     229            }
     230          } catch (InvalidOperationException) { } catch (OperationCanceledException) { }
     231        }
     232
     233        allOptimizerFinished.Wait();
     234        Task.WaitAll(startedTasks.ToArray()); // retreive exeptions of the asyncrounously started optimizer
     235      }
    231236    }
    232237    public async Task StartAsync() { await StartAsync(CancellationToken.None); }
     
    294299    private void OnPaused() {
    295300      ExecutionState = ExecutionState.Paused;
     301      allOptimizerFinished.Set();
    296302      EventHandler handler = Paused;
    297303      if (handler != null) handler(this, EventArgs.Empty);
    298       allOptimizerFinished.Set();
    299304    }
    300305    public event EventHandler Stopped;
    301306    private void OnStopped() {
    302307      ExecutionState = ExecutionState.Stopped;
     308      allOptimizerFinished.Set();
    303309      EventHandler handler = Stopped;
    304310      if (handler != null) handler(this, EventArgs.Empty);
    305       allOptimizerFinished.Set();
    306311    }
    307312    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
     
    398403    private void optimizer_Paused(object sender, EventArgs e) {
    399404      lock (locker) {
    400         if (availableWorkers.CurrentCount < NumberOfWorkers)
    401           availableWorkers.Release();
    402405        if (Optimizers.All(x => x.ExecutionState != ExecutionState.Started)) {
    403406          OnPaused();
     
    415418    private void optimizer_Stopped(object sender, EventArgs e) {
    416419      lock (locker) {
    417         if (availableWorkers.CurrentCount < NumberOfWorkers)
    418           availableWorkers.Release();
    419420        if (experimentStopped) {
    420           if (Optimizers.All(x => (x.ExecutionState == ExecutionState.Stopped) || (x.ExecutionState == ExecutionState.Prepared))) {
    421             OnStopped();
    422           }
     421          if (Optimizers.All(x => (x.ExecutionState == ExecutionState.Stopped) || (x.ExecutionState == ExecutionState.Prepared))) OnStopped();
    423422        } else {
    424423          if (experimentStarted && Optimizers.Any(x => (x.ExecutionState == ExecutionState.Prepared) || (x.ExecutionState == ExecutionState.Paused))) return;
Note: See TracChangeset for help on using the changeset viewer.