Changeset 4368 for branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
- Timestamp:
- 09/07/10 10:22:27 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
r4342 r4368 45 45 using HeuristicLab.Hive.Experiment.Properties; 46 46 using System.ComponentModel; 47 using HeuristicLab.Hive.Experiment.Jobs; 47 48 48 49 namespace HeuristicLab.Hive.Experiment { … … 51 52 /// </summary> 52 53 [Item(itemName, itemDescription)] 53 [Creatable("Testing & Analysis")]54 54 [StorableClass] 55 55 public class HiveExperiment : NamedItem, IExecutable { … … 63 63 private System.Timers.Timer timer; 64 64 private bool pausePending, stopPending; 65 private bool sendingJobsFinished = false;66 65 67 66 // ensure that only 2 threads can fetch jobresults simultaniously … … 98 97 [Storable] 99 98 private DateTime lastUpdateTime; 100 101 /// <summary> 102 /// Mapping from JobId to an optimizer. 103 /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection 104 /// </summary> 105 [Storable] 106 private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>(); 107 108 /// <summary> 109 /// Stores a mapping from the child-optimizer to the parent optimizer. 110 /// Needed to replace a finished optimizer in the optimizer-tree. 111 /// Only pending optmizers are stored. 112 /// </summary> 113 [Storable] 114 private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>(); 115 116 [Storable] 117 private JobItemList jobItems; 118 public JobItemList JobItems { 119 get { return jobItems; } 120 } 121 99 122 100 [Storable] 123 101 private string resourceIds; … … 173 151 } 174 152 } 153 154 [Storable] 155 private IJob rootJob; 156 public IJob RootJob { 157 get { return rootJob; } 158 set { 159 if (rootJob != value) { 160 rootJob = value; 161 OnRootJobChanged(); 162 } 163 } 164 } 165 166 private JobItem rootJobItem; 167 public JobItem RootJobItem { 168 get { return rootJobItem; } 169 set { 170 if (rootJobItem != null) { 171 DeregisterRootJobItemEvents(); 172 } 173 if (rootJobItem != value) { 174 rootJobItem = value; 175 RegisterRootJobItemEvents(); 176 OnRootJobItemChanged(); 177 } 178 } 179 } 180 181 private void RegisterRootJobItemEvents() { 182 rootJobItem.FinalResultAvailable += new EventHandler(rootJobItem_FinalResultAvailable); 183 } 184 185 private void DeregisterRootJobItemEvents() { 186 rootJobItem.FinalResultAvailable -= new EventHandler(rootJobItem_FinalResultAvailable); 187 } 188 175 189 #endregion 176 190 … … 185 199 this.log = new Log(); 186 200 pausePending = stopPending = false; 187 jobItems = new JobItemList();188 201 isPollingResults = false; 189 RegisterJobItemListEvents();190 202 InitTimer(); 191 203 } … … 198 210 clone.executionState = this.executionState; 199 211 clone.executionTime = this.executionTime; 200 clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();201 202 lock (pendingOptimizerMappingsLocker) {203 foreach (var pair in this.pendingOptimizersByJobId)204 clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);205 206 foreach (var pair in this.parentOptimizersByPendingOptimizer)207 clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);208 }209 212 clone.log = (ILog)cloner.Clone(log); 210 213 clone.stopPending = this.stopPending; 211 214 clone.pausePending = this.pausePending; 212 clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));213 215 clone.lastUpdateTime = this.lastUpdateTime; 214 216 clone.isPollingResults = this.isPollingResults; 217 clone.rootJob = (IJob)cloner.Clone(this.rootJob); 215 218 return clone; 216 219 } … … 221 224 this.IsPollingResults = false; 222 225 this.stopResultsPollingPending = false; 223 Register JobItemListEvents();226 RegisterEvents(); 224 227 LogMessage("I was deserialized."); 228 } 229 230 private void RegisterEvents() { 231 RootJobChanged += new EventHandler(HiveExperiment_RootJobChanged); 232 } 233 234 private void DeRegisterEvents() { 235 RootJobChanged -= new EventHandler(HiveExperiment_RootJobChanged); 225 236 } 226 237 … … 247 258 if (experiment != null) { 248 259 StopResultPolling(); 249 lock (pendingOptimizerMappingsLocker) {250 pendingOptimizersByJobId.Clear();251 parentOptimizersByPendingOptimizer.Clear();252 }253 lock (jobItems) {254 jobItems.Clear();255 }256 260 experiment.Prepare(); 257 261 this.ExecutionState = Core.ExecutionState.Prepared; … … 261 265 262 266 public void Start() { 263 sendingJobsFinished = false;264 267 OnStarted(); 265 268 ExecutionTime = new TimeSpan(); 266 269 lastUpdateTime = DateTime.Now; 267 270 this.ExecutionState = Core.ExecutionState.Started; 268 StartResultPolling();269 271 270 272 Thread t = new Thread(() => { 271 IClientFacade clientFacade = CreateStreamedClientFacade(); 272 273 try { 274 pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>(); 275 276 LogMessage("Extracting jobs from Experiment"); 277 parentOptimizersByPendingOptimizer = GetOptimizers(true); 278 LogMessage("Extraction of jobs from Experiment finished"); 279 280 IEnumerable<string> groups = ResourceGroups; 281 lock (pendingOptimizerMappingsLocker) { 282 foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) { 283 SerializedJob serializedJob = CreateSerializedJob(optimizer); 284 ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups); 285 pendingOptimizersByJobId.Add(response.Obj.Id, optimizer); 286 287 JobItem jobItem = new JobItem() { 288 JobDto = response.Obj, 289 LatestSnapshot = null, 290 Optimizer = optimizer 291 }; 292 lock (jobItems) { 293 jobItems.Add(jobItem); 294 } 295 LogMessage(jobItem.JobDto.Id, "Job sent to Hive"); 273 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 274 try { 275 RootJobItem = ToJobItem(RootJob); 276 277 IEnumerable<string> groups = ResourceGroups; 278 SerializedJob serializedJob = RootJobItem.ToSerializedJob(); 279 ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 280 281 if (response.StatusMessage != ResponseStatus.Ok) { 282 throw new Exception(response.StatusMessage.ToString()); 283 } else { 284 RootJobItem.JobDto = response.Obj; 285 LogMessage(RootJobItem.JobDto.Id, "Job sent to Hive"); 286 287 StartResultPolling(); 296 288 } 297 289 } 298 } 299 catch (Exception e) { 300 LogMessage("Error: Starting HiveExperiment failed: " + e.Message); 301 this.ExecutionState = Core.ExecutionState.Stopped; 302 OnStopped(); 303 } 304 finally { 305 ServiceLocator.DisposeClientFacade(clientFacade); 306 } 307 sendingJobsFinished = true; 290 catch (Exception e) { 291 LogMessage("Error: Starting HiveExperiment failed: " + e.Message); 292 this.ExecutionState = Core.ExecutionState.Stopped; 293 OnStopped(); 294 } 295 } 308 296 }); 309 297 t.Start(); … … 311 299 312 300 public void Stop() { 301 if (IsPollingResults) 302 StopResultPolling(); 313 303 this.ExecutionState = Core.ExecutionState.Stopped; 314 foreach (JobItem jobItem in jobItems) {315 AbortJob(jobItem.JobDto.Id);316 }317 304 OnStopped(); 318 305 } 319 306 #endregion 320 321 #region Optimizier Management 322 /// <summary> 323 /// Returns all optimizers in the current Experiment 324 /// </summary> 325 /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param> 326 /// <returns></returns> 327 private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) { 328 if (!flatout) { 329 var optimizers = new Dictionary<IOptimizer, IOptimizer>(); 330 foreach (IOptimizer opt in experiment.Optimizers) { 331 optimizers.Add(experiment, opt); 332 } 333 return optimizers; 307 308 #region Job Management 309 public void RefreshJobTree() { 310 this.RootJob = CreateJobTree(this.Experiment); 311 } 312 313 private IJob CreateJobTree(IOptimizer optimizer) { 314 IJob job = null; 315 if (optimizer is HeuristicLab.Optimization.Experiment) { 316 HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)optimizer; 317 ExperimentJob expJob = new ExperimentJob(exp); 318 foreach (IOptimizer opt in exp.Optimizers) { 319 expJob.AddChildJob(CreateJobTree(opt)); 320 } 321 job = expJob; 322 } else if (optimizer is BatchRun) { 323 job = new BatchRunJob(optimizer); 334 324 } else { 335 return FlatOptimizerTree(null, experiment, ""); 336 } 337 } 338 339 /// <summary> 340 /// Recursively iterates all IOptimizers in the optimizer-tree and returns them. 341 /// 342 /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like: 343 /// interface IParallelizable { 344 /// IEnumerable<IOptimizer> GetOptimizers(); 345 /// } 346 /// </summary> 347 /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns> 348 private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) { 349 IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>(); 350 if (optimizer is HeuristicLab.Optimization.Experiment) { 351 HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment; 352 if (this.experiment != experiment) { 353 prepend += experiment.Name + "/"; // don't prepend for top-level optimizers 354 } 355 foreach (IOptimizer opt in experiment.Optimizers) { 356 AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend)); 357 } 358 } else if (optimizer is BatchRun) { 359 BatchRun batchRun = optimizer as BatchRun; 360 prepend += batchRun.Name + "/"; 361 for (int i = 0; i < batchRun.Repetitions; i++) { 362 IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone(); 363 opt.Name += " [" + i + "]"; 364 IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend); 365 AddRange(optimizers, batchOptimizers); 366 } 367 } else if (optimizer is EngineAlgorithm) { 368 optimizer.Name = prepend + optimizer.Name; 369 optimizers.Add(optimizer, parent); 370 LogMessage("Optimizer extracted: " + optimizer.Name); 371 } else { 372 Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown"); 373 optimizer.Name = prepend + optimizer.Name; 374 optimizers.Add(optimizer, parent); 375 LogMessage("Optimizer extracted: " + optimizer.Name); 376 } 377 return optimizers; 378 } 379 380 private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) { 381 lock (locker) { 382 if (parentOptimizer is HeuristicLab.Optimization.Experiment) { 383 HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer; 384 int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer); 385 exp.Optimizers[originalOptimizerIndex] = newOptimizer; 386 } else if (parentOptimizer is BatchRun) { 387 BatchRun batchRun = (BatchRun)parentOptimizer; 388 if (newOptimizer is IAlgorithm) { 389 batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer)); 390 } else { 391 throw new NotSupportedException("Only IAlgorithm types supported"); 392 } 393 } else { 394 throw new NotSupportedException("Invalid parentOptimizer"); 395 } 396 } 397 } 398 399 private bool NoMorePendingOptimizers() { 400 lock (pendingOptimizerMappingsLocker) { 401 return pendingOptimizersByJobId.Count == 0; 402 } 403 } 404 405 /// <summary> 406 /// Removes optimizers from 407 /// - parentOptimizersByPendingOptimizer 408 /// - pendingOptimizersByJobId 409 /// </summary> 410 /// <param name="jobId"></param> 411 private void DisposeOptimizerMappings(Guid jobId) { 412 LogMessage(jobId, "Disposing Optimizer Mappings"); 413 lock (pendingOptimizerMappingsLocker) { 414 parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]); 415 pendingOptimizersByJobId.Remove(jobId); 416 } 417 } 418 419 #endregion 420 421 #region Job Management 325 job = new OptimizerJob(optimizer); 326 } 327 return job; 328 } 329 330 private JobItem ToJobItem(IJob j) { 331 return new JobItem() { 332 Job = j, 333 JobDto = new JobDto() { 334 State = JobState.Offline, 335 PluginsNeeded = HivePluginInfoDto.FindPluginsNeeded(j.GetType()) 336 } 337 }; 338 } 339 422 340 /// <summary> 423 341 /// Updates all JobItems with the results 342 /// if one is finished, the serialized job is downloaded and updated 424 343 /// </summary> 425 344 /// <param name="jobResultList"></param> 426 345 private void UpdateJobItems(JobResultList jobResultList) { 427 // use a Dict to avoid quadratic runtime complexity 428 IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId); 429 lock (jobItems) { 430 foreach (JobItem jobItem in JobItems) { 431 if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) { 432 jobItem.JobResult = jobResultDict[jobItem.JobDto.Id]; 346 if (RootJobItem == null) { 347 var rootResults = jobResultList.Where(res => !res.ParentJobId.HasValue); 348 if (rootResults.Count() > 0) { 349 RootJobItem = new JobItem(rootResults.First()); 350 } else { 351 LogMessage("Error: Could not find JobResult for RootJobItem"); 352 return; 353 } 354 } 355 356 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 357 foreach (JobResult jobResult in jobResultList) { 358 JobItem jobItem = FindJobItem(RootJobItem, jobResult.Id); 359 if (jobItem != null) { 360 jobItem.UpdateJob(jobResult); 361 if (jobItem.JobDto.State == JobState.Finished && !jobItem.IsFinalResultAvailable) { 362 // job is finished but the final result was not yet downloaded 363 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobResult.Id).Obj; 364 jobItem.Job = XmlParser.Deserialize<IJob>(new MemoryStream(serializedJob.SerializedJobData)); 365 UpdateChildJobs(jobItem); 366 this.Experiment.Runs.AddRange(((OptimizerJob)jobItem.Job).Optimizer.Runs); 367 //ReplaceOptimizerInExperiment(jobItem.JobDto.Id, jobItem.Job); 368 } 369 } else { 370 // job does not yet exist locally 371 JobItem parentJobItem = FindJobItem(RootJobItem, jobResult.ParentJobId.Value); 372 if (parentJobItem != null) { 373 LogMessage(jobResult.Id, "Creating JobItem"); 374 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobResult.Id).Obj; 375 JobItem newJobItem = new JobItem(); 376 newJobItem.JobDto = serializedJob.JobInfo; 377 newJobItem.Job = XmlParser.Deserialize<IJob>(new MemoryStream(serializedJob.SerializedJobData)); 378 parentJobItem.AddChildJob(newJobItem); 379 if (newJobItem.JobDto.State == JobState.Finished && newJobItem.IsFinalResultAvailable) { 380 this.Experiment.Runs.AddRange(((OptimizerJob)newJobItem.Job).Optimizer.Runs); 381 UpdateChildJobs(newJobItem); 382 //ReplaceOptimizerInExperiment(jobItem.JobDto.Id, jobItem.Job); 383 } 384 } else { 385 LogMessage("Error: Could not update JobResult for " + jobResult.Id); 386 } 433 387 } 434 388 } 435 389 } 390 } 391 392 /// <summary> 393 /// Updates the ChildJobItems of a JobItem according to the IJob.ChildJobs of JobItem.Job (pretty confusing, right) 394 /// </summary> 395 /// <param name="jobItem"></param> 396 private void UpdateChildJobs(JobItem jobItem) { 397 List<JobItem> newJobItems = new List<JobItem>(); 398 foreach (IJob job in jobItem.Job.ChildJobs) { 399 if (!jobItem.ReplaceChildJob(job)) { 400 newJobItems.Add(ToJobItem(job)); 401 } 402 } 403 foreach (JobItem item in newJobItems) { 404 jobItem.AddChildJob(item); 405 } 406 } 407 408 private JobItem FindJobItem(JobItem parentJobItem, Guid jobId) { 409 if (parentJobItem.JobDto.Id == jobId) { 410 return parentJobItem; 411 } else { 412 foreach (JobItem child in parentJobItem.ChildJobItems) { 413 JobItem result = FindJobItem(child, jobId); 414 if (result != null) 415 return result; 416 } 417 } 418 return null; 436 419 } 437 420 … … 441 424 Thread t = new Thread(() => { 442 425 try { 443 if (jobItem. State == JobState.Finished) {426 if (jobItem.JobDto.State == JobState.Finished) { 444 427 FetchAndUpdateJob(jobItem.JobDto.Id); 445 DisposeOptimizerMappings(jobItem.JobDto.Id); 446 } else if (jobItem.State == JobState.Failed) { 447 DisposeOptimizerMappings(jobItem.JobDto.Id); 428 } else if (jobItem.JobDto.State == JobState.Failed) { 429 448 430 } 449 450 if (NoMorePendingOptimizers()) {451 StopResultPolling();452 this.ExecutionState = Core.ExecutionState.Stopped;453 OnStopped();454 }455 431 } 456 432 catch (Exception ex) { 457 Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message);458 433 LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message); 459 434 } … … 469 444 LogMessage(jobId, "FetchAndUpdateJob started"); 470 445 if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) { 471 IClientFacade clientFacade = null; 446 472 447 try { 473 clientFacade = CreateStreamedClientFacade(); 474 IOptimizer originalOptimizer; 475 originalOptimizer = pendingOptimizersByJobId[jobId]; 476 477 ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false); 478 IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData)); 479 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 480 ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer); 481 LogMessage(jobId, "FetchAndUpdateJob ended"); 448 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 449 ResponseObject<SerializedJob> jobResponse = service.Obj.GetLastSerializedResult(jobId); 450 IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData)); 451 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 452 LogMessage(jobId, "FetchAndUpdateJob ended"); 453 } 482 454 } 483 455 catch (Exception e) { … … 486 458 } 487 459 finally { 488 ServiceLocator.DisposeClientFacade(clientFacade);489 460 fetchJobSemaphore.Release(); 490 461 } … … 499 470 } 500 471 501 private void UpdateJobItem(JobDto jobDto) {502 JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);503 jobItem.JobDto = jobDto;504 }505 506 472 public void AbortJob(Guid jobId) { 507 IClientFacade clientFacade = CreateClientFacade(); 508 Response response = clientFacade.AbortJob(jobId); 509 LogMessage(jobId, "Aborting Job: " + response.StatusMessage); 510 } 511 512 private SerializedJob CreateSerializedJob(IOptimizer optimizer) { 513 IJob job = new OptimizerJob() { 514 Optimizer = optimizer 515 }; 516 517 // serialize job 518 MemoryStream memStream = new MemoryStream(); 519 XmlGenerator.Serialize(job, memStream); 520 byte[] jobByteArray = memStream.ToArray(); 521 memStream.Dispose(); 522 523 // find out which which plugins are needed for the given object 524 List<HivePluginInfoDto> pluginsNeeded = ( 525 from p in GetDeclaringPlugins(job.GetType()) 526 select new HivePluginInfoDto() { 527 Name = p.Name, 528 Version = p.Version 529 }).ToList(); 530 531 JobDto jobDto = new JobDto() { 532 CoresNeeded = 1, // [chn] how to determine real cores needed? 533 PluginsNeeded = pluginsNeeded, 534 State = JobState.Offline, 535 MemoryNeeded = 0 536 }; 537 538 SerializedJob serializedJob = new SerializedJob() { 539 JobInfo = jobDto, 540 SerializedJobData = jobByteArray 541 }; 542 543 return serializedJob; 544 } 545 546 private JobItem GetJobItemById(Guid jobId) { 547 return jobItems.Single(x => x.JobDto.Id == jobId); 473 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 474 Response response = service.Obj.AbortJob(jobId); 475 LogMessage(jobId, "Aborting Job: " + response.StatusMessage); 476 } 548 477 } 549 478 #endregion … … 553 482 this.stopResultsPollingPending = false; 554 483 this.IsPollingResults = true; 555 resultPollingThread = CreateResultPollingThread();484 resultPollingThread = new Thread(RunResultPollingThread); 556 485 if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running) 557 486 resultPollingThread.Start(); … … 560 489 public void StopResultPolling() { 561 490 this.stopResultsPollingPending = true; 562 if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) { 563 resultPollingThread.Interrupt(); 491 if (resultPollingThread != null) { 492 if (resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) { 493 resultPollingThread.Interrupt(); 494 } 495 resultPollingThread.Join(); 564 496 } 565 497 this.stopResultsPollingPending = false; 566 498 } 567 499 568 private Thread CreateResultPollingThread() { 569 return new Thread(() => { 570 try { 571 do { 572 IClientFacade clientFacade = CreateStreamedClientFacade(); 573 IEnumerable<Guid> jobIdsToQuery = from job in JobItems 574 where job.State != JobState.Finished && 575 job.State != JobState.Failed 576 select job.JobDto.Id; 577 if (jobIdsToQuery.Count() > 0) { 578 LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs"); 500 private void RunResultPollingThread() { 501 try { 502 do { 503 if (RootJobItem.JobDto.State != JobState.Finished) { 504 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 505 LogMessage("Polling results"); 579 506 try { 580 ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);507 ResponseObject<JobResultList> response = service.Obj.GetChildJobResults(RootJobItem.JobDto.Id, true, true); 581 508 if (response.StatusMessage == ResponseStatus.Ok) { 582 509 JobResultList jobItemList = response.Obj; … … 591 518 LogMessage("Polling results failed: " + e.Message); 592 519 } 593 finally {594 ServiceLocator.DisposeClientFacade(clientFacade);595 }596 Thread.Sleep(resultPollingIntervalMs);597 } else {598 if (sendingJobsFinished) {599 // all the jobs have been sent to hive, but non are to query any more (all finished or failed)600 this.stopResultsPollingPending = true;601 }602 520 } 603 } while (!this.stopResultsPollingPending); 604 } 605 catch (ThreadInterruptedException exception) { 606 // thread has been interuppted 607 } 608 catch (Exception e) { 609 LogMessage("Result Polling Thread failed badly: " + e.Message); 610 Logger.Error("Result Polling Thread failed badly: " + e.Message); 611 } 612 finally { 613 this.IsPollingResults = false; 614 } 615 }); 521 Thread.Sleep(resultPollingIntervalMs); 522 } else { 523 // all the jobs have been sent to hive, but non are to query any more (all finished or failed) 524 this.stopResultsPollingPending = true; 525 } 526 } while (!this.stopResultsPollingPending); 527 } 528 catch (ThreadInterruptedException) { 529 // thread has been interuppted 530 } 531 catch (Exception e) { 532 LogMessage("Result Polling Thread failed badly: " + e.Message); 533 } 534 finally { 535 this.IsPollingResults = false; 536 } 616 537 } 617 538 … … 620 541 #region Snapshots 621 542 622 private void UpdateSnapshot(ResponseObject<SerializedJob> response) { 623 JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id); 624 jobItem.LatestSnapshot = response; 625 } 626 627 public void RequestSnapshot(Guid jobId) { 628 Thread t = new Thread(() => { 629 IClientFacade clientFacade = null; 630 try { 631 clientFacade = CreateStreamedClientFacade(); 632 633 ResponseObject<SerializedJob> response; 634 int retryCount = 0; 635 636 Response snapShotResponse = clientFacade.RequestSnapshot(jobId); 637 if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) { 638 // job already finished 639 Logger.Debug("HiveExperiment: Abort - GetLastResult(false)"); 640 response = clientFacade.GetLastSerializedResult(jobId, false, false); 641 Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 642 } else { 643 // server sent snapshot request to client 644 // poll until snapshot is ready 645 do { 646 Thread.Sleep(snapshotPollingIntervalMs); 647 Logger.Debug("HiveExperiment: Abort - GetLastResult(true)"); 648 response = clientFacade.GetLastSerializedResult(jobId, false, true); 649 Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 650 retryCount++; 651 // loop while 652 // 1. problem with communication with server 653 // 2. job result not yet ready 654 } while ( 655 (retryCount < maxSnapshotRetries) && ( 656 response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere) 657 ); 658 } 659 if (response.StatusMessage == ResponseStatus.Ok) { 660 LogMessage(jobId, "Snapshot polling successfull for job " + jobId); 661 UpdateSnapshot(response); 662 } else { 663 LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage); 664 } 665 } 666 catch (Exception e) { 667 LogMessage("RequestSnapshot Thread failed badly: " + e.Message); 668 Logger.Error("RequestSnapshot Thread failed badly: " + e.Message); 669 } 670 finally { 671 ServiceLocator.DisposeClientFacade(clientFacade); 672 } 673 }); 674 t.Start(); 675 } 676 677 void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) { 678 JobItem jobItem = (JobItem)sender; 679 if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) { 680 RequestSnapshot(jobItem.JobDto.Id); 681 } 682 } 683 684 #endregion 685 686 #region Required Plugin Search 687 /// <summary> 688 /// Returns a list of plugins in which the type itself and all members 689 /// of the type are declared. Objectgraph is searched recursively. 690 /// </summary> 691 private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) { 692 HashSet<Type> types = new HashSet<Type>(); 693 FindTypes(type, types, "HeuristicLab."); 694 return GetDeclaringPlugins(types); 695 } 696 697 /// <summary> 698 /// Returns the plugins (including dependencies) in which the given types are declared 699 /// </summary> 700 private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) { 701 HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>(); 702 foreach (Type t in types) { 703 FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins); 704 } 705 return plugins; 706 } 707 708 /// <summary> 709 /// Finds the dependencies of the given plugin and adds it to the plugins hashset. 710 /// Also searches the dependencies recursively. 711 /// </summary> 712 private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) { 713 if (!plugins.Contains(plugin)) { 714 plugins.Add(plugin); 715 foreach (IPluginDescription dependency in plugin.Dependencies) { 716 FindDeclaringPlugins(dependency, plugins); 717 } 718 } 719 } 720 721 /// <summary> 722 /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart 723 /// Be aware that search is not performed on attributes 724 /// </summary> 725 /// <param name="type">the type to be searched</param> 726 /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param> 727 /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param> 728 private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) { 729 if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) { 730 types.Add(type); 731 732 // constructors 733 foreach (ConstructorInfo info in type.GetConstructors()) { 734 foreach (ParameterInfo paramInfo in info.GetParameters()) { 735 FindTypes(paramInfo.ParameterType, types, namespaceStart); 736 } 737 } 738 739 // interfaces 740 foreach (Type t in type.GetInterfaces()) { 741 FindTypes(t, types, namespaceStart); 742 } 743 744 // events 745 foreach (EventInfo info in type.GetEvents()) { 746 FindTypes(info.EventHandlerType, types, namespaceStart); 747 FindTypes(info.DeclaringType, types, namespaceStart); 748 } 749 750 // properties 751 foreach (PropertyInfo info in type.GetProperties()) { 752 FindTypes(info.PropertyType, types, namespaceStart); 753 } 754 755 // fields 756 foreach (FieldInfo info in type.GetFields()) { 757 FindTypes(info.FieldType, types, namespaceStart); 758 } 759 760 // methods 761 foreach (MethodInfo info in type.GetMethods()) { 762 foreach (ParameterInfo paramInfo in info.GetParameters()) { 763 FindTypes(paramInfo.ParameterType, types, namespaceStart); 764 } 765 FindTypes(info.ReturnType, types, namespaceStart); 766 } 767 } 768 } 543 //private void UpdateSnapshot(ResponseObject<SerializedJob> response) { 544 // JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id); 545 // jobItem.LatestSnapshot = response; 546 //} 547 548 //public void RequestSnapshot(Guid jobId) { 549 // Thread t = new Thread(() => { 550 // IClientFacade clientFacade = null; 551 // try { 552 // clientFacade = CreateStreamedClientFacade(); 553 554 // ResponseObject<SerializedJob> response; 555 // int retryCount = 0; 556 557 // Response snapShotResponse = clientFacade.RequestSnapshot(jobId); 558 // if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) { 559 // // job already finished 560 // Logger.Debug("HiveExperiment: Abort - GetLastResult(false)"); 561 // response = clientFacade.GetLastSerializedResult(jobId, false, false); 562 // Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 563 // } else { 564 // // server sent snapshot request to client 565 // // poll until snapshot is ready 566 // do { 567 // Thread.Sleep(snapshotPollingIntervalMs); 568 // Logger.Debug("HiveExperiment: Abort - GetLastResult(true)"); 569 // response = clientFacade.GetLastSerializedResult(jobId, false, true); 570 // Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 571 // retryCount++; 572 // // loop while 573 // // 1. problem with communication with server 574 // // 2. job result not yet ready 575 // } while ( 576 // (retryCount < maxSnapshotRetries) && ( 577 // response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere) 578 // ); 579 // } 580 // if (response.StatusMessage == ResponseStatus.Ok) { 581 // LogMessage(jobId, "Snapshot polling successfull for job " + jobId); 582 // //UpdateSnapshot(response); 583 // } else { 584 // LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage); 585 // } 586 // } 587 // catch (Exception e) { 588 // LogMessage("RequestSnapshot Thread failed badly: " + e.Message); 589 // Logger.Error("RequestSnapshot Thread failed badly: " + e.Message); 590 // } 591 // finally { 592 // ServiceLocator.DisposeClientFacade(clientFacade); 593 // } 594 // }); 595 // t.Start(); 596 //} 597 598 //void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) { 599 // JobItem jobItem = (JobItem)sender; 600 // if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) { 601 // RequestSnapshot(jobItem.JobDto.Id); 602 // } 603 //} 604 769 605 #endregion 770 606 … … 827 663 LogMessage("Experiment changed"); 828 664 EventHandler handler = ExperimentChanged; 665 if (handler != null) handler(this, EventArgs.Empty); 666 } 667 668 public event EventHandler RootJobChanged; 669 private void OnRootJobChanged() { 670 EventHandler handler = RootJobChanged; 671 if (handler != null) handler(this, EventArgs.Empty); 672 } 673 674 public event EventHandler RootJobItemChanged; 675 private void OnRootJobItemChanged() { 676 EventHandler handler = RootJobItemChanged; 829 677 if (handler != null) handler(this, EventArgs.Empty); 830 678 } … … 843 691 } 844 692 845 private void RegisterJobItemListEvents() { 846 jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset); 847 jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded); 848 jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved); 849 jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced); 850 foreach (JobItem jobItem in jobItems) { 851 RegisterJobItemEvents(jobItem); 852 } 853 } 854 855 private void DeregisterJobItemListEvents() { 856 jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset); 857 jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded); 858 jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved); 859 jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced); 860 foreach (JobItem jobItem in jobItems) { 861 DeregisterJobItemEvents(jobItem); 862 } 863 } 864 865 void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 866 UpdateJobItemEvents(e); 867 } 868 869 private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 870 if (e.OldItems != null) { 871 foreach (var item in e.OldItems) { 872 DeregisterJobItemEvents(item.Value); 873 } 874 } 875 if (e.Items != null) { 876 foreach (var item in e.Items) { 877 RegisterJobItemEvents(item.Value); 878 } 879 } 880 } 881 882 private void RegisterJobItemEvents(JobItem jobItem) { 883 jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged); 884 jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged); 885 } 886 887 private void DeregisterJobItemEvents(JobItem jobItem) { 888 jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged); 889 jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged); 890 } 891 892 void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 893 UpdateJobItemEvents(e); 894 } 895 896 void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 897 UpdateJobItemEvents(e); 898 } 899 900 void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 901 foreach (var item in e.OldItems) { 902 item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged); 903 item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged); 904 } 693 void HiveExperiment_RootJobChanged(object sender, EventArgs e) { 694 this.Experiment = (Optimization.Experiment)((ExperimentJob)rootJobItem.Job).Optimizer; 695 } 696 697 void rootJobItem_FinalResultAvailable(object sender, EventArgs e) { 698 this.Experiment = (Optimization.Experiment)((ExperimentJob)rootJobItem.Job).Optimizer; 905 699 } 906 700 #endregion 907 908 #region Helper Functions 909 private IClientFacade CreateClientFacade() { 910 IClientFacade clientFacade = null; 911 do { 912 try { 913 //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName)); 914 clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp); 915 916 } 917 catch (EndpointNotFoundException exception) { 918 LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec."); 919 Thread.Sleep(resultPollingIntervalMs); 920 } 921 } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped); 922 return clientFacade; 923 } 924 925 private IClientFacade CreateStreamedClientFacade() { 926 IClientFacade clientFacade = null; 927 do { 928 try { 929 //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName)); 930 clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp); 931 } 932 catch (EndpointNotFoundException exception) { 933 LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec."); 934 Thread.Sleep(resultPollingIntervalMs); 935 } 936 } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped); 937 return clientFacade; 938 } 939 940 private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) { 941 foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) { 942 optimizers.Add(kvp); 943 } 944 } 945 946 #endregion 947 701 948 702 #region Logging 949 703 private void LogMessage(string message) { … … 951 705 lock (locker) { 952 706 log.LogMessage(message); 707 Logger.Debug(message); 953 708 } 954 709 } 955 710 956 711 private void LogMessage(Guid jobId, string message) { 957 GetJobItemById(jobId).LogMessage(message);712 //GetJobItemById(jobId).LogMessage(message); 958 713 LogMessage(message + " (jobId: " + jobId + ")"); 959 714 } 960 715 961 716 #endregion 717 718 /// <summary> 719 /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem 720 /// </summary> 721 public void LoadExperimentFromHive() { 722 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 723 ResponseObject<SerializedJob> response = service.Obj.GetLastSerializedResult(RootJobItem.JobDto.Id); 724 IJob job = XmlParser.Deserialize<IJob>(new MemoryStream(response.Obj.SerializedJobData)); 725 RootJob = job; 726 RootJobItem.JobDto = response.Obj.JobInfo; 727 this.Experiment = (Optimization.Experiment)((OptimizerJob)job).Optimizer; 728 if (rootJobItem.JobDto.State == JobState.Finished) { 729 // set execution time and finish up 730 } else if (rootJobItem.JobDto.State != JobState.Aborted && rootJobItem.JobDto.State != JobState.Failed) { 731 this.ExecutionState = Core.ExecutionState.Started; 732 } 733 } 734 } 962 735 } 963 736 }
Note: See TracChangeset
for help on using the changeset viewer.