Changeset 6203 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs
- Timestamp: 05/16/11 21:19:34 (13 years ago)
- File: 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs
r6202 r6203 23 23 using System.Collections.Generic; 24 24 using System.Diagnostics; 25 using System.IO;26 using System.Linq;27 25 using System.ServiceModel; 28 26 using System.Threading; … … 45 43 public static ILog Log { get; set; } 46 44 47 private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>(); 48 private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>(); 49 50 // signalizes if the Executor.Start method has properly finished. only then the appdomain may be unloaded 51 private Dictionary<Guid, Semaphore> semaphores = new Dictionary<Guid, Semaphore>(); 45 private Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>(); 52 46 53 47 private WcfService wcfService; 54 private HeartbeatManager heartbeatManager;55 p rivate int coreThreadId;48 private static HeartbeatManager heartbeatManager; 49 public static HeartbeatManager HBManager { get { return heartbeatManager; } } 56 50 57 51 private ISlaveCommunication clientCom; 58 52 private ServiceHost slaveComm; 59 53 60 public Dictionary<Guid, Executor> Executors {61 get { return executors; }54 public Dictionary<Guid, SlaveJob> SlaveJobs { 55 get { return slaveJobs; } 62 56 } 63 57 … … 68 62 /// </summary> 69 63 public void Start() { 70 coreThreadId = Thread.CurrentThread.ManagedThreadId;71 64 abortRequested = false; 72 65 … … 160 153 Task.Factory.StartNew((jobIdObj) => { 161 154 Guid jobId = (Guid)jobIdObj; 162 Job job = wcfService.GetJob(jobId); 163 if (job == null) throw new JobNotFoundException(jobId); 164 JobData jobData = wcfService.GetJobData(job.Id); 165 if (jobData == null) throw new JobDataNotFoundException(jobId); 166 SlaveStatusInfo.JobsFetched++; 167 job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null); 168 if (job == null) throw new JobNotFoundException(jobId); 169 StartJobInAppDomain(job, jobData); 155 SlaveJob newJob = new SlaveJob(this); 156 bool start = true; 157 158 lock (slaveJobs) { 159 if (slaveJobs.ContainsKey(jobId)) { 
160 start = false; 161 clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId)); 162 } else { 163 slaveJobs.Add(jobId, newJob); 164 } 165 } 166 167 if (start) { 168 newJob.CalculateJob(jobId); 169 } 170 170 }, container.JobId) 171 171 .ContinueWith((t) => { … … 173 173 clientCom.LogMessage(t.Exception.ToString()); 174 174 wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString()); 175 SlaveStatusInfo. JobsAborted++;175 SlaveStatusInfo.IncrementJobsFailed(); 176 176 }, TaskContinuationOptions.OnlyOnFaulted); 177 177 break; … … 189 189 break; 190 190 case MessageContainer.MessageType.AbortJob: 191 SlaveStatusInfo.JobsAborted++; 192 KillAppDomain(container.JobId); 191 SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place 192 193 Task.Factory.StartNew((jobIdObj) => { 194 Guid jobId = (Guid)jobIdObj; 195 bool abort = true; 196 SlaveJob sj = null; 197 198 lock (slaveJobs) { 199 if (!slaveJobs.ContainsKey(jobId)) { 200 clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Abort aborted.", jobId)); 201 abort = false; 202 } else { 203 sj = slaveJobs[jobId]; 204 } 205 } 206 if (abort && !sj.Finished) { 207 sj.KillAppDomain(); 208 } 209 }, container.JobId) 210 .ContinueWith((t) => { 211 // handle exception of task 212 clientCom.LogMessage(t.Exception.ToString()); 213 }, TaskContinuationOptions.OnlyOnFaulted); 193 214 break; 194 215 case MessageContainer.MessageType.StopJob: 195 DoStopJob(container.JobId); 216 Task.Factory.StartNew((jobIdObj) => { 217 Guid jobId = (Guid)jobIdObj; 218 bool stop = true; 219 SlaveJob sj = null; 220 221 lock (slaveJobs) { 222 if (!slaveJobs.ContainsKey(jobId)) { 223 clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. 
Stop aborted.", jobId)); 224 stop = false; 225 } else { 226 sj = slaveJobs[jobId]; 227 } 228 } 229 if (stop && !sj.Finished) { 230 sj.StopJob(); 231 } 232 }, container.JobId) 233 .ContinueWith((t) => { 234 // handle exception of task 235 clientCom.LogMessage(t.Exception.ToString()); 236 }, TaskContinuationOptions.OnlyOnFaulted); 196 237 break; 197 238 case MessageContainer.MessageType.PauseJob: 198 DoPauseJob(container.JobId); 239 Task.Factory.StartNew((jobIdObj) => { 240 Guid jobId = (Guid)jobIdObj; 241 bool pause = true; 242 SlaveJob sj = null; 243 244 lock (slaveJobs) { 245 if (!slaveJobs.ContainsKey(jobId)) { 246 clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Pause aborted.", jobId)); 247 pause = false; 248 } else { 249 sj = slaveJobs[jobId]; 250 } 251 } 252 if (pause && !sj.Finished) { 253 sj.PauseJob(); 254 } 255 }, container.JobId) 256 .ContinueWith((t) => { 257 // handle exception of task 258 clientCom.LogMessage(t.Exception.ToString()); 259 }, TaskContinuationOptions.OnlyOnFaulted); 199 260 break; 200 261 case MessageContainer.MessageType.Restart: … … 213 274 } 214 275 215 private void DoPauseJob(Guid jobId) { 216 if (!executors.ContainsKey(jobId)) { 217 clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId); 218 } else { 219 Job job = wcfService.GetJob(jobId); 220 221 if (job != null && executors.ContainsKey(job.Id)) { 222 executors[job.Id].Pause(); 223 JobData sJob = executors[job.Id].GetPausedJob(); 224 job.ExecutionTime = executors[job.Id].ExecutionTime; 225 226 try { 227 if (executors[job.Id].CurrentException != string.Empty) { 228 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException); 229 SlaveStatusInfo.JobsAborted++; 230 } else { 231 SlaveStatusInfo.JobsProcessed++; 232 } 233 clientCom.LogMessage("Sending the paused job with id: " + job.Id); 234 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused); 276 /// <summary> 277 /// aborts all 
running jobs, no results are sent back 278 /// </summary> 279 private void DoAbortAll() { 280 lock (slaveJobs) { 281 foreach (SlaveJob sj in slaveJobs.Values) { 282 if (!sj.Finished) { 283 sj.KillAppDomain(); 235 284 } 236 catch (Exception e) { 237 clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")"); 238 } 239 finally { 240 KillAppDomain(job.Id); // kill app-domain in every case 241 } 242 } 243 } 244 } 245 246 private void DoStopJob(Guid jobId) { 247 if (!executors.ContainsKey(jobId)) { 248 clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId); 249 } else { 250 Job job = wcfService.GetJob(jobId); 251 252 if (job != null) { 253 executors[job.Id].Stop(); 254 JobData sJob = executors[job.Id].GetFinishedJob(); 255 job.ExecutionTime = executors[job.Id].ExecutionTime; 256 257 try { 258 if (executors[job.Id].CurrentException != string.Empty) { 259 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException); 260 } 261 SlaveStatusInfo.JobsAborted++; 262 263 clientCom.LogMessage("Sending the stopped job with id: " + job.Id); 264 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted); 265 } 266 catch (Exception e) { 267 clientCom.LogMessage("Transmitting to server failed. 
Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")"); 268 } 269 finally { 270 KillAppDomain(job.Id); // kill app-domain in every case 271 } 272 } 273 } 274 } 275 276 /// <summary> 277 /// aborts all running jobs, no results are sent back 278 /// </summary> 279 private void DoAbortAll() { 280 List<Guid> jobIds; 281 lock (executors) { 282 jobIds = new List<Guid>(executors.Keys); 283 } 284 foreach (Guid jobId in jobIds) { 285 KillAppDomain(jobId); 285 } 286 286 } 287 287 clientCom.LogMessage("Aborted all jobs!"); … … 294 294 clientCom.LogMessage("Pause all received"); 295 295 296 //copy guids because there will be removed items from 'Jobs' 297 List<Guid> jobIds; 298 lock (executors) { 299 jobIds = new List<Guid>(Executors.Keys); 300 } 301 302 foreach (Guid jobId in jobIds) { 303 DoPauseJob(jobId); 296 lock (slaveJobs) { 297 foreach (SlaveJob sj in slaveJobs.Values) { 298 if (!sj.Finished) { 299 sj.PauseJob(); 300 } 301 } 304 302 } 305 303 } … … 311 309 clientCom.LogMessage("Stop all received"); 312 310 313 //copy guids because there will be removed items from 'Jobs' 314 List<Guid> jobIds; 315 lock (executors) { 316 jobIds = new List<Guid>(executors.Keys); 317 } 318 319 foreach (Guid jobId in jobIds) { 320 DoStopJob(jobId); 311 lock (slaveJobs) { 312 foreach (SlaveJob sj in slaveJobs.Values) { 313 if (!sj.Finished) { 314 sj.StopJob(); 315 } 316 } 321 317 } 322 318 } … … 341 337 clientCom.LogMessage("Logging out"); 342 338 343 344 lock (executors) { 345 clientCom.LogMessage("executors locked"); 346 foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) { 347 clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key); 348 appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException); 349 AppDomain.Unload(kvp.Value); 350 } 351 } 339 DoAbortAll(); 340 352 341 WcfService.Instance.Disconnect(); 353 342 clientCom.Shutdown(); … … 382 371 383 372 /// <summary> 384 /// Pauses a job, which means sending 
it to the server and killing it locally;385 /// atm only used when executor is waiting for child jobs386 /// </summary>387 public void PauseWaitJob(JobData data) {388 if (!Executors.ContainsKey(data.JobId)) {389 clientCom.LogMessage("Can't find job with id " + data.JobId);390 } else {391 Job job = wcfService.GetJob(data.JobId);392 wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);393 wcfService.UpdateJobState(job.Id, JobState.Waiting, null);394 }395 KillAppDomain(data.JobId);396 }397 398 /// <summary>399 /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.400 /// once the connection gets reestablished, the job gets submitted401 /// </summary>402 public void SendFinishedJob(Guid jobId) {403 try {404 clientCom.LogMessage("Getting the finished job with id: " + jobId);405 if (!executors.ContainsKey(jobId)) {406 clientCom.LogMessage("Executor doesn't exist");407 return;408 }409 if (!executors.ContainsKey(jobId)) {410 clientCom.LogMessage("Job doesn't exist");411 return;412 }413 Job job = wcfService.GetJob(jobId);414 job.ExecutionTime = executors[jobId].ExecutionTime;415 416 if (executors[jobId].Aborted) {417 SlaveStatusInfo.JobsAborted++;418 } else {419 SlaveStatusInfo.JobsProcessed++;420 }421 422 if (executors[jobId].CurrentException != string.Empty) {423 wcfService.UpdateJobState(jobId, JobState.Failed, executors[jobId].CurrentException);424 }425 426 JobData sJob = executors[jobId].GetFinishedJob();427 try {428 clientCom.LogMessage("Sending the finished job with id: " + jobId);429 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);430 }431 catch (Exception e) {432 clientCom.LogMessage("Transmitting to server failed. 
Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");433 }434 finally {435 KillAppDomain(jobId);436 heartbeatManager.AwakeHeartBeatThread();437 }438 }439 catch (Exception e) {440 OnExceptionOccured(e);441 }442 }443 444 private static object startInAppDomainLocker = new object();445 private static Mutex startInAppDomainMutex = new Mutex();446 /// <summary>447 /// A new Job from the wcfService has been received and will be started within a AppDomain.448 /// </summary>449 private void StartJobInAppDomain(Job job, JobData jobData) {450 clientCom.LogMessage("Received new job with id " + job.Id);451 clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());452 453 startInAppDomainMutex.WaitOne(); // mutex is used instead of lock to be able to release it before executor.Start() is called (which may take long time)454 bool released = false;455 456 if (executors.ContainsKey(job.Id)) {457 clientCom.LogMessage("Job with key " + job.Id + " already exists. 
Job will be ignored.");458 return;459 }460 461 String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());462 bool pluginsPrepared = false;463 string configFileName = string.Empty;464 465 try {466 PluginCache.Instance.PreparePlugins(job, out configFileName);467 clientCom.LogMessage("Plugins fetched for job " + job.Id);468 pluginsPrepared = true;469 }470 catch (Exception exception) {471 clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));472 wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());473 SlaveStatusInfo.JobsAborted++;474 }475 476 if (pluginsPrepared) {477 try {478 //TODO: switch back to unprivileged sandbox479 AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));480 appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);481 Executor executor;482 appDomains.Add(job.Id, appDomain);483 clientCom.LogMessage("Creating AppDomain");484 executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);485 clientCom.LogMessage("Created AppDomain");486 executor.Core = this;487 executor.JobId = job.Id;488 executor.CoresNeeded = job.CoresNeeded;489 executor.MemoryNeeded = job.MemoryNeeded;490 clientCom.LogMessage("Starting Executor for job " + job.Id);491 lock (executors) {492 if (executors.ContainsKey(job.Id)) {493 throw new JobAlreadyExistsException(job.Id);494 }495 executors.Add(job.Id, executor);496 }497 startInAppDomainMutex.ReleaseMutex();498 released = true;499 semaphores[job.Id] = new Semaphore(0, 1);500 executor.Start(jobData.Data);501 semaphores[job.Id].Release();502 }503 catch (JobAlreadyExistsException e) {504 clientCom.LogMessage(string.Format("Job {0} has already been started. 
Job will be ignored", e.JobId));505 }506 catch (Exception exception) {507 clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);508 clientCom.LogMessage("Error thrown is: " + exception.ToString());509 510 if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {511 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);512 } else {513 wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());514 }515 SlaveStatusInfo.JobsAborted++;516 517 KillAppDomain(job.Id);518 }519 520 if (!released)521 startInAppDomainMutex.ReleaseMutex();522 }523 heartbeatManager.AwakeHeartBeatThread();524 }525 526 public event EventHandler<EventArgs<Exception>> ExceptionOccured;527 private void OnExceptionOccured(Exception e) {528 clientCom.LogMessage("Error: " + e.ToString());529 var handler = ExceptionOccured;530 if (handler != null) handler(this, new EventArgs<Exception>(e));531 }532 533 private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {534 clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());535 KillAppDomain(new Guid(e.ExceptionObject.ToString()));536 }537 538 /// <summary>539 373 /// Enqueues messages from the executor to the message queue. 540 374 /// This is necessary if the core thread has to execute certain actions, e.g. … … 549 383 } 550 384 551 /// <summary> 552 /// Kill a appdomain with a specific id. 
553 /// </summary> 554 /// <param name="jobId">the GUID of the job</param> 555 public void KillAppDomain(Guid jobId) { 556 if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) { 557 EnqueueExecutorMessage<Guid>(KillAppDomain, jobId); 558 return; 559 } 560 561 clientCom.LogMessage("Shutting down Appdomain for Job " + jobId); 562 lock (executors) { 563 try { 564 if (executors.ContainsKey(jobId)) { 565 executors[jobId].Dispose(); 566 executors.Remove(jobId); 567 } 568 569 if (appDomains.ContainsKey(jobId)) { 570 appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException); 571 int repeat = 5; 572 while (repeat > 0) { 573 try { 574 semaphores[jobId].WaitOne(); 575 AppDomain.Unload(appDomains[jobId]); 576 semaphores[jobId].Dispose(); 577 semaphores.Remove(jobId); 578 repeat = 0; 579 } 580 catch (CannotUnloadAppDomainException) { 581 clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec."); 582 Thread.Sleep(1000); 583 repeat--; 584 if (repeat == 0) { 585 clientCom.LogMessage("Could not unload AppDomain, shutting down core..."); 586 throw; // rethrow and let app crash 587 } 588 } 589 } 590 appDomains.Remove(jobId); 591 } 592 593 PluginCache.Instance.DeletePluginsForJob(jobId); 594 GC.Collect(); 595 } 596 catch (Exception ex) { 597 clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString()); 598 } 599 } 600 GC.Collect(); 601 clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole()); 602 } 603 604 public override object InitializeLifetimeService() { 605 return null; // avoid destruction of proxy object after 5 minutes 606 } 607 608 public int GetCoresNeeded() { 609 lock (executors) { 610 return executors.Sum(x => x.Value.CoresNeeded); 385 public void RemoveSlaveJobFromList(Guid jobId) { 386 lock (slaveJobs) { 387 if (slaveJobs.ContainsKey(jobId)) { 388 slaveJobs.Remove(jobId); 389 } 611 390 } 612 391 }
Note: See TracChangeset for help on using the changeset viewer.