- Timestamp:
- 09/07/10 10:22:27 (14 years ago)
- Location:
- branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3
- Files:
-
- 7 added
- 1 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HeuristicLab.Hive.Experiment-3.3.csproj
r4342 r4368 17 17 </PropertyGroup> 18 18 <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' "> 19 <PlatformTarget> AnyCPU</PlatformTarget>19 <PlatformTarget>x64</PlatformTarget> 20 20 <DebugSymbols>true</DebugSymbols> 21 21 <DebugType>full</DebugType> … … 53 53 <PropertyGroup> 54 54 <AssemblyOriginatorKeyFile>HeuristicLab.snk</AssemblyOriginatorKeyFile> 55 </PropertyGroup> 56 <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x64'"> 57 <DebugSymbols>true</DebugSymbols> 58 <OutputPath>bin\x64\Debug\</OutputPath> 59 <PlatformTarget>x64</PlatformTarget> 60 <CodeAnalysisLogFile>bin\Debug\HeuristicLab.Hive.Experiment-3.3.dll.CodeAnalysisLog.xml</CodeAnalysisLogFile> 61 <CodeAnalysisUseTypeNameInSuppression>true</CodeAnalysisUseTypeNameInSuppression> 62 <CodeAnalysisModuleSuppressionsFile>GlobalSuppressions.cs</CodeAnalysisModuleSuppressionsFile> 63 <CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet> 64 <CodeAnalysisRuleSetDirectories>;c:\Program Files (x86)\Microsoft Visual Studio 10.0\Team Tools\Static Analysis Tools\\Rule Sets</CodeAnalysisRuleSetDirectories> 65 <CodeAnalysisIgnoreBuiltInRuleSets>false</CodeAnalysisIgnoreBuiltInRuleSets> 66 <CodeAnalysisRuleDirectories>;c:\Program Files (x86)\Microsoft Visual Studio 10.0\Team Tools\Static Analysis Tools\FxCop\\Rules</CodeAnalysisRuleDirectories> 67 <CodeAnalysisIgnoreBuiltInRules>false</CodeAnalysisIgnoreBuiltInRules> 68 </PropertyGroup> 69 <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x64'"> 70 <OutputPath>bin\x64\Release\</OutputPath> 71 <PlatformTarget>x64</PlatformTarget> 72 <CodeAnalysisLogFile>bin\Release\HeuristicLab.Hive.Experiment-3.3.dll.CodeAnalysisLog.xml</CodeAnalysisLogFile> 73 <CodeAnalysisUseTypeNameInSuppression>true</CodeAnalysisUseTypeNameInSuppression> 74 <CodeAnalysisModuleSuppressionsFile>GlobalSuppressions.cs</CodeAnalysisModuleSuppressionsFile> 75 <CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet> 76 <CodeAnalysisRuleSetDirectories>;c:\Program Files (x86)\Microsoft Visual Studio 10.0\Team Tools\Static Analysis Tools\\Rule Sets</CodeAnalysisRuleSetDirectories> 77 <CodeAnalysisIgnoreBuiltInRuleSets>false</CodeAnalysisIgnoreBuiltInRuleSets> 78 <CodeAnalysisRuleDirectories>;c:\Program Files (x86)\Microsoft Visual Studio 10.0\Team Tools\Static Analysis Tools\FxCop\\Rules</CodeAnalysisRuleDirectories> 79 <CodeAnalysisIgnoreBuiltInRules>false</CodeAnalysisIgnoreBuiltInRules> 55 80 </PropertyGroup> 56 81 <ItemGroup> … … 93 118 </ItemGroup> 94 119 <ItemGroup> 120 <Compile Include="JobItem.cs" /> 95 121 <Compile Include="JobItemList.cs" /> 96 <Compile Include="JobItem.cs" /> 97 <Compile Include="OptimizerJob.cs" /> 122 <Compile Include="Jobs\BachRunJob.cs" /> 123 <Compile Include="Jobs\JobList.cs" /> 124 <Compile Include="Jobs\ExperimentJob.cs" /> 125 <Compile Include="HiveClient.cs" /> 126 <Compile Include="HiveExperimentList.cs" /> 127 <Compile Include="Jobs\OptimizerJob.cs" /> 98 128 <Compile Include="HeuristicLabHiveExperimentPlugin.cs" /> 99 129 <Compile Include="HiveExperiment.cs" /> -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs
r4342 r4368 45 45 using HeuristicLab.Hive.Experiment.Properties; 46 46 using System.ComponentModel; 47 using HeuristicLab.Hive.Experiment.Jobs; 47 48 48 49 namespace HeuristicLab.Hive.Experiment { … … 51 52 /// </summary> 52 53 [Item(itemName, itemDescription)] 53 [Creatable("Testing & Analysis")]54 54 [StorableClass] 55 55 public class HiveExperiment : NamedItem, IExecutable { … … 63 63 private System.Timers.Timer timer; 64 64 private bool pausePending, stopPending; 65 private bool sendingJobsFinished = false;66 65 67 66 // ensure that only 2 threads can fetch jobresults simultaniously … … 98 97 [Storable] 99 98 private DateTime lastUpdateTime; 100 101 /// <summary> 102 /// Mapping from JobId to an optimizer. 103 /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection 104 /// </summary> 105 [Storable] 106 private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>(); 107 108 /// <summary> 109 /// Stores a mapping from the child-optimizer to the parent optimizer. 110 /// Needed to replace a finished optimizer in the optimizer-tree. 111 /// Only pending optmizers are stored. 112 /// </summary> 113 [Storable] 114 private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>(); 115 116 [Storable] 117 private JobItemList jobItems; 118 public JobItemList JobItems { 119 get { return jobItems; } 120 } 121 99 122 100 [Storable] 123 101 private string resourceIds; … … 173 151 } 174 152 } 153 154 [Storable] 155 private IJob rootJob; 156 public IJob RootJob { 157 get { return rootJob; } 158 set { 159 if (rootJob != value) { 160 rootJob = value; 161 OnRootJobChanged(); 162 } 163 } 164 } 165 166 private JobItem rootJobItem; 167 public JobItem RootJobItem { 168 get { return rootJobItem; } 169 set { 170 if (rootJobItem != null) { 171 DeregisterRootJobItemEvents(); 172 } 173 if (rootJobItem != value) { 174 rootJobItem = value; 175 RegisterRootJobItemEvents(); 176 OnRootJobItemChanged(); 177 } 178 } 179 } 180 181 private void RegisterRootJobItemEvents() { 182 rootJobItem.FinalResultAvailable += new EventHandler(rootJobItem_FinalResultAvailable); 183 } 184 185 private void DeregisterRootJobItemEvents() { 186 rootJobItem.FinalResultAvailable -= new EventHandler(rootJobItem_FinalResultAvailable); 187 } 188 175 189 #endregion 176 190 … … 185 199 this.log = new Log(); 186 200 pausePending = stopPending = false; 187 jobItems = new JobItemList();188 201 isPollingResults = false; 189 RegisterJobItemListEvents();190 202 InitTimer(); 191 203 } … … 198 210 clone.executionState = this.executionState; 199 211 clone.executionTime = this.executionTime; 200 clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();201 202 lock (pendingOptimizerMappingsLocker) {203 foreach (var pair in this.pendingOptimizersByJobId)204 clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);205 206 foreach (var pair in this.parentOptimizersByPendingOptimizer)207 clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);208 }209 212 clone.log = (ILog)cloner.Clone(log); 210 213 clone.stopPending = this.stopPending; 211 214 clone.pausePending = this.pausePending; 212 clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));213 215 clone.lastUpdateTime = this.lastUpdateTime; 214 216 clone.isPollingResults = this.isPollingResults; 217 clone.rootJob = (IJob)cloner.Clone(this.rootJob); 215 218 return clone; 216 219 } … … 221 224 this.IsPollingResults = false; 222 225 this.stopResultsPollingPending = false; 223 Register JobItemListEvents();226 RegisterEvents(); 224 227 LogMessage("I was deserialized."); 228 } 229 230 private void RegisterEvents() { 231 RootJobChanged += new EventHandler(HiveExperiment_RootJobChanged); 232 } 233 234 private void DeRegisterEvents() { 235 RootJobChanged -= new EventHandler(HiveExperiment_RootJobChanged); 225 236 } 226 237 … … 247 258 if (experiment != null) { 248 259 StopResultPolling(); 249 lock (pendingOptimizerMappingsLocker) {250 pendingOptimizersByJobId.Clear();251 parentOptimizersByPendingOptimizer.Clear();252 }253 lock (jobItems) {254 jobItems.Clear();255 }256 260 experiment.Prepare(); 257 261 this.ExecutionState = Core.ExecutionState.Prepared; … … 261 265 262 266 public void Start() { 263 sendingJobsFinished = false;264 267 OnStarted(); 265 268 ExecutionTime = new TimeSpan(); 266 269 lastUpdateTime = DateTime.Now; 267 270 this.ExecutionState = Core.ExecutionState.Started; 268 StartResultPolling();269 271 270 272 Thread t = new Thread(() => { 271 IClientFacade clientFacade = CreateStreamedClientFacade(); 272 273 try { 274 pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>(); 275 276 LogMessage("Extracting jobs from Experiment"); 277 parentOptimizersByPendingOptimizer = GetOptimizers(true); 278 LogMessage("Extraction of jobs from Experiment finished"); 279 280 IEnumerable<string> groups = ResourceGroups; 281 lock (pendingOptimizerMappingsLocker) { 282 foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) { 283 SerializedJob serializedJob = CreateSerializedJob(optimizer); 284 ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups); 285 pendingOptimizersByJobId.Add(response.Obj.Id, optimizer); 286 287 JobItem jobItem = new JobItem() { 288 JobDto = response.Obj, 289 LatestSnapshot = null, 290 Optimizer = optimizer 291 }; 292 lock (jobItems) { 293 jobItems.Add(jobItem); 294 } 295 LogMessage(jobItem.JobDto.Id, "Job sent to Hive"); 273 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 274 try { 275 RootJobItem = ToJobItem(RootJob); 276 277 IEnumerable<string> groups = ResourceGroups; 278 SerializedJob serializedJob = RootJobItem.ToSerializedJob(); 279 ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 280 281 if (response.StatusMessage != ResponseStatus.Ok) { 282 throw new Exception(response.StatusMessage.ToString()); 283 } else { 284 RootJobItem.JobDto = response.Obj; 285 LogMessage(RootJobItem.JobDto.Id, "Job sent to Hive"); 286 287 StartResultPolling(); 296 288 } 297 289 } 298 } 299 catch (Exception e) { 300 LogMessage("Error: Starting HiveExperiment failed: " + e.Message); 301 this.ExecutionState = Core.ExecutionState.Stopped; 302 OnStopped(); 303 } 304 finally { 305 ServiceLocator.DisposeClientFacade(clientFacade); 306 } 307 sendingJobsFinished = true; 290 catch (Exception e) { 291 LogMessage("Error: Starting HiveExperiment failed: " + e.Message); 292 this.ExecutionState = Core.ExecutionState.Stopped; 293 OnStopped(); 294 } 295 } 308 296 }); 309 297 t.Start(); … … 311 299 312 300 public void Stop() { 301 if (IsPollingResults) 302 StopResultPolling(); 313 303 this.ExecutionState = Core.ExecutionState.Stopped; 314 foreach (JobItem jobItem in jobItems) {315 AbortJob(jobItem.JobDto.Id);316 }317 304 OnStopped(); 318 305 } 319 306 #endregion 320 321 #region Optimizier Management 322 /// <summary> 323 /// Returns all optimizers in the current Experiment 324 /// </summary> 325 /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param> 326 /// <returns></returns> 327 private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) { 328 if (!flatout) { 329 var optimizers = new Dictionary<IOptimizer, IOptimizer>(); 330 foreach (IOptimizer opt in experiment.Optimizers) { 331 optimizers.Add(experiment, opt); 332 } 333 return optimizers; 307 308 #region Job Management 309 public void RefreshJobTree() { 310 this.RootJob = CreateJobTree(this.Experiment); 311 } 312 313 private IJob CreateJobTree(IOptimizer optimizer) { 314 IJob job = null; 315 if (optimizer is HeuristicLab.Optimization.Experiment) { 316 HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)optimizer; 317 ExperimentJob expJob = new ExperimentJob(exp); 318 foreach (IOptimizer opt in exp.Optimizers) { 319 expJob.AddChildJob(CreateJobTree(opt)); 320 } 321 job = expJob; 322 } else if (optimizer is BatchRun) { 323 job = new BatchRunJob(optimizer); 334 324 } else { 335 return FlatOptimizerTree(null, experiment, ""); 336 } 337 } 338 339 /// <summary> 340 /// Recursively iterates all IOptimizers in the optimizer-tree and returns them. 341 /// 342 /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like: 343 /// interface IParallelizable { 344 /// IEnumerable<IOptimizer> GetOptimizers(); 345 /// } 346 /// </summary> 347 /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns> 348 private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) { 349 IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>(); 350 if (optimizer is HeuristicLab.Optimization.Experiment) { 351 HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment; 352 if (this.experiment != experiment) { 353 prepend += experiment.Name + "/"; // don't prepend for top-level optimizers 354 } 355 foreach (IOptimizer opt in experiment.Optimizers) { 356 AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend)); 357 } 358 } else if (optimizer is BatchRun) { 359 BatchRun batchRun = optimizer as BatchRun; 360 prepend += batchRun.Name + "/"; 361 for (int i = 0; i < batchRun.Repetitions; i++) { 362 IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone(); 363 opt.Name += " [" + i + "]"; 364 IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend); 365 AddRange(optimizers, batchOptimizers); 366 } 367 } else if (optimizer is EngineAlgorithm) { 368 optimizer.Name = prepend + optimizer.Name; 369 optimizers.Add(optimizer, parent); 370 LogMessage("Optimizer extracted: " + optimizer.Name); 371 } else { 372 Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown"); 373 optimizer.Name = prepend + optimizer.Name; 374 optimizers.Add(optimizer, parent); 375 LogMessage("Optimizer extracted: " + optimizer.Name); 376 } 377 return optimizers; 378 } 379 380 private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) { 381 lock (locker) { 382 if (parentOptimizer is HeuristicLab.Optimization.Experiment) { 383 HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer; 384 int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer); 385 exp.Optimizers[originalOptimizerIndex] = newOptimizer; 386 } else if (parentOptimizer is BatchRun) { 387 BatchRun batchRun = (BatchRun)parentOptimizer; 388 if (newOptimizer is IAlgorithm) { 389 batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer)); 390 } else { 391 throw new NotSupportedException("Only IAlgorithm types supported"); 392 } 393 } else { 394 throw new NotSupportedException("Invalid parentOptimizer"); 395 } 396 } 397 } 398 399 private bool NoMorePendingOptimizers() { 400 lock (pendingOptimizerMappingsLocker) { 401 return pendingOptimizersByJobId.Count == 0; 402 } 403 } 404 405 /// <summary> 406 /// Removes optimizers from 407 /// - parentOptimizersByPendingOptimizer 408 /// - pendingOptimizersByJobId 409 /// </summary> 410 /// <param name="jobId"></param> 411 private void DisposeOptimizerMappings(Guid jobId) { 412 LogMessage(jobId, "Disposing Optimizer Mappings"); 413 lock (pendingOptimizerMappingsLocker) { 414 parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]); 415 pendingOptimizersByJobId.Remove(jobId); 416 } 417 } 418 419 #endregion 420 421 #region Job Management 325 job = new OptimizerJob(optimizer); 326 } 327 return job; 328 } 329 330 private JobItem ToJobItem(IJob j) { 331 return new JobItem() { 332 Job = j, 333 JobDto = new JobDto() { 334 State = JobState.Offline, 335 PluginsNeeded = HivePluginInfoDto.FindPluginsNeeded(j.GetType()) 336 } 337 }; 338 } 339 422 340 /// <summary> 423 341 /// Updates all JobItems with the results 342 /// if one is finished, the serialized job is downloaded and updated 424 343 /// </summary> 425 344 /// <param name="jobResultList"></param> 426 345 private void UpdateJobItems(JobResultList jobResultList) { 427 // use a Dict to avoid quadratic runtime complexity 428 IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId); 429 lock (jobItems) { 430 foreach (JobItem jobItem in JobItems) { 431 if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) { 432 jobItem.JobResult = jobResultDict[jobItem.JobDto.Id]; 346 if (RootJobItem == null) { 347 var rootResults = jobResultList.Where(res => !res.ParentJobId.HasValue); 348 if (rootResults.Count() > 0) { 349 RootJobItem = new JobItem(rootResults.First()); 350 } else { 351 LogMessage("Error: Could not find JobResult for RootJobItem"); 352 return; 353 } 354 } 355 356 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 357 foreach (JobResult jobResult in jobResultList) { 358 JobItem jobItem = FindJobItem(RootJobItem, jobResult.Id); 359 if (jobItem != null) { 360 jobItem.UpdateJob(jobResult); 361 if (jobItem.JobDto.State == JobState.Finished && !jobItem.IsFinalResultAvailable) { 362 // job is finished but the final result was not yet downloaded 363 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobResult.Id).Obj; 364 jobItem.Job = XmlParser.Deserialize<IJob>(new MemoryStream(serializedJob.SerializedJobData)); 365 UpdateChildJobs(jobItem); 366 this.Experiment.Runs.AddRange(((OptimizerJob)jobItem.Job).Optimizer.Runs); 367 //ReplaceOptimizerInExperiment(jobItem.JobDto.Id, jobItem.Job); 368 } 369 } else { 370 // job does not yet exist locally 371 JobItem parentJobItem = FindJobItem(RootJobItem, jobResult.ParentJobId.Value); 372 if (parentJobItem != null) { 373 LogMessage(jobResult.Id, "Creating JobItem"); 374 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobResult.Id).Obj; 375 JobItem newJobItem = new JobItem(); 376 newJobItem.JobDto = serializedJob.JobInfo; 377 newJobItem.Job = XmlParser.Deserialize<IJob>(new MemoryStream(serializedJob.SerializedJobData)); 378 parentJobItem.AddChildJob(newJobItem); 379 if (newJobItem.JobDto.State == JobState.Finished && newJobItem.IsFinalResultAvailable) { 380 this.Experiment.Runs.AddRange(((OptimizerJob)newJobItem.Job).Optimizer.Runs); 381 UpdateChildJobs(newJobItem); 382 //ReplaceOptimizerInExperiment(jobItem.JobDto.Id, jobItem.Job); 383 } 384 } else { 385 LogMessage("Error: Could not update JobResult for " + jobResult.Id); 386 } 433 387 } 434 388 } 435 389 } 390 } 391 392 /// <summary> 393 /// Updates the ChildJobItems of a JobItem according to the IJob.ChildJobs of JobItem.Job (pretty confusing, right) 394 /// </summary> 395 /// <param name="jobItem"></param> 396 private void UpdateChildJobs(JobItem jobItem) { 397 List<JobItem> newJobItems = new List<JobItem>(); 398 foreach (IJob job in jobItem.Job.ChildJobs) { 399 if (!jobItem.ReplaceChildJob(job)) { 400 newJobItems.Add(ToJobItem(job)); 401 } 402 } 403 foreach (JobItem item in newJobItems) { 404 jobItem.AddChildJob(item); 405 } 406 } 407 408 private JobItem FindJobItem(JobItem parentJobItem, Guid jobId) { 409 if (parentJobItem.JobDto.Id == jobId) { 410 return parentJobItem; 411 } else { 412 foreach (JobItem child in parentJobItem.ChildJobItems) { 413 JobItem result = FindJobItem(child, jobId); 414 if (result != null) 415 return result; 416 } 417 } 418 return null; 436 419 } 437 420 … … 441 424 Thread t = new Thread(() => { 442 425 try { 443 if (jobItem. State == JobState.Finished) {426 if (jobItem.JobDto.State == JobState.Finished) { 444 427 FetchAndUpdateJob(jobItem.JobDto.Id); 445 DisposeOptimizerMappings(jobItem.JobDto.Id); 446 } else if (jobItem.State == JobState.Failed) { 447 DisposeOptimizerMappings(jobItem.JobDto.Id); 428 } else if (jobItem.JobDto.State == JobState.Failed) { 429 448 430 } 449 450 if (NoMorePendingOptimizers()) {451 StopResultPolling();452 this.ExecutionState = Core.ExecutionState.Stopped;453 OnStopped();454 }455 431 } 456 432 catch (Exception ex) { 457 Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message);458 433 LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message); 459 434 } … … 469 444 LogMessage(jobId, "FetchAndUpdateJob started"); 470 445 if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) { 471 IClientFacade clientFacade = null; 446 472 447 try { 473 clientFacade = CreateStreamedClientFacade(); 474 IOptimizer originalOptimizer; 475 originalOptimizer = pendingOptimizersByJobId[jobId]; 476 477 ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false); 478 IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData)); 479 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 480 ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer); 481 LogMessage(jobId, "FetchAndUpdateJob ended"); 448 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 449 ResponseObject<SerializedJob> jobResponse = service.Obj.GetLastSerializedResult(jobId); 450 IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData)); 451 IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; 452 LogMessage(jobId, "FetchAndUpdateJob ended"); 453 } 482 454 } 483 455 catch (Exception e) { … … 486 458 } 487 459 finally { 488 ServiceLocator.DisposeClientFacade(clientFacade);489 460 fetchJobSemaphore.Release(); 490 461 } … … 499 470 } 500 471 501 private void UpdateJobItem(JobDto jobDto) {502 JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);503 jobItem.JobDto = jobDto;504 }505 506 472 public void AbortJob(Guid jobId) { 507 IClientFacade clientFacade = CreateClientFacade(); 508 Response response = clientFacade.AbortJob(jobId); 509 LogMessage(jobId, "Aborting Job: " + response.StatusMessage); 510 } 511 512 private SerializedJob CreateSerializedJob(IOptimizer optimizer) { 513 IJob job = new OptimizerJob() { 514 Optimizer = optimizer 515 }; 516 517 // serialize job 518 MemoryStream memStream = new MemoryStream(); 519 XmlGenerator.Serialize(job, memStream); 520 byte[] jobByteArray = memStream.ToArray(); 521 memStream.Dispose(); 522 523 // find out which which plugins are needed for the given object 524 List<HivePluginInfoDto> pluginsNeeded = ( 525 from p in GetDeclaringPlugins(job.GetType()) 526 select new HivePluginInfoDto() { 527 Name = p.Name, 528 Version = p.Version 529 }).ToList(); 530 531 JobDto jobDto = new JobDto() { 532 CoresNeeded = 1, // [chn] how to determine real cores needed? 533 PluginsNeeded = pluginsNeeded, 534 State = JobState.Offline, 535 MemoryNeeded = 0 536 }; 537 538 SerializedJob serializedJob = new SerializedJob() { 539 JobInfo = jobDto, 540 SerializedJobData = jobByteArray 541 }; 542 543 return serializedJob; 544 } 545 546 private JobItem GetJobItemById(Guid jobId) { 547 return jobItems.Single(x => x.JobDto.Id == jobId); 473 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 474 Response response = service.Obj.AbortJob(jobId); 475 LogMessage(jobId, "Aborting Job: " + response.StatusMessage); 476 } 548 477 } 549 478 #endregion … … 553 482 this.stopResultsPollingPending = false; 554 483 this.IsPollingResults = true; 555 resultPollingThread = CreateResultPollingThread();484 resultPollingThread = new Thread(RunResultPollingThread); 556 485 if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running) 557 486 resultPollingThread.Start(); … … 560 489 public void StopResultPolling() { 561 490 this.stopResultsPollingPending = true; 562 if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) { 563 resultPollingThread.Interrupt(); 491 if (resultPollingThread != null) { 492 if (resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) { 493 resultPollingThread.Interrupt(); 494 } 495 resultPollingThread.Join(); 564 496 } 565 497 this.stopResultsPollingPending = false; 566 498 } 567 499 568 private Thread CreateResultPollingThread() { 569 return new Thread(() => { 570 try { 571 do { 572 IClientFacade clientFacade = CreateStreamedClientFacade(); 573 IEnumerable<Guid> jobIdsToQuery = from job in JobItems 574 where job.State != JobState.Finished && 575 job.State != JobState.Failed 576 select job.JobDto.Id; 577 if (jobIdsToQuery.Count() > 0) { 578 LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs"); 500 private void RunResultPollingThread() { 501 try { 502 do { 503 if (RootJobItem.JobDto.State != JobState.Finished) { 504 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 505 LogMessage("Polling results"); 579 506 try { 580 ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);507 ResponseObject<JobResultList> response = service.Obj.GetChildJobResults(RootJobItem.JobDto.Id, true, true); 581 508 if (response.StatusMessage == ResponseStatus.Ok) { 582 509 JobResultList jobItemList = response.Obj; … … 591 518 LogMessage("Polling results failed: " + e.Message); 592 519 } 593 finally {594 ServiceLocator.DisposeClientFacade(clientFacade);595 }596 Thread.Sleep(resultPollingIntervalMs);597 } else {598 if (sendingJobsFinished) {599 // all the jobs have been sent to hive, but non are to query any more (all finished or failed)600 this.stopResultsPollingPending = true;601 }602 520 } 603 } while (!this.stopResultsPollingPending); 604 } 605 catch (ThreadInterruptedException exception) { 606 // thread has been interuppted 607 } 608 catch (Exception e) { 609 LogMessage("Result Polling Thread failed badly: " + e.Message); 610 Logger.Error("Result Polling Thread failed badly: " + e.Message); 611 } 612 finally { 613 this.IsPollingResults = false; 614 } 615 }); 521 Thread.Sleep(resultPollingIntervalMs); 522 } else { 523 // all the jobs have been sent to hive, but non are to query any more (all finished or failed) 524 this.stopResultsPollingPending = true; 525 } 526 } while (!this.stopResultsPollingPending); 527 } 528 catch (ThreadInterruptedException) { 529 // thread has been interuppted 530 } 531 catch (Exception e) { 532 LogMessage("Result Polling Thread failed badly: " + e.Message); 533 } 534 finally { 535 this.IsPollingResults = false; 536 } 616 537 } 617 538 … … 620 541 #region Snapshots 621 542 622 private void UpdateSnapshot(ResponseObject<SerializedJob> response) { 623 JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id); 624 jobItem.LatestSnapshot = response; 625 } 626 627 public void RequestSnapshot(Guid jobId) { 628 Thread t = new Thread(() => { 629 IClientFacade clientFacade = null; 630 try { 631 clientFacade = CreateStreamedClientFacade(); 632 633 ResponseObject<SerializedJob> response; 634 int retryCount = 0; 635 636 Response snapShotResponse = clientFacade.RequestSnapshot(jobId); 637 if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) { 638 // job already finished 639 Logger.Debug("HiveExperiment: Abort - GetLastResult(false)"); 640 response = clientFacade.GetLastSerializedResult(jobId, false, false); 641 Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 642 } else { 643 // server sent snapshot request to client 644 // poll until snapshot is ready 645 do { 646 Thread.Sleep(snapshotPollingIntervalMs); 647 Logger.Debug("HiveExperiment: Abort - GetLastResult(true)"); 648 response = clientFacade.GetLastSerializedResult(jobId, false, true); 649 Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 650 retryCount++; 651 // loop while 652 // 1. problem with communication with server 653 // 2. job result not yet ready 654 } while ( 655 (retryCount < maxSnapshotRetries) && ( 656 response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere) 657 ); 658 } 659 if (response.StatusMessage == ResponseStatus.Ok) { 660 LogMessage(jobId, "Snapshot polling successfull for job " + jobId); 661 UpdateSnapshot(response); 662 } else { 663 LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage); 664 } 665 } 666 catch (Exception e) { 667 LogMessage("RequestSnapshot Thread failed badly: " + e.Message); 668 Logger.Error("RequestSnapshot Thread failed badly: " + e.Message); 669 } 670 finally { 671 ServiceLocator.DisposeClientFacade(clientFacade); 672 } 673 }); 674 t.Start(); 675 } 676 677 void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) { 678 JobItem jobItem = (JobItem)sender; 679 if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) { 680 RequestSnapshot(jobItem.JobDto.Id); 681 } 682 } 683 684 #endregion 685 686 #region Required Plugin Search 687 /// <summary> 688 /// Returns a list of plugins in which the type itself and all members 689 /// of the type are declared. Objectgraph is searched recursively. 690 /// </summary> 691 private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) { 692 HashSet<Type> types = new HashSet<Type>(); 693 FindTypes(type, types, "HeuristicLab."); 694 return GetDeclaringPlugins(types); 695 } 696 697 /// <summary> 698 /// Returns the plugins (including dependencies) in which the given types are declared 699 /// </summary> 700 private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) { 701 HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>(); 702 foreach (Type t in types) { 703 FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins); 704 } 705 return plugins; 706 } 707 708 /// <summary> 709 /// Finds the dependencies of the given plugin and adds it to the plugins hashset. 710 /// Also searches the dependencies recursively. 711 /// </summary> 712 private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) { 713 if (!plugins.Contains(plugin)) { 714 plugins.Add(plugin); 715 foreach (IPluginDescription dependency in plugin.Dependencies) { 716 FindDeclaringPlugins(dependency, plugins); 717 } 718 } 719 } 720 721 /// <summary> 722 /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart 723 /// Be aware that search is not performed on attributes 724 /// </summary> 725 /// <param name="type">the type to be searched</param> 726 /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param> 727 /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param> 728 private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) { 729 if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) { 730 types.Add(type); 731 732 // constructors 733 foreach (ConstructorInfo info in type.GetConstructors()) { 734 foreach (ParameterInfo paramInfo in info.GetParameters()) { 735 FindTypes(paramInfo.ParameterType, types, namespaceStart); 736 } 737 } 738 739 // interfaces 740 foreach (Type t in type.GetInterfaces()) { 741 FindTypes(t, types, namespaceStart); 742 } 743 744 // events 745 foreach (EventInfo info in type.GetEvents()) { 746 FindTypes(info.EventHandlerType, types, namespaceStart); 747 FindTypes(info.DeclaringType, types, namespaceStart); 748 } 749 750 // properties 751 foreach (PropertyInfo info in type.GetProperties()) { 752 FindTypes(info.PropertyType, types, namespaceStart); 753 } 754 755 // fields 756 foreach (FieldInfo info in type.GetFields()) { 757 FindTypes(info.FieldType, types, namespaceStart); 758 } 759 760 // methods 761 foreach (MethodInfo info in type.GetMethods()) { 762 foreach (ParameterInfo paramInfo in info.GetParameters()) { 763 FindTypes(paramInfo.ParameterType, types, namespaceStart); 764 } 765 FindTypes(info.ReturnType, types, namespaceStart); 766 } 767 } 768 } 543 //private void UpdateSnapshot(ResponseObject<SerializedJob> response) { 544 // JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id); 545 // jobItem.LatestSnapshot = response; 546 //} 547 548 //public void RequestSnapshot(Guid jobId) { 549 // Thread t = new Thread(() => { 550 // IClientFacade clientFacade = null; 551 // try { 552 // clientFacade = CreateStreamedClientFacade(); 553 554 // ResponseObject<SerializedJob> response; 555 // int retryCount = 0; 556 557 // Response snapShotResponse = clientFacade.RequestSnapshot(jobId); 558 // if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) { 559 // // job already finished 560 // Logger.Debug("HiveExperiment: Abort - GetLastResult(false)"); 561 // response = clientFacade.GetLastSerializedResult(jobId, false, false); 562 // Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 563 // } else { 564 // // server sent snapshot request to client 565 // // poll until snapshot is ready 566 // do { 567 // Thread.Sleep(snapshotPollingIntervalMs); 568 // Logger.Debug("HiveExperiment: Abort - GetLastResult(true)"); 569 // response = clientFacade.GetLastSerializedResult(jobId, false, true); 570 // Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage); 571 // retryCount++; 572 // // loop while 573 // // 1. problem with communication with server 574 // // 2. job result not yet ready 575 // } while ( 576 // (retryCount < maxSnapshotRetries) && ( 577 // response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere) 578 // ); 579 // } 580 // if (response.StatusMessage == ResponseStatus.Ok) { 581 // LogMessage(jobId, "Snapshot polling successfull for job " + jobId); 582 // //UpdateSnapshot(response); 583 // } else { 584 // LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage); 585 // } 586 // } 587 // catch (Exception e) { 588 // LogMessage("RequestSnapshot Thread failed badly: " + e.Message); 589 // Logger.Error("RequestSnapshot Thread failed badly: " + e.Message); 590 // } 591 // finally { 592 // ServiceLocator.DisposeClientFacade(clientFacade); 593 // } 594 // }); 595 // t.Start(); 596 //} 597 598 //void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) { 599 // JobItem jobItem = (JobItem)sender; 600 // if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) { 601 // RequestSnapshot(jobItem.JobDto.Id); 602 // } 603 //} 604 769 605 #endregion 770 606 … … 827 663 LogMessage("Experiment changed"); 828 664 EventHandler handler = ExperimentChanged; 665 if (handler != null) handler(this, EventArgs.Empty); 666 } 667 668 public event EventHandler RootJobChanged; 669 private void OnRootJobChanged() { 670 EventHandler handler = RootJobChanged; 671 if (handler != null) handler(this, EventArgs.Empty); 672 } 673 674 public event EventHandler RootJobItemChanged; 675 private void OnRootJobItemChanged() { 676 EventHandler handler = RootJobItemChanged; 829 677 if (handler != null) handler(this, EventArgs.Empty); 830 678 } … … 843 691 } 844 692 845 private void RegisterJobItemListEvents() { 846 jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset); 847 jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded); 848 jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved); 849 jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced); 850 foreach (JobItem jobItem in jobItems) { 851 RegisterJobItemEvents(jobItem); 852 } 853 } 854 855 private void DeregisterJobItemListEvents() { 856 jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset); 857 jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded); 858 jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved); 859 jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced); 860 foreach (JobItem jobItem in jobItems) { 861 DeregisterJobItemEvents(jobItem); 862 } 863 } 864 865 void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 866 UpdateJobItemEvents(e); 867 } 868 869 private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 870 if (e.OldItems != null) { 871 foreach (var item in e.OldItems) { 872 DeregisterJobItemEvents(item.Value); 873 } 874 } 875 if (e.Items != null) { 876 foreach (var item in e.Items) { 877 RegisterJobItemEvents(item.Value); 878 } 879 } 880 } 881 882 private void RegisterJobItemEvents(JobItem jobItem) { 883 jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged); 884 jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged); 885 } 886 887 private void DeregisterJobItemEvents(JobItem jobItem) { 888 jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged); 889 jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged); 890 } 891 892 void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 893 UpdateJobItemEvents(e); 894 } 895 896 void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 897 UpdateJobItemEvents(e); 898 } 899 900 void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) { 901 foreach (var item in e.OldItems) { 902 item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged); 903 item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged); 904 } 693 void HiveExperiment_RootJobChanged(object sender, EventArgs e) { 694 this.Experiment = (Optimization.Experiment)((ExperimentJob)rootJobItem.Job).Optimizer; 695 } 696 697 void rootJobItem_FinalResultAvailable(object sender, EventArgs e) { 698 this.Experiment = (Optimization.Experiment)((ExperimentJob)rootJobItem.Job).Optimizer; 905 699 } 906 700 #endregion 907 908 #region Helper Functions 909 private IClientFacade CreateClientFacade() { 910 IClientFacade clientFacade = null; 911 do { 912 try { 913 //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName)); 914 clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp); 915 916 } 917 catch (EndpointNotFoundException exception) { 918 LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec."); 919 Thread.Sleep(resultPollingIntervalMs); 920 } 921 } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped); 922 return clientFacade; 923 } 924 925 private IClientFacade CreateStreamedClientFacade() { 926 IClientFacade clientFacade = null; 927 do { 928 try { 929 //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName)); 930 clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp); 931 } 932 catch (EndpointNotFoundException exception) { 933 LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec."); 934 Thread.Sleep(resultPollingIntervalMs); 935 } 936 } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped); 937 return clientFacade; 938 } 939 940 private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) { 941 foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) { 942 optimizers.Add(kvp); 943 } 944 } 945 946 #endregion 947 701 948 702 #region Logging 949 703 private void LogMessage(string message) { … … 951 705 lock (locker) { 952 706 log.LogMessage(message); 707 Logger.Debug(message); 953 708 } 954 709 } 955 710 956 711 private void LogMessage(Guid jobId, string message) { 957 GetJobItemById(jobId).LogMessage(message);712 //GetJobItemById(jobId).LogMessage(message); 958 713 LogMessage(message + " (jobId: " + jobId + ")"); 959 714 } 960 715 961 716 #endregion 717 718 /// <summary> 719 /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem 720 /// </summary> 721 public void LoadExperimentFromHive() { 722 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 723 ResponseObject<SerializedJob> response = service.Obj.GetLastSerializedResult(RootJobItem.JobDto.Id); 724 IJob job = XmlParser.Deserialize<IJob>(new MemoryStream(response.Obj.SerializedJobData)); 725 RootJob = job; 726 RootJobItem.JobDto = response.Obj.JobInfo; 727 this.Experiment = (Optimization.Experiment)((OptimizerJob)job).Optimizer; 728 if (rootJobItem.JobDto.State == JobState.Finished) { 729 // set execution time and finish up 730 } else if (rootJobItem.JobDto.State != JobState.Aborted && rootJobItem.JobDto.State != JobState.Failed) { 731 this.ExecutionState = Core.ExecutionState.Started; 732 } 733 } 734 } 962 735 } 963 736 } -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/JobItem.cs
r4264 r4368 12 12 using HeuristicLab.Optimization; 13 13 using HeuristicLab.Hive.Contracts.ResponseObjects; 14 using HeuristicLab.Hive.JobBase; 15 using System.IO; 16 using HeuristicLab.Persistence.Default.Xml; 17 using HeuristicLab.PluginInfrastructure; 18 using System.Reflection; 19 using HeuristicLab.Hive.Experiment.Jobs; 14 20 15 21 namespace HeuristicLab.Hive.Experiment { … … 22 28 public override Image ItemImage { 23 29 get { 24 if ( State == JobState.Offline) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutablePrepared;25 else if ( State == JobState.Calculating) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutableStarted;26 else if ( State == JobState.Aborted) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutableStopped;27 else if ( State == JobState.Failed) return HeuristicLab.Common.Resources.VS2008ImageLibrary.Error;28 else if ( State == JobState.Finished) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutableStopped;30 if (jobDto.State == JobState.Offline) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutablePrepared; 31 else if (jobDto.State == JobState.Calculating) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutableStarted; 32 else if (jobDto.State == JobState.Aborted) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutableStopped; 33 else if (jobDto.State == JobState.Failed) return HeuristicLab.Common.Resources.VS2008ImageLibrary.Error; 34 else if (jobDto.State == JobState.Finished) return HeuristicLab.Common.Resources.VS2008ImageLibrary.ExecutableStopped; 29 35 else return HeuristicLab.Common.Resources.VS2008ImageLibrary.Event; 30 36 } … … 33 39 [Storable] 34 40 private JobDto jobDto; 35 /// <summary>36 /// Some static information about the job. Don't use State-Information out of there37 /// </summary>38 41 public JobDto JobDto { 39 42 get { return jobDto; } … … 49 52 50 53 [Storable] 51 private JobResult jobResult;52 public JobResult JobResult{53 private get { return jobResult; }54 private IJob job; 55 public IJob Job { 56 get { return job; } 54 57 set { 55 if (jobResult != value) { 56 jobResult = value; 57 OnJobStateChanged(); 58 OnToStringChanged(); 59 OnItemImageChanged(); 60 } 61 } 62 } 63 64 public JobState State { 65 get { return jobResult != null ? JobResult.State : JobDto.State; } 66 } 67 68 public double? Percentage { 69 get { return jobResult != null ? JobResult.Percentage : JobDto.Percentage; } 70 } 71 72 public string Exception { 73 get { return jobResult != null ? JobResult.Exception : JobDto.Exception; } 74 } 75 76 public DateTime? DateCalculated { 77 get { return jobResult != null ? JobResult.DateCalculated : JobDto.DateCalculated; } 78 } 79 80 public DateTime? DateFinished { 81 get { return jobResult != null ? JobResult.DateFinished : JobDto.DateFinished; } 82 } 83 84 [Storable] 85 private ResponseObject<SerializedJob> latestSnapshot; 86 public ResponseObject<SerializedJob> LatestSnapshot { 87 get { return latestSnapshot; } 58 if (job != value) { 59 job = value; 60 IsFinalResultAvailable = job.ExecutionState == ExecutionState.Stopped; 61 OnJobChanged(); 62 } 63 } 64 } 65 66 [Storable] 67 private ICollection<JobItem> childJobItems; 68 public IEnumerable<JobItem> ChildJobItems { 69 get { return childJobItems; } 70 } 71 72 [Storable] 73 private bool isFinalResultAvailable; 74 public bool IsFinalResultAvailable { 75 get { return isFinalResultAvailable; } 88 76 set { 89 if (latestSnapshot != value) { 90 latestSnapshot = value; 91 if (value != null) { 92 latestSnapshotTime = DateTime.Now; 93 } 94 SnapshotRequestedState = Experiment.SnapshotRequestedState.Idle; 95 OnLatestSnapshotChanged(); 96 } 97 } 98 } 99 100 [Storable] 101 private DateTime latestSnapshotTime; 102 public DateTime LatestSnapshotTime { 103 get { return latestSnapshotTime; } 104 } 105 106 [Storable] 107 private ILog log; 108 public ILog Log { 109 get { return log; } 110 } 111 112 [Storable] 113 private IOptimizer optimizer; 114 public IOptimizer Optimizer { 115 get { return optimizer; } 116 set { this.optimizer = value; } 117 } 118 119 [Storable] 120 private SnapshotRequestedState snapshotRequestedState; 121 public SnapshotRequestedState SnapshotRequestedState { 122 get { return snapshotRequestedState; } 123 private set { 124 this.snapshotRequestedState = value; 125 OnSnapshotRequestedStateChanged(); 126 } 127 } 128 77 if (isFinalResultAvailable != value) { 78 isFinalResultAvailable = value; 79 if (isFinalResultAvailable) 80 OnFinalResultAvailable(); 81 } 82 } 83 } 84 129 85 public JobItem() { 130 log = new Log(); 86 childJobItems = new List<JobItem>(); 87 } 88 89 public JobItem(JobResult jobResult) : this() { 90 this.jobDto = new JobDto(); 91 UpdateJob(jobResult); 131 92 } 132 93 133 94 public override string ToString() { 134 95 if (jobDto != null) { 135 return optimizer.Name;96 return job.Name; 136 97 } else { 137 98 return base.ToString(); … … 139 100 } 140 101 141 public event EventHandler LatestSnapshotChanged; 142 private void OnLatestSnapshotChanged() { 143 LogMessage("LatestSnapshotChanged"); 144 EventHandler handler = LatestSnapshotChanged; 145 if (handler != null) handler(this, EventArgs.Empty); 146 } 147 102 public void UpdateJob(JobResult jobResult) { 103 if (jobResult != null) { 104 jobDto.Id = jobResult.Id; 105 jobDto.DateCreated = jobResult.DateCreated; 106 jobDto.DateCalculated = jobResult.DateCalculated; 107 jobDto.DateFinished = jobResult.DateFinished; 108 jobDto.Exception = jobResult.Exception; 109 jobDto.Id = jobResult.Id; 110 jobDto.Percentage = jobResult.Percentage; 111 jobDto.State = jobResult.State; 112 // what about parentJob 113 OnJobStateChanged(); 114 OnToStringChanged(); 115 OnItemImageChanged(); 116 } 117 } 118 119 public void AddChildJob(JobItem child) { 120 this.childJobItems.Add(child); 121 OnChildJobAdded(); 122 } 123 124 //public IEnumerable<JobItem> GetChildJobItems() { 125 // if (childJobItems == null) 126 // childJobItems = CreateChildJobItems(); 127 // return childJobItems; 128 //} 129 130 //protected IEnumerable<JobItem> CreateChildJobItems() { 131 // return (from j in Job.ChildJobs 132 // select new JobItem() { 133 // Job = j, 134 // JobDto = new JobDto() { 135 // State = JobState.Offline 136 // } 137 // }).ToList(); 138 //} 139 140 public SerializedJob ToSerializedJob() { 141 MemoryStream memStream = new MemoryStream(); 142 XmlGenerator.Serialize(job, memStream); 143 byte[] jobByteArray = memStream.ToArray(); 144 memStream.Dispose(); 145 146 UpdateRequiredPlugins(); 147 148 this.JobDto.CoresNeeded = job.CoresNeeded; 149 this.JobDto.MemoryNeeded = job.MemoryNeeded; 150 151 SerializedJob serializedJob = new SerializedJob() { 152 JobInfo = jobDto, 153 SerializedJobData = jobByteArray 154 }; 155 156 return serializedJob; 157 } 158 159 private void UpdateRequiredPlugins() { 160 // find out which which plugins are needed for the given object 161 this.JobDto.PluginsNeeded = HivePluginInfoDto.FindPluginsNeeded(job.GetType()); 162 } 163 148 164 public event EventHandler JobDtoChanged; 149 165 private void OnJobDtoChanged() { … … 155 171 public event EventHandler JobStateChanged; 156 172 private void OnJobStateChanged() { 157 LogMessage("JobStateChanged (State: " + this. State + ", Percentage: " + this.Percentage + ")");173 LogMessage("JobStateChanged (State: " + this.JobDto.State + ", Percentage: " + this.JobDto.Percentage + ")"); 158 174 EventHandler handler = JobStateChanged; 175 if (handler != null) handler(this, EventArgs.Empty); 176 } 177 178 public event EventHandler ChildJobAdded; 179 private void OnChildJobAdded() { 180 var handler = ChildJobAdded; 181 if (handler != null) handler(this, EventArgs.Empty); 182 } 183 184 public event EventHandler JobChanged; 185 private void OnJobChanged() { 186 var handler = JobChanged; 187 if (handler != null) handler(this, EventArgs.Empty); 188 } 189 190 public event EventHandler FinalResultAvailable; 191 private void OnFinalResultAvailable() { 192 var handler = FinalResultAvailable; 159 193 if (handler != null) handler(this, EventArgs.Empty); 160 194 } … … 162 196 public void LogMessage(string message) { 163 197 lock (locker) { 164 log.LogMessage(message); 198 if (job != null) { 199 job.Log.LogMessage(message); 200 } 165 201 } 166 202 } … … 169 205 LogMessage("I am beeing cloned"); 170 206 JobItem clone = (JobItem)base.Clone(cloner); 171 clone.latestSnapshotTime = this.latestSnapshotTime;172 207 clone.jobDto = (JobDto)cloner.Clone(this.jobDto); 173 clone.latestSnapshot = (ResponseObject<SerializedJob>)cloner.Clone(this.latestSnapshot); 174 clone.log = (ILog)cloner.Clone(this.log); 175 clone.optimizer = (IOptimizer)cloner.Clone(this.optimizer); 176 clone.jobResult = (JobResult)cloner.Clone(this.jobResult); 208 clone.job = (IJob)cloner.Clone(this.job); 177 209 return clone; 178 210 } 179 211 180 181 public event EventHandler SnapshotRequestedStateChanged; 182 private void OnSnapshotRequestedStateChanged() { 183 EventHandler handler = SnapshotRequestedStateChanged; 184 if (handler != null) handler(this, EventArgs.Empty); 185 } 186 187 public void RequestSnapshot() { 188 this.SnapshotRequestedState = Experiment.SnapshotRequestedState.Requested; 189 } 190 191 212 public void UpdateFinalResults(IJob j) { 213 if (this.Job is OptimizerJob && j is OptimizerJob) { 214 ((OptimizerJob)this.Job).Optimizer.Runs.AddRange(((OptimizerJob)j).Optimizer.Runs); 215 IsFinalResultAvailable = true; //job.ExecutionState == ExecutionState.Stopped; //?? 216 } else { 217 throw new NotSupportedException(); 218 } 219 } 220 221 internal bool ReplaceChildJob(IJob job) { 222 var matches = this.childJobItems.Where(child => child.Job.InternalId == job.InternalId); 223 if (matches.Count() > 0) { 224 matches.First().job = job; 225 return true; 226 } 227 return false; 228 } 192 229 } 193 230 } -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/ServiceLocator.cs
r4337 r4368 28 28 using HeuristicLab.Hive.Contracts; 29 29 using HeuristicLab.Hive.Experiment.Properties; 30 using HeuristicLab.Hive.Contracts.BusinessObjects; 31 using HeuristicLab.Hive.JobBase; 30 32 31 33 namespace HeuristicLab.Hive.Experiment { 32 34 internal class ServiceLocator { 33 internal static IClientFacade CreateClientFacade(string hostAddress) {34 ChannelFactory<IClientFacade> factory = new ChannelFactory<IClientFacade>("ClientHttpEndpoint");35 WcfSettings.SetEndpointAddress(factory.Endpoint, hostAddress);35 private static ServiceLocator instance = null; 36 private WcfServicePool<IClientFacade> clientFacadePool = null; 37 private WcfServicePool<IClientFacade> streamedClientFacadePool = null; 36 38 37 factory.Credentials.UserName.UserName = Settings.Default.HiveUsername; 38 factory.Credentials.UserName.Password = Settings.Default.HivePassword; 39 40 IClientFacade client = factory.CreateChannel(); 41 return client; 39 internal static ServiceLocator Instance { 40 get { 41 if (instance == null) { 42 instance = new ServiceLocator(); 43 } 44 return instance; 45 } 42 46 } 43 47 44 internal static IClientFacade CreateStreamedClientFacade(string hostAddress) { 45 ChannelFactory<IClientFacade> factory = new ChannelFactory<IClientFacade>("ClientTcpStreamedEndpoint"); 46 WcfSettings.SetEndpointAddress(factory.Endpoint, hostAddress); 47 48 factory.Credentials.UserName.UserName = Settings.Default.HiveUsername; 49 factory.Credentials.UserName.Password = Settings.Default.HivePassword; 50 51 IClientFacade client = factory.CreateChannel(); 52 return client; 48 internal WcfServicePool<IClientFacade> ClientFacadePool { 49 get { 50 if (clientFacadePool == null) { 51 clientFacadePool = new WcfServicePool<IClientFacade>(Settings.Default.HiveServerIp, Settings.Default.HiveUsername, Settings.Default.HivePassword, "ClientHttpEndpoint"); 52 } 53 return clientFacadePool; 54 } 53 55 } 54 56 55 public static void DisposeClientFacade(IClientFacade clientFacade) { 56 WcfSettings.DisposeWcfClient((ICommunicationObject)clientFacade); 57 internal WcfServicePool<IClientFacade> StreamedClientFacadePool { 58 get { 59 if (streamedClientFacadePool == null) { 60 streamedClientFacadePool = new WcfServicePool<IClientFacade>(Settings.Default.HiveServerIp, Settings.Default.HiveUsername, Settings.Default.HivePassword, "ClientTcpStreamedEndpoint"); 61 } 62 return streamedClientFacadePool; 63 } 57 64 } 58 65 }
Note: See TracChangeset
for help on using the changeset viewer.