- Timestamp:
- 04/16/13 13:13:41 (11 years ago)
- Location:
- branches/OaaS
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/OaaS
- Property svn:ignore
-
old new 21 21 protoc.exe 22 22 _ReSharper.HeuristicLab 3.3 Tests 23 Google.ProtocolBuffers-2.4.1.473.dll 23 24 packages
-
- Property svn:mergeinfo changed
- Property svn:ignore
-
branches/OaaS/HeuristicLab.Clients.Hive/3.3/HiveClient.cs
r8165 r9363 46 46 47 47 #region Properties 48 private ItemCollection<RefreshableJob> jobs;49 public ItemCollection<RefreshableJob> Jobs {48 private HiveItemCollection<RefreshableJob> jobs; 49 public HiveItemCollection<RefreshableJob> Jobs { 50 50 get { return jobs; } 51 51 set { 52 52 if (value != jobs) { 53 53 jobs = value; 54 OnHive ExperimentsChanged();54 OnHiveJobsChanged(); 55 55 } 56 56 } … … 76 76 #endregion 77 77 78 private HiveClient() { } 78 private HiveClient() { 79 //this will never be deregistered 80 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException); 81 } 82 83 private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { 84 e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property 85 throw new HiveException("Unobserved Exception in ConcurrentTaskDownloader", e.Exception); 86 } 87 88 public void ClearHiveClient() { 89 Jobs.ClearWithoutHiveDeletion(); 90 foreach (var j in Jobs) { 91 if (j.RefreshAutomatically) { 92 j.RefreshAutomatically = false; // stop result polling 93 } 94 j.Dispose(); 95 } 96 Jobs = null; 97 98 if (onlinePlugins != null) 99 onlinePlugins.Clear(); 100 if (alreadyUploadedPlugins != null) 101 alreadyUploadedPlugins.Clear(); 102 } 79 103 80 104 #region Refresh … … 83 107 84 108 try { 85 this.IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged()); 86 87 var oldJobs = jobs ?? new ItemCollection<RefreshableJob>(); 109 IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged()); 110 88 111 jobs = new HiveItemCollection<RefreshableJob>(); 89 112 var jobsLoaded = HiveServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs()); 90 113 91 114 foreach (var j in jobsLoaded) { 92 var job = oldJobs.SingleOrDefault(x => x.Id == j.Id); 93 if (job == null) { 94 // new 95 jobs.Add(new RefreshableJob(j) { IsAllowedPrivileged = this.isAllowedPrivileged }); 96 } else { 97 // update 98 job.Job = j; 99 job.IsAllowedPrivileged = this.isAllowedPrivileged; 100 jobs.Add(job); 101 } 102 } 103 // remove those which were not in the list of loaded hiveexperiments 104 foreach (var job in oldJobs) { 105 if (job.Id == Guid.Empty) { 106 // experiment not uploaded... keep 107 jobs.Add(job); 108 } else { 109 job.RefreshAutomatically = false; // stop results polling 110 } 115 jobs.Add(new RefreshableJob(j) { IsAllowedPrivileged = this.isAllowedPrivileged }); 111 116 } 112 117 } … … 119 124 } 120 125 } 126 121 127 public void RefreshAsync(Action<Exception> exceptionCallback) { 122 128 var call = new Func<Exception>(delegate() { … … 205 211 if (handler != null) handler(this, EventArgs.Empty); 206 212 } 207 public event EventHandler Hive ExperimentsChanged;208 private void OnHive ExperimentsChanged() {209 var handler = Hive ExperimentsChanged;213 public event EventHandler HiveJobsChanged; 214 private void OnHiveJobsChanged() { 215 var handler = HiveJobsChanged; 210 216 if (handler != null) handler(this, EventArgs.Empty); 211 217 } … … 294 300 cancellationToken.ThrowIfCancellationRequested(); 295 301 296 if (refreshableJob.RefreshAutomatically) refreshableJob.StartResultPolling();297 298 302 // upload tasks 299 303 refreshableJob.Progress.Status = "Uploading tasks..."; … … 301 305 var tasks = new List<TS.Task>(); 302 306 foreach (HiveTask hiveTask in refreshableJob.HiveTasks) { 303 tasks.Add(TS.Task.Factory.StartNew((hj) => {307 var task = TS.Task.Factory.StartNew((hj) => { 304 308 UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken); 305 }, hiveTask) 306 .ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 307 } 308 try { 309 TS.Task.WaitAll(tasks.ToArray()); 310 } 311 catch (AggregateException ae) { 312 if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it 313 } 309 }, hiveTask); 310 task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted); 311 tasks.Add(task); 312 } 313 TS.Task.WaitAll(tasks.ToArray()); 314 } 315 finally { 314 316 refreshableJob.Job.Modified = false; 315 }316 finally {317 317 refreshableJob.IsProgressing = false; 318 318 refreshableJob.Progress.Finish(); … … 404 404 var tasks = new List<TS.Task>(); 405 405 foreach (HiveTask child in hiveTask.ChildHiveTasks) { 406 tasks.Add(TS.Task.Factory.StartNew((tuple) => {406 var task = TS.Task.Factory.StartNew((tuple) => { 407 407 var arguments = (Tuple<HiveTask, HiveTask>)tuple; 408 408 UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken); 409 }, new Tuple<HiveTask, HiveTask>(child, hiveTask)) 410 .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 409 }, new Tuple<HiveTask, HiveTask>(child, hiveTask)); 410 task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted); 411 tasks.Add(task); 411 412 } 412 413 taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall! 413 try { 414 TS.Task.WaitAll(tasks.ToArray()); 415 } 416 catch (AggregateException ae) { 417 if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it 418 } 414 TS.Task.WaitAll(tasks.ToArray()); 419 415 } 420 416 finally { … … 429 425 refreshableJob.IsProgressing = true; 430 426 refreshableJob.Progress = new Progress(); 427 TaskDownloader downloader = null; 431 428 432 429 try { … … 437 434 // fetch all task objects to create the full tree of tree of HiveTask objects 438 435 refreshableJob.Progress.Status = "Downloading list of tasks..."; 439 allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks (hiveExperiment.Id));436 allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id)); 440 437 totalJobCount = allTasks.Count(); 441 438 442 439 refreshableJob.Progress.Status = "Downloading tasks..."; 443 TaskDownloaderdownloader = new TaskDownloader(allTasks.Select(x => x.Id));440 downloader = new TaskDownloader(allTasks.Select(x => x.Id)); 444 441 downloader.StartAsync(); 445 442 … … 473 470 refreshableJob.IsProgressing = false; 474 471 refreshableJob.Progress.Finish(); 472 if (downloader != null) { 473 downloader.Dispose(); 474 } 475 475 } 476 476 } … … 483 483 foreach (LightweightTask task in childTasks) { 484 484 HiveTask childHiveTask = allHiveTasks[task.Id]; 485 BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks); 485 486 parentHiveTask.AddChildHiveTask(childHiveTask); 486 BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);487 487 } 488 488 }
Note: See TracChangeset
for help on using the changeset viewer.