Changeset 8939 for trunk/sources
- Timestamp:
- 11/26/12 11:12:57 (12 years ago)
- Location:
- trunk/sources
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Clients.Hive.JobManager/3.3/Views/RefreshableHiveJobView.cs
r8914 r8939 539 539 private RunCollection GetAllRunsFromJob(RefreshableJob job) { 540 540 if (job != null) { 541 RunCollection runs = new RunCollection() ;541 RunCollection runs = new RunCollection() { AlgorithmName = job.ItemName }; 542 542 543 543 foreach (HiveTask subTask in job.HiveTasks) { 544 544 if (subTask is OptimizerHiveTask) { 545 545 OptimizerHiveTask ohTask = subTask as OptimizerHiveTask; 546 runs.AddRange(ohTask.ItemTask.Item.Runs); 546 ohTask.ExecuteReadActionOnItemTask(new Action(delegate() { 547 runs.AddRange(ohTask.ItemTask.Item.Runs); 548 })); 547 549 } 548 550 } -
trunk/sources/HeuristicLab.Clients.Hive.Views/3.3/HiveTasks/OptimizerHiveTaskView.cs
r8884 r8939 53 53 protected override void Job_ItemChanged(object sender, EventArgs e) { 54 54 if (Content != null && Content.Task != null && Content.ItemTask.Item != null) { 55 runCollectionViewHost.Content = Content.ItemTask.Item.Runs; 55 Content.ExecuteReadActionOnItemTask(new Action(delegate() { 56 runCollectionViewHost.Content = Content.ItemTask.Item.Runs; 57 })); 56 58 } else { 57 59 runCollectionViewHost.Content = null; -
trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs
r8914 r8939 294 294 cancellationToken.ThrowIfCancellationRequested(); 295 295 296 // upload tasks 297 refreshableJob.Progress.Status = "Uploading tasks..."; 298 299 var tasks = new List<TS.Task>(); 300 foreach (HiveTask hiveTask in refreshableJob.HiveTasks) { 301 var task = TS.Task.Factory.StartNew((hj) => { 302 UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken); 303 }, hiveTask); 304 task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted); 305 tasks.Add(task); 306 } 307 TS.Task.WaitAll(tasks.ToArray()); 308 } 309 finally { 296 310 refreshableJob.RefreshAutomatically = true; 297 311 refreshableJob.StartResultPolling(); 298 299 // upload tasks300 refreshableJob.Progress.Status = "Uploading tasks...";301 302 var tasks = new List<TS.Task>();303 foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {304 tasks.Add(TS.Task.Factory.StartNew((hj) => {305 UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken);306 }, hiveTask)307 .ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));308 }309 try {310 TS.Task.WaitAll(tasks.ToArray());311 }312 catch (AggregateException ae) {313 if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it314 }315 312 refreshableJob.Job.Modified = false; 316 }317 finally {318 313 refreshableJob.IsProgressing = false; 319 314 refreshableJob.Progress.Finish(); … … 405 400 var tasks = new List<TS.Task>(); 406 401 foreach (HiveTask child in hiveTask.ChildHiveTasks) { 407 tasks.Add(TS.Task.Factory.StartNew((tuple) => {402 var task = TS.Task.Factory.StartNew((tuple) => { 408 403 var arguments = (Tuple<HiveTask, HiveTask>)tuple; 409 404 UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken); 410 }, new Tuple<HiveTask, HiveTask>(child, hiveTask)) 411 .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 405 }, new Tuple<HiveTask, HiveTask>(child, hiveTask)); 406 task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted); 407 tasks.Add(task); 412 408 } 413 409 taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall! 414 try { 415 TS.Task.WaitAll(tasks.ToArray()); 416 } 417 catch (AggregateException ae) { 418 if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it 419 } 410 TS.Task.WaitAll(tasks.ToArray()); 420 411 } 421 412 finally { -
trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveTasks/HiveTask.cs
r8871 r8939 39 39 protected static object locker = new object(); 40 40 protected ReaderWriterLockSlim childHiveTasksLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 41 protected ReaderWriterLockSlim itemTaskLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 41 42 42 43 public static new Image StaticItemImage { … … 83 84 set { 84 85 if (itemTask != null && syncTasksWithOptimizers) { 85 this.childHiveTasks.Clear(); 86 childHiveTasksLock.EnterWriteLock(); 87 try { 88 childHiveTasks.Clear(); 89 } 90 finally { childHiveTasksLock.ExitWriteLock(); } 86 91 } 87 92 if (itemTask != value) { 88 DergisterItemTaskEvents(); 89 itemTask = value; 90 RegisterItemTaskEvents(); 93 itemTaskLock.EnterWriteLock(); 94 try { 95 DergisterItemTaskEvents(); 96 itemTask = value; 97 RegisterItemTaskEvents(); 98 } 99 finally { itemTaskLock.ExitWriteLock(); } 91 100 OnItemTaskChanged(); 92 101 IsFinishedTaskDownloaded = true; 93 102 } 103 94 104 } 95 105 } -
trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveTasks/OptimizerHiveTask.cs
r8884 r8939 31 31 namespace HeuristicLab.Clients.Hive { 32 32 public class OptimizerHiveTask : HiveTask<OptimizerTask> { 33 34 Object batchRunLocker = new Object();35 36 33 #region Constructors and Cloning 37 34 public OptimizerHiveTask() { } … … 60 57 protected override void UpdateChildHiveTasks() { 61 58 base.UpdateChildHiveTasks(); 62 if (Task != null && syncTasksWithOptimizers) { 63 if (!ItemTask.ComputeInParallel) { 64 this.childHiveTasks.Clear(); 65 } else { 66 if (ItemTask.Item is Optimization.Experiment) { 67 Optimization.Experiment experiment = (Optimization.Experiment)ItemTask.Item; 68 foreach (IOptimizer childOpt in experiment.Optimizers) { 69 var optimizerHiveTask = new OptimizerHiveTask(childOpt); 70 optimizerHiveTask.Task.Priority = Task.Priority; //inherit priority from parent 71 this.childHiveTasks.Add(optimizerHiveTask); 72 } 73 } else if (ItemTask.Item is Optimization.BatchRun) { 74 Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun; 75 if (batchRun.Optimizer != null) { 76 while (this.childHiveTasks.Count < batchRun.Repetitions) { 77 var optimizerHiveTask = new OptimizerHiveTask(batchRun.Optimizer); 78 optimizerHiveTask.Task.Priority = Task.Priority; 59 childHiveTasksLock.EnterWriteLock(); 60 try { 61 if (Task != null && syncTasksWithOptimizers) { 62 if (!ItemTask.ComputeInParallel) { 63 this.childHiveTasks.Clear(); 64 } else { 65 if (ItemTask.Item is Optimization.Experiment) { 66 Optimization.Experiment experiment = (Optimization.Experiment)ItemTask.Item; 67 foreach (IOptimizer childOpt in experiment.Optimizers) { 68 var optimizerHiveTask = new OptimizerHiveTask(childOpt); 69 optimizerHiveTask.Task.Priority = Task.Priority; //inherit priority from parent 79 70 this.childHiveTasks.Add(optimizerHiveTask); 80 71 } 81 while (this.childHiveTasks.Count > batchRun.Repetitions) { 82 this.childHiveTasks.Remove(this.childHiveTasks.Last()); 72 } else if (ItemTask.Item is Optimization.BatchRun) { 73 Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun; 74 if (batchRun.Optimizer != null) { 75 while (this.childHiveTasks.Count < batchRun.Repetitions) { 76 var optimizerHiveTask = new OptimizerHiveTask(batchRun.Optimizer); 77 optimizerHiveTask.Task.Priority = Task.Priority; 78 this.childHiveTasks.Add(optimizerHiveTask); 79 } 80 while (this.childHiveTasks.Count > batchRun.Repetitions) { 81 this.childHiveTasks.Remove(this.childHiveTasks.Last()); 82 } 83 83 } 84 84 } 85 85 } 86 86 } 87 } 88 finally { 89 childHiveTasksLock.ExitWriteLock(); 87 90 } 88 91 } … … 223 226 /// </summary> 224 227 private void UpdateOptimizerInBatchRun(BatchRun batchRun, OptimizerTask optimizerTask) { 225 if (batchRun.Optimizer == null) { 226 batchRun.Optimizer = (IOptimizer)optimizerTask.Item; // only set the first optimizer as Optimizer. if every time the Optimizer would be set, the runs would be cleared each time 227 } 228 lock (batchRunLocker) { 228 itemTaskLock.EnterWriteLock(); 229 try { 230 if (batchRun.Optimizer == null) { 231 batchRun.Optimizer = (IOptimizer)optimizerTask.Item; // only set the first optimizer as Optimizer. if every time the Optimizer would be set, the runs would be cleared each time 232 } 229 233 foreach (IRun run in optimizerTask.Item.Runs) { 230 234 if (!batchRun.Runs.Contains(run)) { … … 234 238 } 235 239 } 240 finally { 241 itemTaskLock.ExitWriteLock(); 242 } 236 243 } 237 244 … … 241 248 /// </summary> 242 249 private void UpdateOptimizerInExperiment(Optimization.Experiment experiment, OptimizerTask optimizerTask) { 243 if (optimizerTask.IndexInParentOptimizerList < 0) 244 throw new IndexOutOfRangeException("IndexInParentOptimizerList must be equal or greater than zero! The Task is invalid and the optimizer-tree cannot be reassembled."); 245 246 while (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList) { 247 experiment.Optimizers.Add(new UserDefinedAlgorithm("Placeholder")); // add dummy-entries to Optimizers so that its possible to insert the optimizerTask at the correct position 248 } 249 if (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList + 1) { 250 experiment.Optimizers.Add(optimizerTask.Item); 251 } else { 252 // if ComputeInParallel==true, don't replace the optimizer (except it is still a Placeholder) 253 // this is because Jobs with ComputeInParallel get submitted to hive with their child-optimizers deleted 254 if (!optimizerTask.ComputeInParallel || experiment.Optimizers[optimizerTask.IndexInParentOptimizerList].Name == "Placeholder") { 255 experiment.Optimizers[optimizerTask.IndexInParentOptimizerList] = optimizerTask.Item; 256 } 250 itemTaskLock.EnterWriteLock(); 251 try { 252 if (optimizerTask.IndexInParentOptimizerList < 0) 253 throw new IndexOutOfRangeException("IndexInParentOptimizerList must be equal or greater than zero! The Task is invalid and the optimizer-tree cannot be reassembled."); 254 255 while (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList) { 256 experiment.Optimizers.Add(new UserDefinedAlgorithm("Placeholder")); // add dummy-entries to Optimizers so that its possible to insert the optimizerTask at the correct position 257 } 258 if (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList + 1) { 259 experiment.Optimizers.Add(optimizerTask.Item); 260 } else { 261 // if ComputeInParallel==true, don't replace the optimizer (except it is still a Placeholder) 262 // this is because Jobs with ComputeInParallel get submitted to hive with their child-optimizers deleted 263 if (!optimizerTask.ComputeInParallel || experiment.Optimizers[optimizerTask.IndexInParentOptimizerList].Name == "Placeholder") { 264 experiment.Optimizers[optimizerTask.IndexInParentOptimizerList] = optimizerTask.Item; 265 } 266 } 267 } 268 finally { 269 itemTaskLock.ExitWriteLock(); 257 270 } 258 271 } … … 365 378 } 366 379 380 public void ExecuteReadActionOnItemTask(Action action) { 381 itemTaskLock.EnterReadLock(); 382 try { 383 action(); 384 } 385 finally { 386 itemTaskLock.ExitReadLock(); 387 } 388 } 389 367 390 #region Helpers 368 391 /// <summary> -
trunk/sources/HeuristicLab.Clients.Hive/3.3/RefreshableJob.cs
r8914 r8939 36 36 private object locker = new object(); 37 37 private object downloadFinishedLocker = new object(); 38 object jobResultReceivedLocker = new object(); 38 39 39 40 public bool IsProgressing { get; set; } … … 261 262 262 263 private void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightTask>> e) { 263 foreach (LightweightTask lightweightTask in e.Value) { 264 HiveTask hiveTask = GetHiveTaskById(lightweightTask.Id); 265 if (hiveTask != null) { 266 // lastJobDataUpdate equals DateTime.MinValue right after it was uploaded. When the first results are polled, this value is updated 267 if (hiveTask.Task.State == TaskState.Offline && lightweightTask.State != TaskState.Finished && lightweightTask.State != TaskState.Failed && lightweightTask.State != TaskState.Aborted) { 268 hiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate; 269 } 270 271 hiveTask.UpdateFromLightweightJob(lightweightTask); 272 273 if (!hiveTask.IsFinishedTaskDownloaded && !hiveTask.IsDownloading && hiveTask.Task.LastTaskDataUpdate < lightweightTask.LastTaskDataUpdate) { 274 log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id)); 275 hiveTask.IsDownloading = true; 276 jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => { 277 lock (downloadFinishedLocker) { 278 log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id)); 279 HiveTask localHiveTask = GetHiveTaskById(localJob.Id); 280 281 if (itemJob == null) { 282 // something bad happened to this task. bad task, BAAAD task! 283 localHiveTask.IsDownloading = false; 284 } else { 285 // if the task is paused, download but don't integrate into parent optimizer (to avoid Prepare) 286 if (localJob.State == TaskState.Paused) { 287 localHiveTask.ItemTask = itemJob; 264 lock (jobResultReceivedLocker) { 265 foreach (LightweightTask lightweightTask in e.Value) { 266 HiveTask hiveTask = GetHiveTaskById(lightweightTask.Id); 267 if (hiveTask != null) { 268 // lastJobDataUpdate equals DateTime.MinValue right after it was uploaded. When the first results are polled, this value is updated 269 if (hiveTask.Task.State == TaskState.Offline && lightweightTask.State != TaskState.Finished && lightweightTask.State != TaskState.Failed && lightweightTask.State != TaskState.Aborted) { 270 hiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate; 271 } 272 273 hiveTask.UpdateFromLightweightJob(lightweightTask); 274 275 if (!hiveTask.IsFinishedTaskDownloaded && !hiveTask.IsDownloading && hiveTask.Task.LastTaskDataUpdate < lightweightTask.LastTaskDataUpdate) { 276 log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id)); 277 hiveTask.IsDownloading = true; 278 jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => { 279 lock (downloadFinishedLocker) { 280 log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id)); 281 HiveTask localHiveTask = GetHiveTaskById(localJob.Id); 282 283 if (itemJob == null) { 284 // something bad happened to this task. bad task, BAAAD task! 285 localHiveTask.IsDownloading = false; 288 286 } else { 289 if (localJob.ParentTaskId.HasValue) {290 HiveTask parentHiveTask = GetHiveTaskById(localJob.ParentTaskId.Value);291 parentHiveTask.IntegrateChild(itemJob, localJob.Id);287 // if the task is paused, download but don't integrate into parent optimizer (to avoid Prepare) 288 if (localJob.State == TaskState.Paused) { 289 localHiveTask.ItemTask = itemJob; 292 290 } else { 293 localHiveTask.ItemTask = itemJob; 291 if (localJob.ParentTaskId.HasValue) { 292 HiveTask parentHiveTask = GetHiveTaskById(localJob.ParentTaskId.Value); 293 parentHiveTask.IntegrateChild(itemJob, localJob.Id); 294 } else { 295 localHiveTask.ItemTask = itemJob; 296 } 294 297 } 298 localHiveTask.IsDownloading = false; 299 localHiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate; 295 300 } 296 localHiveTask.IsDownloading = false;297 localHiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;298 301 } 299 } 300 }); 301 } 302 } 303 } 304 GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory) 305 if (AllJobsFinished()) { 306 this.ExecutionState = Core.ExecutionState.Stopped; 307 StopResultPolling(); 308 } 309 UpdateTotalExecutionTime(); 310 UpdateStatistics(); 311 OnStateLogListChanged(); 312 OnTaskReceived(); 302 }); 303 } 304 } else 305 throw new Exception("This should not happen"); 306 } 307 GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory) 308 if (AllJobsFinished()) { 309 this.ExecutionState = Core.ExecutionState.Stopped; 310 StopResultPolling(); 311 } 312 UpdateTotalExecutionTime(); 313 UpdateStatistics(); 314 OnStateLogListChanged(); 315 OnTaskReceived(); 316 } 313 317 } 314 318
Note: See TracChangeset
for help on using the changeset viewer.