Changeset 6168 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs
- Timestamp:
- 05/09/11 14:12:10 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs
r6112 r6168 24 24 using System.Diagnostics; 25 25 using System.IO; 26 using System.Linq; 26 27 using System.ServiceModel; 27 28 using System.Threading; … … 46 47 private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>(); 47 48 private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>(); 48 private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();49 49 50 50 private WcfService wcfService; … … 57 57 public Dictionary<Guid, Executor> Executors { 58 58 get { return executors; } 59 }60 61 internal Dictionary<Guid, Job> Jobs {62 get { return jobs; }63 59 } 64 60 … … 163 159 Job job = wcfService.GetJob(jobId); 164 160 if (job == null) throw new JobNotFoundException(jobId); 165 lock (executors) {166 if (!jobs.ContainsKey(job.Id)) {167 jobs.Add(job.Id, job);168 }169 }170 161 JobData jobData = wcfService.GetJobData(job.Id); 171 162 if (jobData == null) throw new JobDataNotFoundException(jobId); … … 220 211 221 212 private void DoPauseJob(Guid jobId) { 222 if (! Jobs.ContainsKey(jobId)) {213 if (!executors.ContainsKey(jobId)) { 223 214 clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId); 224 215 } else { 225 Job job = Jobs[jobId];226 227 if (job != null ) {216 Job job = wcfService.GetJob(jobId); 217 218 if (job != null && executors.ContainsKey(job.Id)) { 228 219 executors[job.Id].Pause(); 229 220 JobData sJob = executors[job.Id].GetPausedJob(); … … 251 242 252 243 private void DoStopJob(Guid jobId) { 253 if (! Jobs.ContainsKey(jobId)) {244 if (!executors.ContainsKey(jobId)) { 254 245 clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId); 255 246 } else { 256 Job job = Jobs[jobId];247 Job job = wcfService.GetJob(jobId); 257 248 258 249 if (job != null) { … … 260 251 JobData sJob = executors[job.Id].GetFinishedJob(); 261 252 job.ExecutionTime = executors[job.Id].ExecutionTime; 262 263 253 264 254 try { … … 268 258 SlaveStatusInfo.JobsAborted++; 269 259 270 clientCom.LogMessage("Sending the stopp ped job with id: " + job.Id);260 clientCom.LogMessage("Sending the stopped job with id: " + job.Id); 271 261 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted); 272 262 } … … 285 275 /// </summary> 286 276 private void DoAbortAll() { 287 List<Guid> guids = new List<Guid>(); 288 foreach (Guid job in Jobs.Keys) { 289 guids.Add(job); 290 } 291 292 foreach (Guid g in guids) { 293 KillAppDomain(g); 294 } 295 277 List<Guid> jobIds; 278 lock (executors) { 279 jobIds = new List<Guid>(executors.Keys); 280 } 281 foreach (Guid jobId in jobIds) { 282 KillAppDomain(jobId); 283 } 296 284 clientCom.LogMessage("Aborted all jobs!"); 297 285 } … … 304 292 305 293 //copy guids because there will be removed items from 'Jobs' 306 List<Guid> guids = new List<Guid>();307 foreach (Guid job in Jobs.Keys) {308 guids.Add(job);309 } 310 311 foreach (Guid g in guids) {312 DoPauseJob( g);294 List<Guid> jobIds; 295 lock (executors) { 296 jobIds = new List<Guid>(Executors.Keys); 297 } 298 299 foreach (Guid jobId in jobIds) { 300 DoPauseJob(jobId); 313 301 } 314 302 } … … 321 309 322 310 //copy guids because there will be removed items from 'Jobs' 323 List<Guid> guids = new List<Guid>();324 foreach (Guid job in Jobs.Keys) {325 guids.Add(job);326 } 327 328 foreach (Guid g in guids) {329 DoStopJob( g);311 List<Guid> jobIds; 312 lock (executors) { 313 jobIds = new List<Guid>(executors.Keys); 314 } 315 316 foreach (Guid jobId in jobIds) { 317 DoStopJob(jobId); 330 318 } 331 319 } … … 395 383 /// </summary> 396 384 public void PauseWaitJob(JobData data) { 397 if (! Jobs.ContainsKey(data.JobId)) {385 if (!Executors.ContainsKey(data.JobId)) { 398 386 clientCom.LogMessage("Can't find job with id " + data.JobId); 399 387 } else { 400 Job job = Jobs[data.JobId];388 Job job = wcfService.GetJob(data.JobId); 401 389 wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused); 402 390 wcfService.UpdateJobState(job.Id, JobState.Waiting, null); … … 416 404 return; 417 405 } 418 if (! jobs.ContainsKey(jobId)) {406 if (!executors.ContainsKey(jobId)) { 419 407 clientCom.LogMessage("Job doesn't exist"); 420 408 return; 421 409 } 422 Job cJob = jobs[jobId];423 cJob.ExecutionTime = executors[jobId].ExecutionTime;410 Job job = wcfService.GetJob(jobId); 411 job.ExecutionTime = executors[jobId].ExecutionTime; 424 412 425 413 if (executors[jobId].Aborted) { … … 436 424 try { 437 425 clientCom.LogMessage("Sending the finished job with id: " + jobId); 438 wcfService.UpdateJobData( cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);426 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished); 439 427 } 440 428 catch (Exception e) { … … 456 444 /// A new Job from the wcfService has been received and will be started within a AppDomain. 457 445 /// </summary> 458 private void StartJobInAppDomain(Job myJob, JobData jobData) {459 clientCom.LogMessage("Received new job with id " + myJob.Id);446 private void StartJobInAppDomain(Job job, JobData jobData) { 447 clientCom.LogMessage("Received new job with id " + job.Id); 460 448 clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole()); 461 449 462 450 lock (startInAppDomainLocker) { 463 if (executors.ContainsKey( myJob.Id)) {464 clientCom.LogMessage("Job with key " + myJob.Id + " already exists. Job will be ignored.");451 if (executors.ContainsKey(job.Id)) { 452 clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored."); 465 453 return; 466 454 } 467 455 468 String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());456 String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString()); 469 457 bool pluginsPrepared = false; 470 458 string configFileName = string.Empty; 471 459 472 460 try { 473 PluginCache.Instance.PreparePlugins( myJob, out configFileName);474 clientCom.LogMessage("Plugins fetched for job " + myJob.Id);461 PluginCache.Instance.PreparePlugins(job, out configFileName); 462 clientCom.LogMessage("Plugins fetched for job " + job.Id); 475 463 pluginsPrepared = true; 476 464 } 477 465 catch (Exception exception) { 478 clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));479 wcfService.UpdateJobState( myJob.Id, JobState.Failed, exception.ToString());466 clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception)); 467 wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString()); 480 468 SlaveStatusInfo.JobsAborted++; 481 lock (executors) {482 if (jobs.ContainsKey(myJob.Id)) {483 jobs.Remove(myJob.Id);484 }485 }486 469 } 487 470 488 471 if (pluginsPrepared) { 489 472 try { 490 AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox( myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));473 AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName)); 491 474 appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException); 492 475 Executor executor; 493 appDomains.Add( myJob.Id, appDomain);476 appDomains.Add(job.Id, appDomain); 494 477 clientCom.LogMessage("Creating AppDomain"); 495 478 executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName); 496 479 clientCom.LogMessage("Created AppDomain"); 497 executor.JobId = myJob.Id;498 480 executor.Core = this; 499 clientCom.LogMessage("Starting Executor for job " + myJob.Id); 481 executor.JobId = job.Id; 482 executor.CoresNeeded = job.CoresNeeded; 483 executor.MemoryNeeded = job.MemoryNeeded; 484 clientCom.LogMessage("Starting Executor for job " + job.Id); 500 485 executor.Start(jobData.Data); 501 502 486 lock (executors) { 503 executors.Add( myJob.Id, executor);487 executors.Add(job.Id, executor); 504 488 } 505 489 } 506 490 catch (Exception exception) { 507 clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);491 clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id); 508 492 clientCom.LogMessage("Error thrown is: " + exception.ToString()); 509 493 510 if (executors.ContainsKey( myJob.Id) && executors[myJob.Id].CurrentException != string.Empty) {511 wcfService.UpdateJobState( myJob.Id, JobState.Failed, executors[myJob.Id].CurrentException);494 if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) { 495 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException); 512 496 } else { 513 wcfService.UpdateJobState( myJob.Id, JobState.Failed, exception.ToString());497 wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString()); 514 498 } 515 499 SlaveStatusInfo.JobsAborted++; 516 500 517 KillAppDomain( myJob.Id);501 KillAppDomain(job.Id); 518 502 } 519 503 } … … 587 571 } 588 572 589 jobs.Remove(id);590 573 PluginCache.Instance.DeletePluginsForJob(id); 591 574 GC.Collect(); … … 602 585 return null; // avoid destruction of proxy object after 5 minutes 603 586 } 587 588 public int GetCoresNeeded() { 589 lock (executors) { 590 return executors.Sum(x => x.Value.CoresNeeded); 591 } 592 } 604 593 } 605 594 }
Note: See TracChangeset
for help on using the changeset viewer.