Changeset 6444 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/HiveClient.cs
- Timestamp:
- 06/19/11 23:21:21 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/HiveClient.cs
r6426 r6444 27 27 using System.Security.Cryptography; 28 28 using System.Threading; 29 using System.Threading.Tasks; 29 30 using HeuristicLab.Common; 30 31 using HeuristicLab.Core; … … 104 105 105 106 #region Store 106 public static void Store(IHiveItem item ) {107 public static void Store(IHiveItem item, CancellationToken cancellationToken) { 107 108 if (item.Id == Guid.Empty) { 108 109 if (item is RefreshableHiveExperiment) { 109 HiveClient.Instance.UploadExperiment((RefreshableHiveExperiment)item );110 HiveClient.Instance.UploadExperiment((RefreshableHiveExperiment)item, cancellationToken); 110 111 } 111 112 } else { … … 114 115 } 115 116 } 116 public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item ) {117 public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) { 117 118 var call = new Func<Exception>(delegate() { 118 119 try { 119 Store(item );120 Store(item, cancellationToken); 120 121 } 121 122 catch (Exception ex) { … … 159 160 #endregion 160 161 161 public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment ) {162 public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) { 162 163 HiveClient.StoreAsync( 163 164 new Action<Exception>((Exception ex) => { 164 165 refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Prepared; 165 166 exceptionCallback(ex); 166 }), refreshableHiveExperiment );167 }), refreshableHiveExperiment, cancellationToken); 167 168 refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Started; 168 169 } … … 189 190 190 191 #region Upload Experiment 191 private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment) { 192 private Semaphore jobUploadSemaphore = new Semaphore(4, 4); // todo: take magic number into config 193 private static object jobCountLocker = new object(); 194 private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) { 192 195 try { 193 196 refreshableHiveExperiment.HiveExperiment.Progress = new Progress("Connecting to server..."); 194 197 refreshableHiveExperiment.HiveExperiment.IsProgressing = true; 195 ServiceLocator.Instance.CallHiveService(service => { 196 IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames); 197 var resourceIds = new List<Guid>(); 198 foreach (var resourceName in resourceNames) { 199 Guid resourceId = service.GetResourceId(resourceName); 200 if (resourceId == Guid.Empty) { 201 throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName)); 202 } 203 resourceIds.Add(resourceId); 198 199 IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames); 200 var resourceIds = new List<Guid>(); 201 foreach (var resourceName in resourceNames) { 202 Guid resourceId = ServiceLocator.Instance.CallHiveService((s) => s.GetResourceId(resourceName)); 203 if (resourceId == Guid.Empty) { 204 throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName)); 204 205 } 205 206 foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) { 207 hiveJob.SetIndexInParentOptimizerList(null); 208 } 209 210 // upload HiveExperiment 211 refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment..."; 212 refreshableHiveExperiment.HiveExperiment.Id = service.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment); 213 214 int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count(); 215 int jobCount = 0; 216 217 // upload plugins 218 refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins..."; 219 this.OnlinePlugins = service.GetPlugins(); 220 this.AlreadyUploadedPlugins = new List<Plugin>(); 221 Plugin configFilePlugin = UploadConfigurationFile(service, onlinePlugins); 222 this.alreadyUploadedPlugins.Add(configFilePlugin); 223 224 // upload jobs 225 refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs..."; 226 227 foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) { 228 UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, service, hiveJob, null, resourceIds, ref jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged); 229 } 230 231 if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling(); 232 }); 206 resourceIds.Add(resourceId); 207 } 208 209 foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) { 210 hiveJob.SetIndexInParentOptimizerList(null); 211 } 212 213 // upload HiveExperiment 214 refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment..."; 215 refreshableHiveExperiment.HiveExperiment.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment)); 216 cancellationToken.ThrowIfCancellationRequested(); 217 218 int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count(); 219 int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library 220 cancellationToken.ThrowIfCancellationRequested(); 221 222 // upload plugins 223 refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins..."; 224 this.OnlinePlugins = ServiceLocator.Instance.CallHiveService((s) => s.GetPlugins()); 225 this.AlreadyUploadedPlugins = new List<Plugin>(); 226 Plugin configFilePlugin = ServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins)); 227 this.alreadyUploadedPlugins.Add(configFilePlugin); 228 cancellationToken.ThrowIfCancellationRequested(); 229 230 if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling(); 231 232 // upload jobs 233 refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs..."; 234 235 var tasks = new List<Task>(); 236 foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) { 237 tasks.Add(Task.Factory.StartNew((hj) => { 238 UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, (HiveJob)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged, cancellationToken); 239 }, hiveJob) 240 .ContinueWith((x) => refreshableHiveExperiment.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 241 } 242 Task.WaitAll(tasks.ToArray()); 233 243 } 234 244 finally { … … 267 277 /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs 268 278 /// </summary> 269 /// <param name="service"></param>270 /// <param name="hiveJob"></param>271 279 /// <param name="parentHiveJob">shall be null if its the root job</param> 272 /// <param name="groups"></param> 273 private void UploadJobWithChildren(IProgress progress, IHiveService service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, ref int jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged) { 274 jobCount++; 275 progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount); 276 JobData jobData; 277 List<IPluginDescription> plugins; 278 279 if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) { 280 hiveJob.Job.IsParentJob = true; 281 hiveJob.Job.FinishWhenChildJobsFinished = true; 282 jobData = hiveJob.GetAsJobData(true, out plugins); 283 } else { 284 hiveJob.Job.IsParentJob = false; 285 hiveJob.Job.FinishWhenChildJobsFinished = false; 286 jobData = hiveJob.GetAsJobData(false, out plugins); 287 } 288 289 TryAndRepeat(() => { 290 hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(service, this.onlinePlugins, this.alreadyUploadedPlugins, plugins); 291 }, -1, "Failed to upload plugins"); 292 hiveJob.Job.PluginsNeededIds.Add(configPluginId); 293 hiveJob.Job.HiveExperimentId = hiveExperimentId; 294 hiveJob.Job.IsPrivileged = isPrivileged; 295 296 progress.Status = string.Format("Uploading job {0} of {1} ({2} kb, {3} objects)", jobCount, totalJobCount, jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count()); 297 progress.ProgressValue = (double)jobCount / totalJobCount; 298 299 log.LogMessage(progress.Status); 300 TryAndRepeat(() => { 301 if (parentHiveJob != null) { 302 hiveJob.Job.Id = service.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData); 303 } else { 304 hiveJob.Job.Id = service.AddJob(hiveJob.Job, jobData, groups.ToList()); 305 } 306 }, -1, "Failed to add job", log); 307 308 foreach (HiveJob child in hiveJob.ChildHiveJobs) { 309 UploadJobWithChildren(progress, service, child, hiveJob, groups, ref jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged); 280 private void UploadJobWithChildren(IProgress progress, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) { 281 jobUploadSemaphore.WaitOne(); 282 try { 283 cancellationToken.ThrowIfCancellationRequested(); 284 lock (jobCountLocker) { 285 jobCount[0]++; 286 } 287 JobData jobData = null; 288 List<IPluginDescription> plugins = null; 289 290 TryAndRepeat(() => { // workaround for persistence bug (thread-safe access to bitmaps) - remove later 291 if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) { 292 hiveJob.Job.IsParentJob = true; 293 hiveJob.Job.FinishWhenChildJobsFinished = true; 294 jobData = hiveJob.GetAsJobData(true, out plugins); 295 } else { 296 hiveJob.Job.IsParentJob = false; 297 hiveJob.Job.FinishWhenChildJobsFinished = false; 298 jobData = hiveJob.GetAsJobData(false, out plugins); 299 } 300 }, 30, "Could not serialize job"); 301 cancellationToken.ThrowIfCancellationRequested(); 302 303 TryAndRepeat(() => { 304 if (!cancellationToken.IsCancellationRequested) { 305 ServiceLocator.Instance.CallHiveService((s) => hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins)); 306 } 307 }, -1, "Failed to upload plugins"); 308 cancellationToken.ThrowIfCancellationRequested(); 309 hiveJob.Job.PluginsNeededIds.Add(configPluginId); 310 hiveJob.Job.HiveExperimentId = hiveExperimentId; 311 hiveJob.Job.IsPrivileged = isPrivileged; 312 313 log.LogMessage(string.Format("Uploading job ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count())); 314 TryAndRepeat(() => { 315 if (!cancellationToken.IsCancellationRequested) { 316 if (parentHiveJob != null) { 317 hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData)); 318 } else { 319 hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Job, jobData, groups.ToList())); 320 } 321 } 322 }, 50, "Failed to add job", log); 323 cancellationToken.ThrowIfCancellationRequested(); 324 325 lock (jobCountLocker) { 326 progress.ProgressValue = (double)jobCount[0] / totalJobCount; 327 progress.Status = string.Format("Uploaded job ({0} of {1})", jobCount[0], totalJobCount); 328 } 329 330 var tasks = new List<Task>(); 331 foreach (HiveJob child in hiveJob.ChildHiveJobs) { 332 tasks.Add(Task.Factory.StartNew((tuple) => { 333 var arguments = (Tuple<HiveJob, HiveJob>)tuple; 334 UploadJobWithChildren(progress, arguments.Item1, arguments.Item2, groups, jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged, cancellationToken); 335 }, new Tuple<HiveJob, HiveJob>(child, hiveJob )) 336 .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted)); 337 } 338 Task.WaitAll(tasks.ToArray()); 339 } 340 finally { 341 jobUploadSemaphore.Release(); 310 342 } 311 343 }
Note: See TracChangeset
for help on using the changeset viewer.