Changeset 4133 for branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
- Timestamp:
- 08/02/10 17:27:24 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
r4121 r4133 53 53 private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive."; 54 54 private const int resultPollingIntervalMs = 15000; 55 55 private const int maxSnapshotRetries = 20; 56 56 private object locker = new object(); 57 private const int maxSnapshotRetries = 20; 57 58 58 private System.Timers.Timer timer; 59 59 private bool pausePending, stopPending; 60 61 [Storable] 60 62 private DateTime lastUpdateTime; 61 63 64 private bool isPollingResults; 65 public bool IsPollingResults { 66 get { return isPollingResults; } 67 private set { 68 if (isPollingResults != value) { 69 isPollingResults = value; 70 OnIsPollingResultsChanged(); 71 } 72 } 73 } 74 75 private bool stopResultsPollingPending = false; 76 77 private IDictionary<Guid, Thread> resultPollingThreads; 78 62 79 [Storable] 63 80 private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>(); … … 68 85 get { return jobItems; } 69 86 } 70 87 71 88 72 89 [Storable] … … 76 93 set { 77 94 if (serverUrl != value) { 78 serverUrl = value; 95 serverUrl = value; 79 96 OnServerUrlChanged(); 80 97 } … … 88 105 set { 89 106 if (resourceIds != value) { 90 resourceIds = value; 107 resourceIds = value; 91 108 OnResourceIdsChanged(); 92 109 } … … 115 132 public HiveExperiment(bool deserializing) 116 133 : base(deserializing) { 134 this.resultPollingThreads = new Dictionary<Guid, Thread>(); 135 jobItems = new JobItemList(); 117 136 } 118 137 … … 124 143 pausePending = stopPending = false; 125 144 jobItems = new JobItemList(); 145 isPollingResults = false; 146 resultPollingThreads = new Dictionary<Guid, Thread>(); 126 147 InitTimer(); 127 148 } … … 142 163 clone.pausePending = this.pausePending; 143 164 clone.jobItems = (JobItemList)cloner.Clone(jobItems); 165 clone.lastUpdateTime = this.lastUpdateTime; 166 clone.isPollingResults = this.isPollingResults; 144 167 return clone; 145 168 } … … 148 171 private void AfterDeserialization() { 149 172 InitTimer(); 173 this.IsPollingResults = false; 174 this.stopResultsPollingPending = false; 150 175 LogMessage("I was deserialized."); 151 176 } … … 215 240 this.ExecutionState = Core.ExecutionState.Started; 216 241 Thread t = new Thread(() => { 217 IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);242 IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); 218 243 219 244 pendingOptimizers = new Dictionary<Guid, IOptimizer>(); … … 234 259 }; 235 260 jobItems.Add(jobItem); 236 237 LogMessage("Sent job to server (jobId: " + response.Obj.Id + ")"); 238 } 239 261 jobItem.LogMessage("Job sent to Hive"); 262 263 LogMessage("Sent job to Hive (jobId: " + response.Obj.Id + ")"); 264 } 265 240 266 // start results polling after sending sending the jobs to the server (to avoid race conflicts at the optimizers-collection) 241 foreach (JobItem jobItem in jobItems) { 242 StartResultPollingThread(jobItem.JobDto); 243 } 267 StartResultPolling(); 244 268 }); 245 269 t.Start(); 270 } 271 272 private void CreateResultPollingThreads() { 273 foreach(JobItem jobItem in JobItems) { 274 resultPollingThreads.Add(jobItem.JobDto.Id, CreateResultPollingThread(jobItem.JobDto)); 275 } 276 } 277 278 public void StartResultPolling() { 279 this.stopResultsPollingPending = false; 280 CreateResultPollingThreads(); 281 foreach (Thread pollingThread in resultPollingThreads.Values) { 282 pollingThread.Start(); 283 } 284 this.IsPollingResults = true; 285 } 286 287 public void StopResultPolling() { 288 this.stopResultsPollingPending = true; 289 foreach (Thread pollingThread in resultPollingThreads.Values) { 290 pollingThread.Interrupt(); 291 } 292 this.stopResultsPollingPending = false; 293 } 294 295 private JobItem GetJobItemById(Guid jobId) { 296 return jobItems.Single(x => x.JobDto.Id == jobId); 246 297 } 247 298 … … 267 318 268 319 public void Stop() { 269 // todo 270 } 271 320 foreach(JobItem jobItem in jobItems) { 321 AbortJob(jobItem.JobDto.Id); 322 } 323 } 324 325 public void AbortJob(Guid jobId) { 326 IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); 327 executionEngineFacade.AbortJob(jobId); 328 resultPollingThreads[jobId].Interrupt(); 329 GetJobItemById(jobId).LogMessage("Aborting Job"); 330 } 272 331 #endregion 332 333 private IExecutionEngineFacade GetExecutionEngineFacade() { 334 return ServiceLocator.CreateExecutionEngineFacade(ServerUrl); 335 } 273 336 274 337 private SerializedJob CreateSerializedJob(IOptimizer optimizer) { … … 307 370 } 308 371 309 private void StartResultPollingThread(JobDto job) { 310 Thread t = new Thread(() => { 311 IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl); 312 IJob restoredObject = null; 313 314 do { 315 Thread.Sleep(resultPollingIntervalMs); 316 //lock (locker) { [chn] try without locking for better performance 317 if (stopPending) return; 372 private Thread CreateResultPollingThread(JobDto job) { 373 return new Thread(() => { 374 try { 375 GetJobItemById(job.Id).LogMessage("Starting job results polling"); 376 IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); 377 IJob restoredObject = null; 378 379 do { 380 Thread.Sleep(resultPollingIntervalMs); 381 if (stopPending || !this.IsPollingResults) { 382 return; 383 } 318 384 319 385 ResponseObject<JobDto> response = executionEngineFacade.GetJobById(job.Id); 320 386 LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")"); 387 GetJobItemById(job.Id).LogMessage("Response: " + response.StatusMessage); 321 388 322 389 if (response.Obj != null) { 323 390 UpdateJobItem(response.Obj); 324 391 } 325 392 326 393 // loop while 327 394 // 1. the user doesn't request an abort … … 334 401 UpdateSnapshot(jobResponse); 335 402 } 336 //} 337 } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped); 338 339 LogMessage("Job finished (jobId: " + job.Id + ")"); 340 // job retrieved... replace the existing optimizers with the finished one 341 IOptimizer originalOptimizer = pendingOptimizers[job.Id]; 342 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 343 344 ReplaceOptimizer(originalOptimizer, restoredOptimizer); 345 pendingOptimizers.Remove(job.Id); 346 347 if (pendingOptimizers.Count == 0) { 348 // finished 349 this.ExecutionState = Core.ExecutionState.Stopped; 350 OnStopped(); 403 } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped); 404 405 LogMessage("Job finished (jobId: " + job.Id + ")"); 406 GetJobItemById(job.Id).LogMessage("Job finished"); 407 // job retrieved... replace the existing optimizers with the finished one 408 IOptimizer originalOptimizer = pendingOptimizers[job.Id]; 409 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 410 411 ReplaceOptimizer(originalOptimizer, restoredOptimizer); 412 pendingOptimizers.Remove(job.Id); 413 414 if (pendingOptimizers.Count == 0) { 415 // finished 416 this.ExecutionState = Core.ExecutionState.Stopped; 417 OnStopped(); 418 } 419 } catch (ThreadInterruptedException exception) { 420 421 } finally { 422 GetJobItemById(job.Id).LogMessage("ResultsPolling Thread stopped"); 423 resultPollingThreads.Remove(job.Id); 424 if (resultPollingThreads.Count == 0) { 425 IsPollingResults = false; 426 } 351 427 } 352 428 }); 353 t.Start();354 429 } 355 430 … … 370 445 } 371 446 } 372 447 373 448 #region Required Plugin Search 374 449 /// <summary> … … 523 598 } 524 599 600 public event EventHandler IsResultsPollingChanged; 601 private void OnIsPollingResultsChanged() { 602 if (this.IsPollingResults) { 603 LogMessage("Results Polling Started"); 604 timer.Start(); 605 } else { 606 LogMessage("Results Polling Stopped"); 607 timer.Stop(); 608 } 609 EventHandler handler = IsResultsPollingChanged; 610 if (handler != null) handler(this, EventArgs.Empty); 611 } 525 612 #endregion 526 613 }
Note: See TracChangeset
for help on using the changeset viewer.