Changeset 6725 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs
- Timestamp:
- 09/08/11 16:38:28 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs
r6723 r6725 45 45 46 46 #region Properties 47 private ItemCollection<Refreshable HiveExperiment> hiveExperiments;48 public ItemCollection<Refreshable HiveExperiment> HiveExperiments {49 get { return hiveExperiments; }47 private ItemCollection<RefreshableJob> jobs; 48 public ItemCollection<RefreshableJob> Jobs { 49 get { return jobs; } 50 50 set { 51 if (value != hiveExperiments) {52 hiveExperiments = value;51 if (value != jobs) { 52 jobs = value; 53 53 OnHiveExperimentsChanged(); 54 54 } … … 84 84 this.IsAllowedPrivileged = ServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged()); 85 85 86 var oldExperiments = hiveExperiments ?? new ItemCollection<RefreshableHiveExperiment>();87 hiveExperiments = new HiveItemCollection<RefreshableHiveExperiment>();86 var oldExperiments = jobs ?? new ItemCollection<RefreshableJob>(); 87 jobs = new HiveItemCollection<RefreshableJob>(); 88 88 var experimentsLoaded = ServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetHiveExperiments()); 89 89 … … 92 92 if (hiveExperiment == null) { 93 93 // new 94 hiveExperiments.Add(new RefreshableHiveExperiment(he) { IsAllowedPrivileged = this.isAllowedPrivileged });94 jobs.Add(new RefreshableJob(he) { IsAllowedPrivileged = this.isAllowedPrivileged }); 95 95 } else { 96 96 // update 97 hiveExperiment. HiveExperiment= he;97 hiveExperiment.Job = he; 98 98 hiveExperiment.IsAllowedPrivileged = this.isAllowedPrivileged; 99 hiveExperiments.Add(hiveExperiment);99 jobs.Add(hiveExperiment); 100 100 } 101 101 } … … 104 104 if (experiment.Id == Guid.Empty) { 105 105 // experiment not uploaded... keep 106 hiveExperiments.Add(experiment);106 jobs.Add(experiment); 107 107 } else { 108 108 experiment.RefreshAutomatically = false; // stop results polling … … 111 111 } 112 112 catch { 113 hiveExperiments = null;113 jobs = null; 114 114 throw; 115 115 } … … 138 138 public static void Store(IHiveItem item, CancellationToken cancellationToken) { 139 139 if (item.Id == Guid.Empty) { 140 if (item is Refreshable HiveExperiment) {141 HiveClient.Instance.UploadExperiment((Refreshable HiveExperiment)item, cancellationToken);140 if (item is RefreshableJob) { 141 HiveClient.Instance.UploadExperiment((RefreshableJob)item, cancellationToken); 142 142 } 143 143 if (item is JobPermission) { … … 178 178 if (item is Job) 179 179 ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(item.Id)); 180 if (item is Refreshable HiveExperiment)180 if (item is RefreshableJob) 181 181 ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(item.Id)); 182 182 if (item is JobPermission) { … … 206 206 #endregion 207 207 208 public static void StartExperiment(Action<Exception> exceptionCallback, Refreshable HiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {208 public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) { 209 209 HiveClient.StoreAsync( 210 210 new Action<Exception>((Exception ex) => { 211 refreshable HiveExperiment.ExecutionState = ExecutionState.Prepared;211 refreshableJob.ExecutionState = ExecutionState.Prepared; 212 212 exceptionCallback(ex); 213 }), refreshable HiveExperiment, cancellationToken);214 refreshable HiveExperiment.ExecutionState = ExecutionState.Started;215 } 216 217 public static void PauseExperiment(Refreshable HiveExperimentrefreshableHiveExperiment) {213 }), refreshableJob, cancellationToken); 214 refreshableJob.ExecutionState = ExecutionState.Started; 215 } 216 217 public static void PauseExperiment(RefreshableJob refreshableHiveExperiment) { 218 218 ServiceLocator.Instance.CallHiveService(service => { 219 foreach (Hive Jobjob in refreshableHiveExperiment.GetAllHiveJobs()) {220 if (job. Job.State != TaskState.Finished && job.Job.State != TaskState.Aborted && job.Job.State != TaskState.Failed)221 service.PauseJob(job. Job.Id);219 foreach (HiveTask job in refreshableHiveExperiment.GetAllHiveJobs()) { 220 if (job.Task.State != TaskState.Finished && job.Task.State != TaskState.Aborted && job.Task.State != TaskState.Failed) 221 service.PauseJob(job.Task.Id); 222 222 } 223 223 }); … … 225 225 } 226 226 227 public static void StopExperiment(Refreshable HiveExperiment refreshableHiveExperiment) {227 public static void StopExperiment(RefreshableJob refreshableJob) { 228 228 ServiceLocator.Instance.CallHiveService(service => { 229 foreach (Hive Job job in refreshableHiveExperiment.GetAllHiveJobs()) {230 if (job. Job.State != TaskState.Finished && job.Job.State != TaskState.Aborted && job.Job.State != TaskState.Failed)231 service.StopJob(job. Job.Id);229 foreach (HiveTask job in refreshableJob.GetAllHiveJobs()) { 230 if (job.Task.State != TaskState.Finished && job.Task.State != TaskState.Aborted && job.Task.State != TaskState.Failed) 231 service.StopJob(job.Task.Id); 232 232 } 233 233 }); … … 239 239 private static object jobCountLocker = new object(); 240 240 private static object pluginLocker = new object(); 241 private void UploadExperiment(Refreshable HiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {241 private void UploadExperiment(RefreshableJob refreshableJob, CancellationToken cancellationToken) { 242 242 try { 243 refreshable HiveExperiment.Progress = new Progress("Connecting to server...");244 refreshable HiveExperiment.IsProgressing = true;245 246 IEnumerable<string> resourceNames = ToResourceNameList(refreshable HiveExperiment.HiveExperiment.ResourceNames);243 refreshableJob.Progress = new Progress("Connecting to server..."); 244 refreshableJob.IsProgressing = true; 245 246 IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames); 247 247 var resourceIds = new List<Guid>(); 248 248 foreach (var resourceName in resourceNames) { … … 254 254 } 255 255 256 foreach (OptimizerHive Job hiveJob in refreshableHiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) {256 foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveJobs.OfType<OptimizerHiveTask>()) { 257 257 hiveJob.SetIndexInParentOptimizerList(null); 258 258 } 259 259 260 // upload Job261 refreshable HiveExperiment.Progress.Status = "Uploading Job...";262 refreshable HiveExperiment.HiveExperiment.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment));263 bool isPrivileged = refreshable HiveExperiment.HiveExperiment.IsPrivileged;264 refreshable HiveExperiment.HiveExperiment = ServiceLocator.Instance.CallHiveService((s) => s.GetHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id)); // update owner and permissions265 refreshable HiveExperiment.HiveExperiment.IsPrivileged = isPrivileged;266 cancellationToken.ThrowIfCancellationRequested(); 267 268 int totalJobCount = refreshable HiveExperiment.GetAllHiveJobs().Count();260 // upload Task 261 refreshableJob.Progress.Status = "Uploading Task..."; 262 refreshableJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddHiveExperiment(refreshableJob.Job)); 263 bool isPrivileged = refreshableJob.Job.IsPrivileged; 264 refreshableJob.Job = ServiceLocator.Instance.CallHiveService((s) => s.GetHiveExperiment(refreshableJob.Job.Id)); // update owner and permissions 265 refreshableJob.Job.IsPrivileged = isPrivileged; 266 cancellationToken.ThrowIfCancellationRequested(); 267 268 int totalJobCount = refreshableJob.GetAllHiveJobs().Count(); 269 269 int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library 270 270 cancellationToken.ThrowIfCancellationRequested(); 271 271 272 272 // upload plugins 273 refreshable HiveExperiment.Progress.Status = "Uploading plugins...";273 refreshableJob.Progress.Status = "Uploading plugins..."; 274 274 this.OnlinePlugins = ServiceLocator.Instance.CallHiveService((s) => s.GetPlugins()); 275 275 this.AlreadyUploadedPlugins = new List<Plugin>(); … … 278 278 cancellationToken.ThrowIfCancellationRequested(); 279 279 280 if (refreshable HiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling();280 if (refreshableJob.RefreshAutomatically) refreshableJob.StartResultPolling(); 281 281 282 282 // upload jobs 283 refreshable HiveExperiment.Progress.Status = "Uploading jobs...";283 refreshableJob.Progress.Status = "Uploading jobs..."; 284 284 285 285 var tasks = new List<TS.Task>(); 286 foreach (Hive Job hiveJob in refreshableHiveExperiment.HiveJobs) {286 foreach (HiveTask hiveJob in refreshableJob.HiveJobs) { 287 287 tasks.Add(TS.Task.Factory.StartNew((hj) => { 288 UploadJobWithChildren(refreshable HiveExperiment.Progress, (HiveJob)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged, cancellationToken);288 UploadJobWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken); 289 289 }, hiveJob) 290 .ContinueWith((x) => refreshable HiveExperiment.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));290 .ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 291 291 } 292 292 try { … … 296 296 if (!ae.InnerExceptions.All(e => e is TaskCanceledException)) throw ae; // for some reason the WaitAll throws a AggregateException containg a TaskCanceledException. i don't know where it comes from, however the tasks all finish properly, so for now just ignore it 297 297 } 298 refreshable HiveExperiment.HiveExperiment.Modified = false;298 refreshableJob.Job.Modified = false; 299 299 } 300 300 finally { 301 refreshable HiveExperiment.IsProgressing = false;301 refreshableJob.IsProgressing = false; 302 302 } 303 303 } … … 331 331 332 332 /// <summary> 333 /// Uploads the given joband all its child-jobs while setting the proper parentJobId values for the childs333 /// Uploads the given task and all its child-jobs while setting the proper parentJobId values for the childs 334 334 /// </summary> 335 /// <param name="parentHive Job">shall be null if its the root job</param>336 private void UploadJobWithChildren(IProgress progress, Hive Job hiveJob, HiveJobparentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {335 /// <param name="parentHiveTask">shall be null if its the root task</param> 336 private void UploadJobWithChildren(IProgress progress, HiveTask hiveJob, HiveTask parentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) { 337 337 jobUploadSemaphore.WaitOne(); 338 338 bool semaphoreReleased = false; … … 345 345 List<IPluginDescription> plugins; 346 346 347 if (hiveJob.Item Job.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) {348 hiveJob. Job.IsParentJob= true;349 hiveJob. Job.FinishWhenChildJobsFinished = true;350 jobData = hiveJob.GetAs JobData(true, out plugins);347 if (hiveJob.ItemTask.ComputeInParallel && (hiveJob.ItemTask.Item is Optimization.Experiment || hiveJob.ItemTask.Item is Optimization.BatchRun)) { 348 hiveJob.Task.IsParentTask = true; 349 hiveJob.Task.FinishWhenChildJobsFinished = true; 350 jobData = hiveJob.GetAsTaskData(true, out plugins); 351 351 } else { 352 hiveJob. Job.IsParentJob= false;353 hiveJob. Job.FinishWhenChildJobsFinished = false;354 jobData = hiveJob.GetAs JobData(false, out plugins);352 hiveJob.Task.IsParentTask = false; 353 hiveJob.Task.FinishWhenChildJobsFinished = false; 354 jobData = hiveJob.GetAsTaskData(false, out plugins); 355 355 } 356 356 cancellationToken.ThrowIfCancellationRequested(); … … 359 359 if (!cancellationToken.IsCancellationRequested) { 360 360 lock (pluginLocker) { 361 ServiceLocator.Instance.CallHiveService((s) => hiveJob. Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));361 ServiceLocator.Instance.CallHiveService((s) => hiveJob.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins)); 362 362 } 363 363 } 364 364 }, -1, "Failed to upload plugins"); 365 365 cancellationToken.ThrowIfCancellationRequested(); 366 hiveJob. Job.PluginsNeededIds.Add(configPluginId);367 hiveJob. Job.JobId = hiveExperimentId;368 hiveJob. Job.IsPrivileged = isPrivileged;369 370 log.LogMessage(string.Format("Uploading job ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count()));366 hiveJob.Task.PluginsNeededIds.Add(configPluginId); 367 hiveJob.Task.JobId = hiveExperimentId; 368 hiveJob.Task.IsPrivileged = isPrivileged; 369 370 log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveJob.ItemTask.GetObjectGraphObjects().Count())); 371 371 TryAndRepeat(() => { 372 372 if (!cancellationToken.IsCancellationRequested) { 373 373 if (parentHiveJob != null) { 374 hiveJob. Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData));374 hiveJob.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Task.Id, hiveJob.Task, jobData)); 375 375 } else { 376 hiveJob. Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Job, jobData, groups.ToList()));376 hiveJob.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Task, jobData, groups.ToList())); 377 377 } 378 378 } 379 }, 50, "Failed to add job", log);379 }, 50, "Failed to add task", log); 380 380 cancellationToken.ThrowIfCancellationRequested(); 381 381 382 382 lock (jobCountLocker) { 383 383 progress.ProgressValue = (double)jobCount[0] / totalJobCount; 384 progress.Status = string.Format("Uploaded job({0} of {1})", jobCount[0], totalJobCount);384 progress.Status = string.Format("Uploaded task ({0} of {1})", jobCount[0], totalJobCount); 385 385 } 386 386 387 387 var tasks = new List<TS.Task>(); 388 foreach (Hive Job child in hiveJob.ChildHiveJobs) {388 foreach (HiveTask child in hiveJob.ChildHiveTasks) { 389 389 tasks.Add(TS.Task.Factory.StartNew((tuple) => { 390 var arguments = (Tuple<Hive Job, HiveJob>)tuple;390 var arguments = (Tuple<HiveTask, HiveTask>)tuple; 391 391 UploadJobWithChildren(progress, arguments.Item1, arguments.Item2, groups, jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged, cancellationToken); 392 }, new Tuple<Hive Job, HiveJob>(child, hiveJob))392 }, new Tuple<HiveTask, HiveTask>(child, hiveJob)) 393 393 .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 394 394 } … … 408 408 409 409 #region Download Experiment 410 public static void LoadExperiment(Refreshable HiveExperiment refreshableHiveExperiment) {411 var hiveExperiment = refreshable HiveExperiment.HiveExperiment;412 refreshable HiveExperiment.Progress = new Progress();410 public static void LoadExperiment(RefreshableJob refreshableJob) { 411 var hiveExperiment = refreshableJob.Job; 412 refreshableJob.Progress = new Progress(); 413 413 414 414 try { 415 refreshable HiveExperiment.IsProgressing = true;415 refreshableJob.IsProgressing = true; 416 416 int totalJobCount = 0; 417 417 IEnumerable<LightweightTask> allJobs; 418 418 419 refreshable HiveExperiment.Progress.Status = "Connecting to Server...";420 // fetch all Task objects to create the full tree of tree of Hive Jobobjects421 refreshable HiveExperiment.Progress.Status = "Downloading list of jobs...";419 refreshableJob.Progress.Status = "Connecting to Server..."; 420 // fetch all Task objects to create the full tree of tree of HiveTask objects 421 refreshableJob.Progress.Status = "Downloading list of jobs..."; 422 422 allJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightExperimentJobs(hiveExperiment.Id)); 423 423 totalJobCount = allJobs.Count(); 424 424 425 HiveJobDownloader downloader = new HiveJobDownloader(allJobs.Select(x => x.Id));425 TaskDownloader downloader = new TaskDownloader(allJobs.Select(x => x.Id)); 426 426 downloader.StartAsync(); 427 427 428 428 while (!downloader.IsFinished) { 429 refreshable HiveExperiment.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;430 refreshable HiveExperiment.Progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);429 refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount; 430 refreshableJob.Progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount); 431 431 Thread.Sleep(500); 432 432 … … 435 435 } 436 436 } 437 IDictionary<Guid, Hive Job> allHiveJobs = downloader.Results;438 439 refreshable HiveExperiment.HiveJobs = new ItemCollection<HiveJob>(allHiveJobs.Values.Where(x => !x.Job.ParentTaskId.HasValue));440 441 if (refreshable HiveExperiment.IsFinished()) {442 refreshable HiveExperiment.ExecutionState = Core.ExecutionState.Stopped;437 IDictionary<Guid, HiveTask> allHiveJobs = downloader.Results; 438 439 refreshableJob.HiveJobs = new ItemCollection<HiveTask>(allHiveJobs.Values.Where(x => !x.Task.ParentTaskId.HasValue)); 440 441 if (refreshableJob.IsFinished()) { 442 refreshableJob.ExecutionState = Core.ExecutionState.Stopped; 443 443 } else { 444 refreshable HiveExperiment.ExecutionState = Core.ExecutionState.Started;445 } 446 447 // build child- jobtree448 foreach (Hive Job hiveJob in refreshableHiveExperiment.HiveJobs) {444 refreshableJob.ExecutionState = Core.ExecutionState.Started; 445 } 446 447 // build child-task tree 448 foreach (HiveTask hiveJob in refreshableJob.HiveJobs) { 449 449 BuildHiveJobTree(hiveJob, allJobs, allHiveJobs); 450 450 } 451 451 452 refreshable HiveExperiment.OnLoaded();452 refreshableJob.OnLoaded(); 453 453 } 454 454 finally { 455 refreshable HiveExperiment.IsProgressing = false;456 } 457 } 458 459 private static void BuildHiveJobTree(Hive Job parentHiveJob, IEnumerable<LightweightTask> allJobs, IDictionary<Guid, HiveJob> allHiveJobs) {455 refreshableJob.IsProgressing = false; 456 } 457 } 458 459 private static void BuildHiveJobTree(HiveTask parentHiveJob, IEnumerable<LightweightTask> allJobs, IDictionary<Guid, HiveTask> allHiveJobs) { 460 460 IEnumerable<LightweightTask> childJobs = from job in allJobs 461 where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveJob. Job.Id461 where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveJob.Task.Id 462 462 orderby job.DateCreated ascending 463 463 select job; 464 464 foreach (LightweightTask job in childJobs) { 465 Hive JobchildHiveJob = allHiveJobs[job.Id];465 HiveTask childHiveJob = allHiveJobs[job.Id]; 466 466 parentHiveJob.AddChildHiveJob(childHiveJob); 467 467 BuildHiveJobTree(childHiveJob, allJobs, allHiveJobs); … … 481 481 } 482 482 483 public static Item JobLoadItemJob(Guid jobId) {483 public static ItemTask LoadItemJob(Guid jobId) { 484 484 TaskData jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId)); 485 485 try { 486 return PersistenceUtil.Deserialize<Item Job>(jobData.Data);486 return PersistenceUtil.Deserialize<ItemTask>(jobData.Data); 487 487 } 488 488 catch {
Note: See TracChangeset
for help on using the changeset viewer.