Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
11/26/12 11:12:57 (11 years ago)
Author:
ascheibe
Message:

#1950

  • added more aggressive locking so that the views don't read run collections that get modified in the meantime
  • start downloading of tasks after the job has been uploaded completely
  • fixed exceptions that got thrown when waiting for the threads that upload the tasks
Location:
trunk/sources/HeuristicLab.Clients.Hive/3.3
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs

    r8914 r8939  
    294294        cancellationToken.ThrowIfCancellationRequested();
    295295
     296        // upload tasks
     297        refreshableJob.Progress.Status = "Uploading tasks...";
     298
     299        var tasks = new List<TS.Task>();
     300        foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
     301          var task = TS.Task.Factory.StartNew((hj) => {
     302            UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken);
     303          }, hiveTask);
     304          task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
     305          tasks.Add(task);
     306        }
     307        TS.Task.WaitAll(tasks.ToArray());
     308      }
     309      finally {
    296310        refreshableJob.RefreshAutomatically = true;
    297311        refreshableJob.StartResultPolling();
    298 
    299         // upload tasks
    300         refreshableJob.Progress.Status = "Uploading tasks...";
    301 
    302         var tasks = new List<TS.Task>();
    303         foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
    304           tasks.Add(TS.Task.Factory.StartNew((hj) => {
    305             UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken);
    306           }, hiveTask)
    307           .ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));
    308         }
    309         try {
    310           TS.Task.WaitAll(tasks.ToArray());
    311         }
    312         catch (AggregateException ae) {
    313           if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it
    314         }
    315312        refreshableJob.Job.Modified = false;
    316       }
    317       finally {
    318313        refreshableJob.IsProgressing = false;
    319314        refreshableJob.Progress.Finish();
     
    405400        var tasks = new List<TS.Task>();
    406401        foreach (HiveTask child in hiveTask.ChildHiveTasks) {
    407           tasks.Add(TS.Task.Factory.StartNew((tuple) => {
     402          var task = TS.Task.Factory.StartNew((tuple) => {
    408403            var arguments = (Tuple<HiveTask, HiveTask>)tuple;
    409404            UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken);
    410           }, new Tuple<HiveTask, HiveTask>(child, hiveTask))
    411           .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));
     405          }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
     406          task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
     407          tasks.Add(task);
    412408        }
    413409        taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall!
    414         try {
    415           TS.Task.WaitAll(tasks.ToArray());
    416         }
    417         catch (AggregateException ae) {
    418           if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it
    419         }
     410        TS.Task.WaitAll(tasks.ToArray());
    420411      }
    421412      finally {
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveTasks/HiveTask.cs

    r8871 r8939  
    3939    protected static object locker = new object();
    4040    protected ReaderWriterLockSlim childHiveTasksLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
     41    protected ReaderWriterLockSlim itemTaskLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    4142
    4243    public static new Image StaticItemImage {
     
    8384      set {
    8485        if (itemTask != null && syncTasksWithOptimizers) {
    85           this.childHiveTasks.Clear();
     86          childHiveTasksLock.EnterWriteLock();
     87          try {
     88            childHiveTasks.Clear();
     89          }
     90          finally { childHiveTasksLock.ExitWriteLock(); }
    8691        }
    8792        if (itemTask != value) {
    88           DergisterItemTaskEvents();
    89           itemTask = value;
    90           RegisterItemTaskEvents();
     93          itemTaskLock.EnterWriteLock();
     94          try {
     95            DergisterItemTaskEvents();
     96            itemTask = value;
     97            RegisterItemTaskEvents();
     98          }
     99          finally { itemTaskLock.ExitWriteLock(); }
    91100          OnItemTaskChanged();
    92101          IsFinishedTaskDownloaded = true;
    93102        }
     103
    94104      }
    95105    }
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveTasks/OptimizerHiveTask.cs

    r8884 r8939  
    3131namespace HeuristicLab.Clients.Hive {
    3232  public class OptimizerHiveTask : HiveTask<OptimizerTask> {
    33 
    34     Object batchRunLocker = new Object();
    35 
    3633    #region Constructors and Cloning
    3734    public OptimizerHiveTask() { }
     
    6057    protected override void UpdateChildHiveTasks() {
    6158      base.UpdateChildHiveTasks();
    62       if (Task != null && syncTasksWithOptimizers) {
    63         if (!ItemTask.ComputeInParallel) {
    64           this.childHiveTasks.Clear();
    65         } else {
    66           if (ItemTask.Item is Optimization.Experiment) {
    67             Optimization.Experiment experiment = (Optimization.Experiment)ItemTask.Item;
    68             foreach (IOptimizer childOpt in experiment.Optimizers) {
    69               var optimizerHiveTask = new OptimizerHiveTask(childOpt);
    70               optimizerHiveTask.Task.Priority = Task.Priority; //inherit priority from parent
    71               this.childHiveTasks.Add(optimizerHiveTask);
    72             }
    73           } else if (ItemTask.Item is Optimization.BatchRun) {
    74             Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
    75             if (batchRun.Optimizer != null) {
    76               while (this.childHiveTasks.Count < batchRun.Repetitions) {
    77                 var optimizerHiveTask = new OptimizerHiveTask(batchRun.Optimizer);
    78                 optimizerHiveTask.Task.Priority = Task.Priority;
     59      childHiveTasksLock.EnterWriteLock();
     60      try {
     61        if (Task != null && syncTasksWithOptimizers) {
     62          if (!ItemTask.ComputeInParallel) {
     63            this.childHiveTasks.Clear();
     64          } else {
     65            if (ItemTask.Item is Optimization.Experiment) {
     66              Optimization.Experiment experiment = (Optimization.Experiment)ItemTask.Item;
     67              foreach (IOptimizer childOpt in experiment.Optimizers) {
     68                var optimizerHiveTask = new OptimizerHiveTask(childOpt);
     69                optimizerHiveTask.Task.Priority = Task.Priority; //inherit priority from parent
    7970                this.childHiveTasks.Add(optimizerHiveTask);
    8071              }
    81               while (this.childHiveTasks.Count > batchRun.Repetitions) {
    82                 this.childHiveTasks.Remove(this.childHiveTasks.Last());
     72            } else if (ItemTask.Item is Optimization.BatchRun) {
     73              Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
     74              if (batchRun.Optimizer != null) {
     75                while (this.childHiveTasks.Count < batchRun.Repetitions) {
     76                  var optimizerHiveTask = new OptimizerHiveTask(batchRun.Optimizer);
     77                  optimizerHiveTask.Task.Priority = Task.Priority;
     78                  this.childHiveTasks.Add(optimizerHiveTask);
     79                }
     80                while (this.childHiveTasks.Count > batchRun.Repetitions) {
     81                  this.childHiveTasks.Remove(this.childHiveTasks.Last());
     82                }
    8383              }
    8484            }
    8585          }
    8686        }
     87      }
     88      finally {
     89        childHiveTasksLock.ExitWriteLock();
    8790      }
    8891    }
     
    223226    /// </summary>
    224227    private void UpdateOptimizerInBatchRun(BatchRun batchRun, OptimizerTask optimizerTask) {
    225       if (batchRun.Optimizer == null) {
    226         batchRun.Optimizer = (IOptimizer)optimizerTask.Item; // only set the first optimizer as Optimizer. if every time the Optimizer would be set, the runs would be cleared each time
    227       }
    228       lock (batchRunLocker) {
     228      itemTaskLock.EnterWriteLock();
     229      try {
     230        if (batchRun.Optimizer == null) {
     231          batchRun.Optimizer = (IOptimizer)optimizerTask.Item; // only set the first optimizer as Optimizer. if every time the Optimizer would be set, the runs would be cleared each time
     232        }
    229233        foreach (IRun run in optimizerTask.Item.Runs) {
    230234          if (!batchRun.Runs.Contains(run)) {
     
    234238        }
    235239      }
     240      finally {
     241        itemTaskLock.ExitWriteLock();
     242      }
    236243    }
    237244
     
    241248    /// </summary>
    242249    private void UpdateOptimizerInExperiment(Optimization.Experiment experiment, OptimizerTask optimizerTask) {
    243       if (optimizerTask.IndexInParentOptimizerList < 0)
    244         throw new IndexOutOfRangeException("IndexInParentOptimizerList must be equal or greater than zero! The Task is invalid and the optimizer-tree cannot be reassembled.");
    245 
    246       while (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList) {
    247         experiment.Optimizers.Add(new UserDefinedAlgorithm("Placeholder")); // add dummy-entries to Optimizers so that its possible to insert the optimizerTask at the correct position
    248       }
    249       if (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList + 1) {
    250         experiment.Optimizers.Add(optimizerTask.Item);
    251       } else {
    252         // if ComputeInParallel==true, don't replace the optimizer (except it is still a Placeholder)
    253         // this is because Jobs with ComputeInParallel get submitted to hive with their child-optimizers deleted
    254         if (!optimizerTask.ComputeInParallel || experiment.Optimizers[optimizerTask.IndexInParentOptimizerList].Name == "Placeholder") {
    255           experiment.Optimizers[optimizerTask.IndexInParentOptimizerList] = optimizerTask.Item;
    256         }
     250      itemTaskLock.EnterWriteLock();
     251      try {
     252        if (optimizerTask.IndexInParentOptimizerList < 0)
     253          throw new IndexOutOfRangeException("IndexInParentOptimizerList must be equal or greater than zero! The Task is invalid and the optimizer-tree cannot be reassembled.");
     254
     255        while (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList) {
     256          experiment.Optimizers.Add(new UserDefinedAlgorithm("Placeholder")); // add dummy-entries to Optimizers so that its possible to insert the optimizerTask at the correct position
     257        }
     258        if (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList + 1) {
     259          experiment.Optimizers.Add(optimizerTask.Item);
     260        } else {
     261          // if ComputeInParallel==true, don't replace the optimizer (except it is still a Placeholder)
     262          // this is because Jobs with ComputeInParallel get submitted to hive with their child-optimizers deleted
     263          if (!optimizerTask.ComputeInParallel || experiment.Optimizers[optimizerTask.IndexInParentOptimizerList].Name == "Placeholder") {
     264            experiment.Optimizers[optimizerTask.IndexInParentOptimizerList] = optimizerTask.Item;
     265          }
     266        }
     267      }
     268      finally {
     269        itemTaskLock.ExitWriteLock();
    257270      }
    258271    }
     
    365378    }
    366379
     380    public void ExecuteReadActionOnItemTask(Action action) {
     381      itemTaskLock.EnterReadLock();
     382      try {
     383        action();
     384      }
     385      finally {
     386        itemTaskLock.ExitReadLock();
     387      }
     388    }
     389
    367390    #region Helpers
    368391    /// <summary>
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/RefreshableJob.cs

    r8914 r8939  
    3636    private object locker = new object();
    3737    private object downloadFinishedLocker = new object();
     38    object jobResultReceivedLocker = new object();
    3839
    3940    public bool IsProgressing { get; set; }
     
    261262
    262263    private void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightTask>> e) {
    263       foreach (LightweightTask lightweightTask in e.Value) {
    264         HiveTask hiveTask = GetHiveTaskById(lightweightTask.Id);
    265         if (hiveTask != null) {
    266           // lastJobDataUpdate equals DateTime.MinValue right after it was uploaded. When the first results are polled, this value is updated
    267           if (hiveTask.Task.State == TaskState.Offline && lightweightTask.State != TaskState.Finished && lightweightTask.State != TaskState.Failed && lightweightTask.State != TaskState.Aborted) {
    268             hiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
    269           }
    270 
    271           hiveTask.UpdateFromLightweightJob(lightweightTask);
    272 
    273           if (!hiveTask.IsFinishedTaskDownloaded && !hiveTask.IsDownloading && hiveTask.Task.LastTaskDataUpdate < lightweightTask.LastTaskDataUpdate) {
    274             log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id));
    275             hiveTask.IsDownloading = true;
    276             jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => {
    277               lock (downloadFinishedLocker) {
    278                 log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id));
    279                 HiveTask localHiveTask = GetHiveTaskById(localJob.Id);
    280 
    281                 if (itemJob == null) {
    282                   // something bad happened to this task. bad task, BAAAD task!
    283                   localHiveTask.IsDownloading = false;
    284                 } else {
    285                   // if the task is paused, download but don't integrate into parent optimizer (to avoid Prepare)
    286                   if (localJob.State == TaskState.Paused) {
    287                     localHiveTask.ItemTask = itemJob;
     264      lock (jobResultReceivedLocker) {
     265        foreach (LightweightTask lightweightTask in e.Value) {
     266          HiveTask hiveTask = GetHiveTaskById(lightweightTask.Id);
     267          if (hiveTask != null) {
     268            // lastJobDataUpdate equals DateTime.MinValue right after it was uploaded. When the first results are polled, this value is updated
     269            if (hiveTask.Task.State == TaskState.Offline && lightweightTask.State != TaskState.Finished && lightweightTask.State != TaskState.Failed && lightweightTask.State != TaskState.Aborted) {
     270              hiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
     271            }
     272
     273            hiveTask.UpdateFromLightweightJob(lightweightTask);
     274
     275            if (!hiveTask.IsFinishedTaskDownloaded && !hiveTask.IsDownloading && hiveTask.Task.LastTaskDataUpdate < lightweightTask.LastTaskDataUpdate) {
     276              log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id));
     277              hiveTask.IsDownloading = true;
     278              jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => {
     279                lock (downloadFinishedLocker) {
     280                  log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id));
     281                  HiveTask localHiveTask = GetHiveTaskById(localJob.Id);
     282
     283                  if (itemJob == null) {
     284                    // something bad happened to this task. bad task, BAAAD task!
     285                    localHiveTask.IsDownloading = false;
    288286                  } else {
    289                     if (localJob.ParentTaskId.HasValue) {
    290                       HiveTask parentHiveTask = GetHiveTaskById(localJob.ParentTaskId.Value);
    291                       parentHiveTask.IntegrateChild(itemJob, localJob.Id);
     287                    // if the task is paused, download but don't integrate into parent optimizer (to avoid Prepare)
     288                    if (localJob.State == TaskState.Paused) {
     289                      localHiveTask.ItemTask = itemJob;
    292290                    } else {
    293                       localHiveTask.ItemTask = itemJob;
     291                      if (localJob.ParentTaskId.HasValue) {
     292                        HiveTask parentHiveTask = GetHiveTaskById(localJob.ParentTaskId.Value);
     293                        parentHiveTask.IntegrateChild(itemJob, localJob.Id);
     294                      } else {
     295                        localHiveTask.ItemTask = itemJob;
     296                      }
    294297                    }
     298                    localHiveTask.IsDownloading = false;
     299                    localHiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
    295300                  }
    296                   localHiveTask.IsDownloading = false;
    297                   localHiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
    298301                }
    299               }
    300             });
    301           }
    302         }
    303       }
    304       GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
    305       if (AllJobsFinished()) {
    306         this.ExecutionState = Core.ExecutionState.Stopped;
    307         StopResultPolling();
    308       }
    309       UpdateTotalExecutionTime();
    310       UpdateStatistics();
    311       OnStateLogListChanged();
    312       OnTaskReceived();
     302              });
     303            }
     304          } else
     305            throw new Exception("This should not happen");
     306        }
     307        GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
     308        if (AllJobsFinished()) {
     309          this.ExecutionState = Core.ExecutionState.Stopped;
     310          StopResultPolling();
     311        }
     312        UpdateTotalExecutionTime();
     313        UpdateStatistics();
     314        OnStateLogListChanged();
     315        OnTaskReceived();
     316      }
    313317    }
    314318
Note: See TracChangeset for help on using the changeset viewer.