Changeset 4333 for branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
- Timestamp:
- 08/27/10 08:35:43 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
r4316 r4333 67 67 private Semaphore fetchJobSemaphore = new Semaphore(2, 2); 68 68 69 private static object pendingOptimizerMappingsLocker = new object(); 70 69 71 private bool stopResultsPollingPending = false; 70 72 … … 197 199 clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>(); 198 200 199 foreach (var pair in this.pendingOptimizersByJobId) 200 clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value); 201 202 foreach (var pair in this.parentOptimizersByPendingOptimizer) 203 clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value); 204 201 lock (pendingOptimizerMappingsLocker) { 202 foreach (var pair in this.pendingOptimizersByJobId) 203 clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value); 204 205 foreach (var pair in this.parentOptimizersByPendingOptimizer) 206 clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value); 207 } 205 208 clone.log = (ILog)cloner.Clone(log); 206 209 clone.stopPending = this.stopPending; … … 243 246 if (experiment != null) { 244 247 StopResultPolling(); 245 lock (pendingOptimizer sByJobId) {248 lock (pendingOptimizerMappingsLocker) { 246 249 pendingOptimizersByJobId.Clear(); 247 }248 parentOptimizersByPendingOptimizer.Clear();250 parentOptimizersByPendingOptimizer.Clear(); 251 } 249 252 lock (jobItems) { 250 253 jobItems.Clear(); … … 275 278 276 279 IEnumerable<string> groups = ResourceGroups; 277 278 foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) { 279 SerializedJob serializedJob = CreateSerializedJob(optimizer); 280 ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups); 281 lock (pendingOptimizersByJobId) { 280 lock (pendingOptimizerMappingsLocker) { 281 foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) { 282 SerializedJob serializedJob = CreateSerializedJob(optimizer); 283 ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups); 282 284 pendingOptimizersByJobId.Add(response.Obj.Id, optimizer); 285 286 JobItem jobItem = new JobItem() { 287 JobDto = response.Obj, 288 LatestSnapshot = null, 289 Optimizer = optimizer 290 }; 291 lock (jobItems) { 292 jobItems.Add(jobItem); 293 } 294 LogMessage(jobItem.JobDto.Id, "Job sent to Hive"); 283 295 } 284 285 JobItem jobItem = new JobItem() {286 JobDto = response.Obj,287 LatestSnapshot = null,288 Optimizer = optimizer289 };290 lock (jobItems) {291 jobItems.Add(jobItem);292 }293 LogMessage(jobItem.JobDto.Id, "Job sent to Hive");294 296 } 295 297 } … … 396 398 397 399 private bool NoMorePendingOptimizers() { 398 lock (pendingOptimizer sByJobId) {400 lock (pendingOptimizerMappingsLocker) { 399 401 return pendingOptimizersByJobId.Count == 0; 400 402 } … … 408 410 /// <param name="jobId"></param> 409 411 private void DisposeOptimizerMappings(Guid jobId) { 410 lock (pendingOptimizersByJobId) {411 LogMessage(jobId, "Disposing Optimizer Mappings");412 LogMessage(jobId, "Disposing Optimizer Mappings"); 413 lock (pendingOptimizerMappingsLocker) { 412 414 parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]); 413 415 pendingOptimizersByJobId.Remove(jobId); … … 436 438 private void JobItem_JobStateChanged(object sender, EventArgs e) { 437 439 JobItem jobItem = (JobItem)sender; 440 438 441 Thread t = new Thread(() => { 439 if (jobItem.State == JobState.Finished) { 440 FetchAndUpdateJob(jobItem.JobDto.Id); 441 DisposeOptimizerMappings(jobItem.JobDto.Id); 442 } else if (jobItem.State == JobState.Failed) { 443 DisposeOptimizerMappings(jobItem.JobDto.Id); 444 } 445 446 if (NoMorePendingOptimizers()) { 447 StopResultPolling(); 448 this.ExecutionState = Core.ExecutionState.Stopped; 449 OnStopped(); 442 try { 443 if (jobItem.State == JobState.Finished) { 444 FetchAndUpdateJob(jobItem.JobDto.Id); 445 DisposeOptimizerMappings(jobItem.JobDto.Id); 446 } else if (jobItem.State == JobState.Failed) { 447 DisposeOptimizerMappings(jobItem.JobDto.Id); 448 } 449 450 if (NoMorePendingOptimizers()) { 451 StopResultPolling(); 452 this.ExecutionState = Core.ExecutionState.Stopped; 453 OnStopped(); 454 } 455 } 456 catch (Exception ex) { 457 Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message); 458 LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message); 450 459 } 451 460 }); … … 457 466 /// </summary> 458 467 private void FetchAndUpdateJob(Guid jobId) { 468 bool tryagain = false; 459 469 LogMessage(jobId, "FetchAndUpdateJob started"); 460 IClientFacade clientFacade = CreateStreamedClientFacade(); 461 IOptimizer originalOptimizer; 462 lock (pendingOptimizersByJobId) { 463 originalOptimizer = pendingOptimizersByJobId[jobId]; 464 } 465 466 fetchJobSemaphore.WaitOne(); 467 ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false); 468 ServiceLocator.DisposeClientFacade(clientFacade); 469 IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData)); 470 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 471 472 ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer); 473 fetchJobSemaphore.Release(); 474 LogMessage(jobId, "FetchAndUpdateJob ended"); 470 if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) { 471 IClientFacade clientFacade = null; 472 try { 473 clientFacade = CreateStreamedClientFacade(); 474 IOptimizer originalOptimizer; 475 originalOptimizer = pendingOptimizersByJobId[jobId]; 476 477 ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false); 478 IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData)); 479 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 480 ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer); 481 LogMessage(jobId, "FetchAndUpdateJob ended"); 482 } 483 catch (Exception e) { 484 LogMessage(jobId, "FetchAndUpdateJob failed: " + e.Message + ". Will try again!"); 485 tryagain = true; 486 } 487 finally { 488 ServiceLocator.DisposeClientFacade(clientFacade); 489 fetchJobSemaphore.Release(); 490 } 491 } else { 492 LogMessage(jobId, "FetchAndUpdateJob timed out. Will try again!"); 493 tryagain = true; 494 } 495 496 if (tryagain) { 497 FetchAndUpdateJob(jobId); 498 } 475 499 } 476 500 … … 509 533 PluginsNeeded = pluginsNeeded, 510 534 State = JobState.Offline, 511 MemoryNeeded = 0, 512 UserId = Guid.Empty // [chn] set real userid here! 535 MemoryNeeded = 0 513 536 }; 514 537 … … 537 560 public void StopResultPolling() { 538 561 this.stopResultsPollingPending = true; 539 resultPollingThread.Interrupt(); 562 if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) { 563 resultPollingThread.Interrupt(); 564 } 540 565 this.stopResultsPollingPending = false; 541 566 } … … 581 606 // thread has been interuppted 582 607 } 608 catch (Exception e) { 609 LogMessage("Result Polling Thread failed badly: " + e.Message); 610 Logger.Error("Result Polling Thread failed badly: " + e.Message); 611 } 583 612 finally { 584 613 this.IsPollingResults = false; … … 598 627 public void RequestSnapshot(Guid jobId) { 599 628 Thread t = new Thread(() => { 600 IClientFacade clientFacade = CreateStreamedClientFacade();629 IClientFacade clientFacade = null; 601 630 try { 631 clientFacade = CreateStreamedClientFacade(); 632 602 633 ResponseObject<SerializedJob> response; 603 634 int retryCount = 0; … … 633 664 } 634 665 } 666 catch (Exception e) { 667 LogMessage("RequestSnapshot Thread failed badly: " + e.Message); 668 Logger.Error("RequestSnapshot Thread failed badly: " + e.Message); 669 } 635 670 finally { 636 671 ServiceLocator.DisposeClientFacade(clientFacade); … … 885 920 clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp); 886 921 887 } catch (EndpointNotFoundException exception) { 922 } 923 catch (EndpointNotFoundException exception) { 888 924 LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec."); 889 925 Thread.Sleep(resultPollingIntervalMs); … … 899 935 //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName)); 900 936 clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp); 901 } catch (EndpointNotFoundException exception) { 937 } 938 catch (EndpointNotFoundException exception) { 902 939 LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec."); 903 940 Thread.Sleep(resultPollingIntervalMs);
Note: See TracChangeset
for help on using the changeset viewer.