Changeset 6743 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs
- Timestamp:
- 09/12/11 18:04:25 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs
r6725 r6743 86 86 var oldExperiments = jobs ?? new ItemCollection<RefreshableJob>(); 87 87 jobs = new HiveItemCollection<RefreshableJob>(); 88 var experimentsLoaded = ServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.Get HiveExperiments());88 var experimentsLoaded = ServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs()); 89 89 90 90 foreach (var he in experimentsLoaded) { 91 var hiveExperiment= oldExperiments.SingleOrDefault(x => x.Id == he.Id);92 if ( hiveExperiment== null) {91 var job = oldExperiments.SingleOrDefault(x => x.Id == he.Id); 92 if (job == null) { 93 93 // new 94 94 jobs.Add(new RefreshableJob(he) { IsAllowedPrivileged = this.isAllowedPrivileged }); 95 95 } else { 96 96 // update 97 hiveExperiment.Job = he;98 hiveExperiment.IsAllowedPrivileged = this.isAllowedPrivileged;99 jobs.Add( hiveExperiment);97 job.Job = he; 98 job.IsAllowedPrivileged = this.isAllowedPrivileged; 99 jobs.Add(job); 100 100 } 101 101 } … … 139 139 if (item.Id == Guid.Empty) { 140 140 if (item is RefreshableJob) { 141 HiveClient.Instance.Upload Experiment((RefreshableJob)item, cancellationToken);141 HiveClient.Instance.UploadJob((RefreshableJob)item, cancellationToken); 142 142 } 143 143 if (item is JobPermission) { … … 151 151 } else { 152 152 if (item is Job) 153 ServiceLocator.Instance.CallHiveService(s => s.Update HiveExperiment((Job)item));153 ServiceLocator.Instance.CallHiveService(s => s.UpdateJob((Job)item)); 154 154 } 155 155 } … … 177 177 178 178 if (item is Job) 179 ServiceLocator.Instance.CallHiveService(s => s.Delete HiveExperiment(item.Id));179 ServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id)); 180 180 if (item is RefreshableJob) 181 ServiceLocator.Instance.CallHiveService(s => s.Delete HiveExperiment(item.Id));181 ServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id)); 182 182 if (item is JobPermission) { 183 183 var hep = (JobPermission)item; … … 206 206 #endregion 207 207 208 public static void Start Experiment(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {208 public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) { 209 209 HiveClient.StoreAsync( 210 210 new Action<Exception>((Exception ex) => { … … 215 215 } 216 216 217 public static void Pause Experiment(RefreshableJob refreshableHiveExperiment) {217 public static void PauseJob(RefreshableJob refreshableJob) { 218 218 ServiceLocator.Instance.CallHiveService(service => { 219 foreach (HiveTask job in refreshableHiveExperiment.GetAllHiveJobs()) {220 if ( job.Task.State != TaskState.Finished && job.Task.State != TaskState.Aborted && job.Task.State != TaskState.Failed)221 service.Pause Job(job.Task.Id);219 foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) { 220 if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed) 221 service.PauseTask(task.Task.Id); 222 222 } 223 223 }); 224 refreshable HiveExperiment.ExecutionState = ExecutionState.Paused;225 } 226 227 public static void Stop Experiment(RefreshableJob refreshableJob) {224 refreshableJob.ExecutionState = ExecutionState.Paused; 225 } 226 227 public static void StopJob(RefreshableJob refreshableJob) { 228 228 ServiceLocator.Instance.CallHiveService(service => { 229 foreach (HiveTask job in refreshableJob.GetAllHiveJobs()) {230 if ( job.Task.State != TaskState.Finished && job.Task.State != TaskState.Aborted && job.Task.State != TaskState.Failed)231 service.Stop Job(job.Task.Id);229 foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) { 230 if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed) 231 service.StopTask(task.Task.Id); 232 232 } 233 233 }); … … 235 235 } 236 236 237 #region Upload Experiment237 #region Upload Job 238 238 private Semaphore jobUploadSemaphore = new Semaphore(4, 4); // todo: take magic number into config 239 239 private static object jobCountLocker = new object(); 240 240 private static object pluginLocker = new object(); 241 private void Upload Experiment(RefreshableJob refreshableJob, CancellationToken cancellationToken) {241 private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken) { 242 242 try { 243 243 refreshableJob.Progress = new Progress("Connecting to server..."); … … 254 254 } 255 255 256 foreach (OptimizerHiveTask hiveJob in refreshableJob.Hive Jobs.OfType<OptimizerHiveTask>()) {256 foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>()) { 257 257 hiveJob.SetIndexInParentOptimizerList(null); 258 258 } … … 260 260 // upload Task 261 261 refreshableJob.Progress.Status = "Uploading Task..."; 262 refreshableJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.Add HiveExperiment(refreshableJob.Job));262 refreshableJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job)); 263 263 bool isPrivileged = refreshableJob.Job.IsPrivileged; 264 refreshableJob.Job = ServiceLocator.Instance.CallHiveService((s) => s.Get HiveExperiment(refreshableJob.Job.Id)); // update owner and permissions264 refreshableJob.Job = ServiceLocator.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions 265 265 refreshableJob.Job.IsPrivileged = isPrivileged; 266 266 cancellationToken.ThrowIfCancellationRequested(); 267 267 268 int totalJobCount = refreshableJob.GetAllHive Jobs().Count();268 int totalJobCount = refreshableJob.GetAllHiveTasks().Count(); 269 269 int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library 270 270 cancellationToken.ThrowIfCancellationRequested(); … … 284 284 285 285 var tasks = new List<TS.Task>(); 286 foreach (HiveTask hiveJob in refreshableJob.Hive Jobs) {286 foreach (HiveTask hiveJob in refreshableJob.HiveTasks) { 287 287 tasks.Add(TS.Task.Factory.StartNew((hj) => { 288 Upload JobWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken);288 UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken); 289 289 }, hiveJob) 290 290 .ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); … … 334 334 /// </summary> 335 335 /// <param name="parentHiveTask">shall be null if its the root task</param> 336 private void Upload JobWithChildren(IProgress progress, HiveTask hiveJob, HiveTask parentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {336 private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveJob, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, bool isPrivileged, CancellationToken cancellationToken) { 337 337 jobUploadSemaphore.WaitOne(); 338 338 bool semaphoreReleased = false; … … 340 340 cancellationToken.ThrowIfCancellationRequested(); 341 341 lock (jobCountLocker) { 342 jobCount[0]++;342 taskCount[0]++; 343 343 } 344 344 TaskData jobData; 345 345 List<IPluginDescription> plugins; 346 346 347 if (hive Job.ItemTask.ComputeInParallel && (hiveJob.ItemTask.Item is Optimization.Experiment || hiveJob.ItemTask.Item is Optimization.BatchRun)) {348 hive Job.Task.IsParentTask = true;349 hive Job.Task.FinishWhenChildJobsFinished = true;350 jobData = hive Job.GetAsTaskData(true, out plugins);347 if (hiveTask.ItemTask.ComputeInParallel && (hiveTask.ItemTask.Item is Optimization.Experiment || hiveTask.ItemTask.Item is Optimization.BatchRun)) { 348 hiveTask.Task.IsParentTask = true; 349 hiveTask.Task.FinishWhenChildJobsFinished = true; 350 jobData = hiveTask.GetAsTaskData(true, out plugins); 351 351 } else { 352 hive Job.Task.IsParentTask = false;353 hive Job.Task.FinishWhenChildJobsFinished = false;354 jobData = hive Job.GetAsTaskData(false, out plugins);352 hiveTask.Task.IsParentTask = false; 353 hiveTask.Task.FinishWhenChildJobsFinished = false; 354 jobData = hiveTask.GetAsTaskData(false, out plugins); 355 355 } 356 356 cancellationToken.ThrowIfCancellationRequested(); … … 359 359 if (!cancellationToken.IsCancellationRequested) { 360 360 lock (pluginLocker) { 361 ServiceLocator.Instance.CallHiveService((s) => hive Job.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));361 ServiceLocator.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins)); 362 362 } 363 363 } 364 364 }, -1, "Failed to upload plugins"); 365 365 cancellationToken.ThrowIfCancellationRequested(); 366 hive Job.Task.PluginsNeededIds.Add(configPluginId);367 hive Job.Task.JobId = hiveExperimentId;368 hive Job.Task.IsPrivileged = isPrivileged;369 370 log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hive Job.ItemTask.GetObjectGraphObjects().Count()));366 hiveTask.Task.PluginsNeededIds.Add(configPluginId); 367 hiveTask.Task.JobId = jobId; 368 hiveTask.Task.IsPrivileged = isPrivileged; 369 370 log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count())); 371 371 TryAndRepeat(() => { 372 372 if (!cancellationToken.IsCancellationRequested) { 373 373 if (parentHiveJob != null) { 374 hive Job.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Task.Id, hiveJob.Task, jobData));374 hiveTask.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildTask(parentHiveJob.Task.Id, hiveTask.Task, jobData)); 375 375 } else { 376 hive Job.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Task, jobData, groups.ToList()));376 hiveTask.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, jobData, groups.ToList())); 377 377 } 378 378 } … … 381 381 382 382 lock (jobCountLocker) { 383 progress.ProgressValue = (double) jobCount[0] / totalJobCount;384 progress.Status = string.Format("Uploaded task ({0} of {1})", jobCount[0], totalJobCount);383 progress.ProgressValue = (double)taskCount[0] / totalJobCount; 384 progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount); 385 385 } 386 386 387 387 var tasks = new List<TS.Task>(); 388 foreach (HiveTask child in hive Job.ChildHiveTasks) {388 foreach (HiveTask child in hiveTask.ChildHiveTasks) { 389 389 tasks.Add(TS.Task.Factory.StartNew((tuple) => { 390 390 var arguments = (Tuple<HiveTask, HiveTask>)tuple; 391 Upload JobWithChildren(progress, arguments.Item1, arguments.Item2, groups, jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged, cancellationToken);392 }, new Tuple<HiveTask, HiveTask>(child, hive Job))391 UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken); 392 }, new Tuple<HiveTask, HiveTask>(child, hiveTask)) 393 393 .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 394 394 } … … 408 408 409 409 #region Download Experiment 410 public static void Load Experiment(RefreshableJob refreshableJob) {410 public static void LoadJob(RefreshableJob refreshableJob) { 411 411 var hiveExperiment = refreshableJob.Job; 412 412 refreshableJob.Progress = new Progress(); … … 415 415 refreshableJob.IsProgressing = true; 416 416 int totalJobCount = 0; 417 IEnumerable<LightweightTask> all Jobs;417 IEnumerable<LightweightTask> allTasks; 418 418 419 419 refreshableJob.Progress.Status = "Connecting to Server..."; 420 420 // fetch all Task objects to create the full tree of tree of HiveTask objects 421 421 refreshableJob.Progress.Status = "Downloading list of jobs..."; 422 all Jobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightExperimentJobs(hiveExperiment.Id));423 totalJobCount = all Jobs.Count();424 425 TaskDownloader downloader = new TaskDownloader(all Jobs.Select(x => x.Id));422 allTasks = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks(hiveExperiment.Id)); 423 totalJobCount = allTasks.Count(); 424 425 TaskDownloader downloader = new TaskDownloader(allTasks.Select(x => x.Id)); 426 426 downloader.StartAsync(); 427 427 … … 435 435 } 436 436 } 437 IDictionary<Guid, HiveTask> allHive Jobs = downloader.Results;438 439 refreshableJob.Hive Jobs = new ItemCollection<HiveTask>(allHiveJobs.Values.Where(x => !x.Task.ParentTaskId.HasValue));437 IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results; 438 439 refreshableJob.HiveTasks = new ItemCollection<HiveTask>(allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue)); 440 440 441 441 if (refreshableJob.IsFinished()) { … … 446 446 447 447 // build child-task tree 448 foreach (HiveTask hive Job in refreshableJob.HiveJobs) {449 BuildHiveJobTree(hive Job, allJobs, allHiveJobs);448 foreach (HiveTask hiveTask in refreshableJob.HiveTasks) { 449 BuildHiveJobTree(hiveTask, allTasks, allHiveTasks); 450 450 } 451 451 … … 482 482 483 483 public static ItemTask LoadItemJob(Guid jobId) { 484 TaskData jobData = ServiceLocator.Instance.CallHiveService(s => s.Get JobData(jobId));484 TaskData jobData = ServiceLocator.Instance.CallHiveService(s => s.GetTaskData(jobId)); 485 485 try { 486 486 return PersistenceUtil.Deserialize<ItemTask>(jobData.Data); … … 506 506 } 507 507 508 public static HiveItemCollection<JobPermission> Get HiveExperimentPermissions(Guid hiveExperimentId) {508 public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId) { 509 509 return ServiceLocator.Instance.CallHiveService((service) => { 510 IEnumerable<JobPermission> heps = service.GetHiveExperimentPermissions(hiveExperimentId);511 foreach (var hep in heps) {510 IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId); 511 foreach (var hep in jps) { 512 512 hep.GrantedUserName = service.GetUsernameByUserId(hep.GrantedUserId); 513 513 } 514 return new HiveItemCollection<JobPermission>( heps);514 return new HiveItemCollection<JobPermission>(jps); 515 515 }); 516 516 }
Note: See TracChangeset
for help on using the changeset viewer.