Changeset 6203 for branches/HeuristicLab.Hive-3.4
- Timestamp:
- 05/16/11 21:19:34 (13 years ago)
- Location:
- branches/HeuristicLab.Hive-3.4/sources
- Files:
-
- 3 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Views/3.4/SlaveView.cs
r6004 r6203 124 124 DataPoint pJobs = new DataPoint(status.Jobs.Count, status.Jobs.Count); 125 125 DataPoint pJobsAborted = new DataPoint(status.JobsAborted, status.JobsAborted); 126 DataPoint pJobsDone = new DataPoint(status.Jobs Done, status.JobsDone);126 DataPoint pJobsDone = new DataPoint(status.JobsFinished, status.JobsFinished); 127 127 DataPoint pJobsFetched = new DataPoint(status.JobsFetched, status.JobsFetched); 128 128 … … 131 131 pJobsAborted.LegendText = "Aborted jobs: " + status.JobsAborted; 132 132 pJobsAborted.Color = System.Drawing.Color.Red; 133 pJobsDone.LegendText = "Finished jobs: " + status.Jobs Done;133 pJobsDone.LegendText = "Finished jobs: " + status.JobsFinished; 134 134 pJobsDone.Color = System.Drawing.Color.Green; 135 135 pJobsFetched.LegendText = "Fetched jobs: " + status.JobsFetched; -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/ConfigManager.cs
r6168 r6203 79 79 80 80 st.TotalCores = slave.Cores.HasValue ? slave.Cores.Value : 0; 81 st.FreeCores = slave.Cores.HasValue ? slave.Cores.Value - GetUsedCores(): 0;81 st.FreeCores = slave.Cores.HasValue ? slave.Cores.Value - SlaveStatusInfo.UsedCores : 0; 82 82 83 83 st.JobsAborted = SlaveStatusInfo.JobsAborted; 84 st.Jobs Done = SlaveStatusInfo.JobsProcessed;84 st.JobsFinished = SlaveStatusInfo.JobsFinished; 85 85 st.JobsFetched = SlaveStatusInfo.JobsFetched; 86 87 Dictionary<Guid, Executor> engines = Core.Executors; 86 st.JobsFailed = SlaveStatusInfo.JobsFailed; 87 88 Dictionary<Guid, SlaveJob> slaveJobs = Core.SlaveJobs; 88 89 st.Jobs = new List<JobStatus>(); 89 90 90 lock (engines) { 91 foreach (KeyValuePair<Guid, Executor> kvp in engines) { 92 Executor e = kvp.Value; 93 st.Jobs.Add(new JobStatus { JobId = e.JobId, ExecutionTime = e.ExecutionTime, Since = e.CreationTime }); 91 lock (slaveJobs) { 92 foreach (KeyValuePair<Guid, SlaveJob> kvp in slaveJobs) { 93 Executor e = kvp.Value.JobExecutor; 94 if (e != null && !kvp.Value.Finished) { 95 st.Jobs.Add(new JobStatus { JobId = e.JobId, ExecutionTime = e.ExecutionTime, Since = e.CreationTime }); 96 } 94 97 } 95 98 } … … 99 102 public Dictionary<Guid, TimeSpan> GetExecutionTimeOfAllJobs() { 100 103 Dictionary<Guid, TimeSpan> prog = new Dictionary<Guid, TimeSpan>(); 101 Dictionary<Guid, Executor> engines = Core.Executors; 102 lock (engines) { 103 foreach (KeyValuePair<Guid, Executor> kvp in engines) { 104 Executor e = kvp.Value; 105 //don't include jobs in hb's which are currently serializing 106 if (e.SendHeartbeatForExecutor) { 107 prog[e.JobId] = e.ExecutionTime; 104 Dictionary<Guid, SlaveJob> slaveJobs = Core.SlaveJobs; 105 lock (slaveJobs) { 106 foreach (KeyValuePair<Guid, SlaveJob> kvp in slaveJobs) { 107 Executor e = kvp.Value.JobExecutor; 108 if (e != null && !kvp.Value.Finished) { 109 //don't include jobs in hb's which are currently serializing 110 if (e.SendHeartbeatForExecutor) { 111 prog[e.JobId] = e.ExecutionTime; 112 } 108 113 } 109 114 } 110 115 } 111 116 return prog; 112 }113 114 public int GetUsedCores() {115 return Core.GetCoresNeeded();116 117 } 117 118 -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs
r6202 r6203 23 23 using System.Collections.Generic; 24 24 using System.Diagnostics; 25 using System.IO;26 using System.Linq;27 25 using System.ServiceModel; 28 26 using System.Threading; … … 45 43 public static ILog Log { get; set; } 46 44 47 private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>(); 48 private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>(); 49 50 // signalizes if the Executor.Start method has properly finished. only then the appdomain may be unloaded 51 private Dictionary<Guid, Semaphore> semaphores = new Dictionary<Guid, Semaphore>(); 45 private Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>(); 52 46 53 47 private WcfService wcfService; 54 private HeartbeatManager heartbeatManager;55 p rivate int coreThreadId;48 private static HeartbeatManager heartbeatManager; 49 public static HeartbeatManager HBManager { get { return heartbeatManager; } } 56 50 57 51 private ISlaveCommunication clientCom; 58 52 private ServiceHost slaveComm; 59 53 60 public Dictionary<Guid, Executor> Executors {61 get { return executors; }54 public Dictionary<Guid, SlaveJob> SlaveJobs { 55 get { return slaveJobs; } 62 56 } 63 57 … … 68 62 /// </summary> 69 63 public void Start() { 70 coreThreadId = Thread.CurrentThread.ManagedThreadId;71 64 abortRequested = false; 72 65 … … 160 153 Task.Factory.StartNew((jobIdObj) => { 161 154 Guid jobId = (Guid)jobIdObj; 162 Job job = wcfService.GetJob(jobId); 163 if (job == null) throw new JobNotFoundException(jobId); 164 JobData jobData = wcfService.GetJobData(job.Id); 165 if (jobData == null) throw new JobDataNotFoundException(jobId); 166 SlaveStatusInfo.JobsFetched++; 167 job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null); 168 if (job == null) throw new JobNotFoundException(jobId); 169 StartJobInAppDomain(job, jobData); 155 SlaveJob newJob = new SlaveJob(this); 156 bool start = true; 157 158 lock (slaveJobs) { 159 if (slaveJobs.ContainsKey(jobId)) { 160 start = false; 161 clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId)); 162 } else { 163 slaveJobs.Add(jobId, newJob); 164 } 165 } 166 167 if (start) { 168 newJob.CalculateJob(jobId); 169 } 170 170 }, container.JobId) 171 171 .ContinueWith((t) => { … … 173 173 clientCom.LogMessage(t.Exception.ToString()); 174 174 wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString()); 175 SlaveStatusInfo. JobsAborted++;175 SlaveStatusInfo.IncrementJobsFailed(); 176 176 }, TaskContinuationOptions.OnlyOnFaulted); 177 177 break; … … 189 189 break; 190 190 case MessageContainer.MessageType.AbortJob: 191 SlaveStatusInfo.JobsAborted++; 192 KillAppDomain(container.JobId); 191 SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place 192 193 Task.Factory.StartNew((jobIdObj) => { 194 Guid jobId = (Guid)jobIdObj; 195 bool abort = true; 196 SlaveJob sj = null; 197 198 lock (slaveJobs) { 199 if (!slaveJobs.ContainsKey(jobId)) { 200 clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Abort aborted.", jobId)); 201 abort = false; 202 } else { 203 sj = slaveJobs[jobId]; 204 } 205 } 206 if (abort && !sj.Finished) { 207 sj.KillAppDomain(); 208 } 209 }, container.JobId) 210 .ContinueWith((t) => { 211 // handle exception of task 212 clientCom.LogMessage(t.Exception.ToString()); 213 }, TaskContinuationOptions.OnlyOnFaulted); 193 214 break; 194 215 case MessageContainer.MessageType.StopJob: 195 DoStopJob(container.JobId); 216 Task.Factory.StartNew((jobIdObj) => { 217 Guid jobId = (Guid)jobIdObj; 218 bool stop = true; 219 SlaveJob sj = null; 220 221 lock (slaveJobs) { 222 if (!slaveJobs.ContainsKey(jobId)) { 223 clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Stop aborted.", jobId)); 224 stop = false; 225 } else { 226 sj = slaveJobs[jobId]; 227 } 228 } 229 if (stop && !sj.Finished) { 230 sj.StopJob(); 231 } 232 }, container.JobId) 233 .ContinueWith((t) => { 234 // handle exception of task 235 clientCom.LogMessage(t.Exception.ToString()); 236 }, TaskContinuationOptions.OnlyOnFaulted); 196 237 break; 197 238 case MessageContainer.MessageType.PauseJob: 198 DoPauseJob(container.JobId); 239 Task.Factory.StartNew((jobIdObj) => { 240 Guid jobId = (Guid)jobIdObj; 241 bool pause = true; 242 SlaveJob sj = null; 243 244 lock (slaveJobs) { 245 if (!slaveJobs.ContainsKey(jobId)) { 246 clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Pause aborted.", jobId)); 247 pause = false; 248 } else { 249 sj = slaveJobs[jobId]; 250 } 251 } 252 if (pause && !sj.Finished) { 253 sj.PauseJob(); 254 } 255 }, container.JobId) 256 .ContinueWith((t) => { 257 // handle exception of task 258 clientCom.LogMessage(t.Exception.ToString()); 259 }, TaskContinuationOptions.OnlyOnFaulted); 199 260 break; 200 261 case MessageContainer.MessageType.Restart: … … 213 274 } 214 275 215 private void DoPauseJob(Guid jobId) { 216 if (!executors.ContainsKey(jobId)) { 217 clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId); 218 } else { 219 Job job = wcfService.GetJob(jobId); 220 221 if (job != null && executors.ContainsKey(job.Id)) { 222 executors[job.Id].Pause(); 223 JobData sJob = executors[job.Id].GetPausedJob(); 224 job.ExecutionTime = executors[job.Id].ExecutionTime; 225 226 try { 227 if (executors[job.Id].CurrentException != string.Empty) { 228 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException); 229 SlaveStatusInfo.JobsAborted++; 230 } else { 231 SlaveStatusInfo.JobsProcessed++; 232 } 233 clientCom.LogMessage("Sending the paused job with id: " + job.Id); 234 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused); 276 /// <summary> 277 /// aborts all running jobs, no results are sent back 278 /// </summary> 279 private void DoAbortAll() { 280 lock (slaveJobs) { 281 foreach (SlaveJob sj in slaveJobs.Values) { 282 if (!sj.Finished) { 283 sj.KillAppDomain(); 235 284 } 236 catch (Exception e) { 237 clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")"); 238 } 239 finally { 240 KillAppDomain(job.Id); // kill app-domain in every case 241 } 242 } 243 } 244 } 245 246 private void DoStopJob(Guid jobId) { 247 if (!executors.ContainsKey(jobId)) { 248 clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId); 249 } else { 250 Job job = wcfService.GetJob(jobId); 251 252 if (job != null) { 253 executors[job.Id].Stop(); 254 JobData sJob = executors[job.Id].GetFinishedJob(); 255 job.ExecutionTime = executors[job.Id].ExecutionTime; 256 257 try { 258 if (executors[job.Id].CurrentException != string.Empty) { 259 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException); 260 } 261 SlaveStatusInfo.JobsAborted++; 262 263 clientCom.LogMessage("Sending the stopped job with id: " + job.Id); 264 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted); 265 } 266 catch (Exception e) { 267 clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")"); 268 } 269 finally { 270 KillAppDomain(job.Id); // kill app-domain in every case 271 } 272 } 273 } 274 } 275 276 /// <summary> 277 /// aborts all running jobs, no results are sent back 278 /// </summary> 279 private void DoAbortAll() { 280 List<Guid> jobIds; 281 lock (executors) { 282 jobIds = new List<Guid>(executors.Keys); 283 } 284 foreach (Guid jobId in jobIds) { 285 KillAppDomain(jobId); 285 } 286 286 } 287 287 clientCom.LogMessage("Aborted all jobs!"); … … 294 294 clientCom.LogMessage("Pause all received"); 295 295 296 //copy guids because there will be removed items from 'Jobs' 297 List<Guid> jobIds; 298 lock (executors) { 299 jobIds = new List<Guid>(Executors.Keys); 300 } 301 302 foreach (Guid jobId in jobIds) { 303 DoPauseJob(jobId); 296 lock (slaveJobs) { 297 foreach (SlaveJob sj in slaveJobs.Values) { 298 if (!sj.Finished) { 299 sj.PauseJob(); 300 } 301 } 304 302 } 305 303 } … … 311 309 clientCom.LogMessage("Stop all received"); 312 310 313 //copy guids because there will be removed items from 'Jobs' 314 List<Guid> jobIds; 315 lock (executors) { 316 jobIds = new List<Guid>(executors.Keys); 317 } 318 319 foreach (Guid jobId in jobIds) { 320 DoStopJob(jobId); 311 lock (slaveJobs) { 312 foreach (SlaveJob sj in slaveJobs.Values) { 313 if (!sj.Finished) { 314 sj.StopJob(); 315 } 316 } 321 317 } 322 318 } … … 341 337 clientCom.LogMessage("Logging out"); 342 338 343 344 lock (executors) { 345 clientCom.LogMessage("executors locked"); 346 foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) { 347 clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key); 348 appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException); 349 AppDomain.Unload(kvp.Value); 350 } 351 } 339 DoAbortAll(); 340 352 341 WcfService.Instance.Disconnect(); 353 342 clientCom.Shutdown(); … … 382 371 383 372 /// <summary> 384 /// Pauses a job, which means sending it to the server and killing it locally;385 /// atm only used when executor is waiting for child jobs386 /// </summary>387 public void PauseWaitJob(JobData data) {388 if (!Executors.ContainsKey(data.JobId)) {389 clientCom.LogMessage("Can't find job with id " + data.JobId);390 } else {391 Job job = wcfService.GetJob(data.JobId);392 wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);393 wcfService.UpdateJobState(job.Id, JobState.Waiting, null);394 }395 KillAppDomain(data.JobId);396 }397 398 /// <summary>399 /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.400 /// once the connection gets reestablished, the job gets submitted401 /// </summary>402 public void SendFinishedJob(Guid jobId) {403 try {404 clientCom.LogMessage("Getting the finished job with id: " + jobId);405 if (!executors.ContainsKey(jobId)) {406 clientCom.LogMessage("Executor doesn't exist");407 return;408 }409 if (!executors.ContainsKey(jobId)) {410 clientCom.LogMessage("Job doesn't exist");411 return;412 }413 Job job = wcfService.GetJob(jobId);414 job.ExecutionTime = executors[jobId].ExecutionTime;415 416 if (executors[jobId].Aborted) {417 SlaveStatusInfo.JobsAborted++;418 } else {419 SlaveStatusInfo.JobsProcessed++;420 }421 422 if (executors[jobId].CurrentException != string.Empty) {423 wcfService.UpdateJobState(jobId, JobState.Failed, executors[jobId].CurrentException);424 }425 426 JobData sJob = executors[jobId].GetFinishedJob();427 try {428 clientCom.LogMessage("Sending the finished job with id: " + jobId);429 wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);430 }431 catch (Exception e) {432 clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");433 }434 finally {435 KillAppDomain(jobId);436 heartbeatManager.AwakeHeartBeatThread();437 }438 }439 catch (Exception e) {440 OnExceptionOccured(e);441 }442 }443 444 private static object startInAppDomainLocker = new object();445 private static Mutex startInAppDomainMutex = new Mutex();446 /// <summary>447 /// A new Job from the wcfService has been received and will be started within a AppDomain.448 /// </summary>449 private void StartJobInAppDomain(Job job, JobData jobData) {450 clientCom.LogMessage("Received new job with id " + job.Id);451 clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());452 453 startInAppDomainMutex.WaitOne(); // mutex is used instead of lock to be able to release it before executor.Start() is called (which may take long time)454 bool released = false;455 456 if (executors.ContainsKey(job.Id)) {457 clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored.");458 return;459 }460 461 String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());462 bool pluginsPrepared = false;463 string configFileName = string.Empty;464 465 try {466 PluginCache.Instance.PreparePlugins(job, out configFileName);467 clientCom.LogMessage("Plugins fetched for job " + job.Id);468 pluginsPrepared = true;469 }470 catch (Exception exception) {471 clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));472 wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());473 SlaveStatusInfo.JobsAborted++;474 }475 476 if (pluginsPrepared) {477 try {478 //TODO: switch back to unprivileged sandbox479 AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));480 appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);481 Executor executor;482 appDomains.Add(job.Id, appDomain);483 clientCom.LogMessage("Creating AppDomain");484 executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);485 clientCom.LogMessage("Created AppDomain");486 executor.Core = this;487 executor.JobId = job.Id;488 executor.CoresNeeded = job.CoresNeeded;489 executor.MemoryNeeded = job.MemoryNeeded;490 clientCom.LogMessage("Starting Executor for job " + job.Id);491 lock (executors) {492 if (executors.ContainsKey(job.Id)) {493 throw new JobAlreadyExistsException(job.Id);494 }495 executors.Add(job.Id, executor);496 }497 startInAppDomainMutex.ReleaseMutex();498 released = true;499 semaphores[job.Id] = new Semaphore(0, 1);500 executor.Start(jobData.Data);501 semaphores[job.Id].Release();502 }503 catch (JobAlreadyExistsException e) {504 clientCom.LogMessage(string.Format("Job {0} has already been started. Job will be ignored", e.JobId));505 }506 catch (Exception exception) {507 clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);508 clientCom.LogMessage("Error thrown is: " + exception.ToString());509 510 if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {511 wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);512 } else {513 wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());514 }515 SlaveStatusInfo.JobsAborted++;516 517 KillAppDomain(job.Id);518 }519 520 if (!released)521 startInAppDomainMutex.ReleaseMutex();522 }523 heartbeatManager.AwakeHeartBeatThread();524 }525 526 public event EventHandler<EventArgs<Exception>> ExceptionOccured;527 private void OnExceptionOccured(Exception e) {528 clientCom.LogMessage("Error: " + e.ToString());529 var handler = ExceptionOccured;530 if (handler != null) handler(this, new EventArgs<Exception>(e));531 }532 533 private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {534 clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());535 KillAppDomain(new Guid(e.ExceptionObject.ToString()));536 }537 538 /// <summary>539 373 /// Enqueues messages from the executor to the message queue. 540 374 /// This is necessary if the core thread has to execute certain actions, e.g. … … 549 383 } 550 384 551 /// <summary> 552 /// Kill a appdomain with a specific id. 553 /// </summary> 554 /// <param name="jobId">the GUID of the job</param> 555 public void KillAppDomain(Guid jobId) { 556 if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) { 557 EnqueueExecutorMessage<Guid>(KillAppDomain, jobId); 558 return; 559 } 560 561 clientCom.LogMessage("Shutting down Appdomain for Job " + jobId); 562 lock (executors) { 563 try { 564 if (executors.ContainsKey(jobId)) { 565 executors[jobId].Dispose(); 566 executors.Remove(jobId); 567 } 568 569 if (appDomains.ContainsKey(jobId)) { 570 appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException); 571 int repeat = 5; 572 while (repeat > 0) { 573 try { 574 semaphores[jobId].WaitOne(); 575 AppDomain.Unload(appDomains[jobId]); 576 semaphores[jobId].Dispose(); 577 semaphores.Remove(jobId); 578 repeat = 0; 579 } 580 catch (CannotUnloadAppDomainException) { 581 clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec."); 582 Thread.Sleep(1000); 583 repeat--; 584 if (repeat == 0) { 585 clientCom.LogMessage("Could not unload AppDomain, shutting down core..."); 586 throw; // rethrow and let app crash 587 } 588 } 589 } 590 appDomains.Remove(jobId); 591 } 592 593 PluginCache.Instance.DeletePluginsForJob(jobId); 594 GC.Collect(); 595 } 596 catch (Exception ex) { 597 clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString()); 598 } 599 } 600 GC.Collect(); 601 clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole()); 602 } 603 604 public override object InitializeLifetimeService() { 605 return null; // avoid destruction of proxy object after 5 minutes 606 } 607 608 public int GetCoresNeeded() { 609 lock (executors) { 610 return executors.Sum(x => x.Value.CoresNeeded); 385 public void RemoveSlaveJobFromList(Guid jobId) { 386 lock (slaveJobs) { 387 if (slaveJobs.ContainsKey(jobId)) { 388 slaveJobs.Remove(jobId); 389 } 611 390 } 612 391 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs
r6178 r6203 40 40 private Semaphore pauseStopSem = new Semaphore(0, 1); 41 41 private Semaphore startJobSem = new Semaphore(0, 1); 42 //make pause or stop wait until start is finished 43 private Semaphore jobStartedSem = new Semaphore(0, 1); 44 45 public ExecutorQueue executorQueue; 42 46 43 47 public bool SendHeartbeatForExecutor { get; set; } … … 72 76 public Executor() { 73 77 SendHeartbeatForExecutor = true; 78 executorQueue = new ExecutorQueue(); 74 79 } 75 80 … … 89 94 } else { 90 95 Job.Start(); 91 if (! startJobSem.WaitOne(TimeSpan.FromSeconds(15))) {96 if (!jobStartedSem.WaitOne(TimeSpan.FromSeconds(15))) { 92 97 throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired."); 93 98 } 99 jobStartedSem.Release(); 94 100 } 95 101 } … … 102 108 public void Pause() { 103 109 SendHeartbeatForExecutor = false; 110 // wait until job is started. if this does not happen, the Job is null an we give up 111 jobStartedSem.WaitOne(TimeSpan.FromSeconds(15)); 104 112 if (Job == null) { 105 113 currentException = new Exception("Pausing job " + this.JobId + ": Job is null"); 106 Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);114 return; 107 115 } 108 116 … … 121 129 public void Stop() { 122 130 SendHeartbeatForExecutor = false; 131 // wait until job is started. if this does not happen, the Job is null an we give up 132 jobStartedSem.WaitOne(TimeSpan.FromSeconds(15)); 123 133 if (Job == null) { 124 134 currentException = new Exception("Stopping job " + this.JobId + ": Job is null"); 125 Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);126 135 } 127 136 wasJobAborted = true; … … 176 185 childJob.PluginsNeededIds = FindPluginsNeeded(e.Value); 177 186 178 //TODO: is return value needed? 179 WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData); 187 ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.NewChildJob); 188 msg.MsgData = childJobData; 189 msg.MsgJob = childJob; 190 191 executorQueue.AddMessage(msg); 180 192 } 181 193 … … 188 200 jdata.JobId = this.JobId; 189 201 190 Core.PauseWaitJob(jdata); 202 ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.WaitForChildJobs); 203 msg.MsgData = jdata; 204 executorQueue.AddMessage(msg); 191 205 } 192 206 193 207 private void Job_DeleteChildJobs(object sender, EventArgs e) { 194 WcfService.Instance.DeleteChildJobs(JobId);208 executorQueue.AddMessage(ExecutorMessageType.DeleteChildJobs); 195 209 } 196 210 … … 198 212 HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e; 199 213 currentException = ex.Value; 200 Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);201 214 Aborted = true; 215 216 executorQueue.AddMessage(ExecutorMessageType.JobFailed); 202 217 } 203 218 … … 208 223 } else { 209 224 //it's a clean and finished job, so send it 210 Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);225 executorQueue.AddMessage(ExecutorMessageType.JobStopped); 211 226 } 212 227 } … … 216 231 if (currentException == null) { 217 232 currentException = new Exception("Getting finished job " + this.JobId + ": Job is null"); 218 }219 Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);233 return GetJob(); 234 } 220 235 } 221 236 … … 234 249 } 235 250 236 237 251 public JobData GetPausedJob() { 238 252 if (Job.ExecutionState != HeuristicLab.Core.ExecutionState.Paused) { … … 247 261 248 262 void Job_JobStarted(object sender, EventArgs e) { 249 startJobSem.Release();263 jobStartedSem.Release(); 250 264 } 251 265 … … 255 269 } else { 256 270 JobData jdata = new JobData(); 257 jdata.Data = PersistenceUtil.Serialize(Job); 271 if (Job == null) { 272 //send empty job and save exception 273 jdata.Data = PersistenceUtil.Serialize(new JobData()); 274 if (currentException == null) { 275 currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job"); 276 } 277 } else { 278 jdata.Data = PersistenceUtil.Serialize(Job); 279 } 258 280 jdata.JobId = JobId; 259 281 return jdata; -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeartbeatManager.cs
r6112 r6203 89 89 Heartbeat heartBeatData = new Heartbeat { 90 90 SlaveId = info.Id, 91 FreeCores = info.Cores.HasValue ? info.Cores.Value - ConfigManager.Instance.GetUsedCores(): 0,91 FreeCores = info.Cores.HasValue ? info.Cores.Value - SlaveStatusInfo.UsedCores : 0, 92 92 FreeMemory = ConfigManager.GetFreeMemory(), 93 93 JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs(), -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeuristicLab.Clients.Hive.Slave-3.4.csproj
r6202 r6203 98 98 <ItemGroup> 99 99 <Compile Include="ConfigManager.cs" /> 100 <Compile Include="Exceptions\JobAlreadyExistsException.cs" /> 100 101 <Compile Include="Exceptions\JobNotFoundException.cs" /> 101 102 <Compile Include="Exceptions\JobNotDataFoundException.cs" /> 102 <Compile Include="Exceptions\JobAlreadyExistsException.cs" /> 103 <Compile Include="ExecutorMessage.cs" /> 104 <Compile Include="ExecutorQueue.cs" /> 103 105 <Compile Include="SlaveClientCom.cs" /> 104 106 <Compile Include="Core.cs" /> … … 120 122 <Compile Include="ServiceContracts\ISlaveCommunicationCallbacks.cs" /> 121 123 <Compile Include="SlaveCommunicationService.cs" /> 124 <Compile Include="SlaveJob.cs" /> 122 125 <Compile Include="SlaveStatusInfo.cs" /> 123 126 <Compile Include="StatusCommons.cs" /> -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/MessageQueue.cs
r5599 r6203 52 52 /// </summary> 53 53 /// <returns>nothing</returns> 54 public override object InitializeLifetimeService() {54 /*public override object InitializeLifetimeService() { 55 55 return null; 56 } 56 } */ 57 57 58 58 /// <summary> -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/SlaveStatusInfo.cs
r6202 r6203 24 24 namespace HeuristicLab.Clients.Hive.SlaveCore { 25 25 public class SlaveStatusInfo { 26 static public int JobsProcessed { get; set; } 27 static public int JobsAborted { get; set; } 28 static public int JobsFetched { get; set; } 26 static private int jobsFinished; // everything went fine 27 static private int jobsAborted; // server sent stop or abort 28 static private int jobsFetched; // number of fetched jobs 29 static private int jobsFailed; // jobs that failed in the sandbox 30 static private int usedCores; // number of cores currently used 31 29 32 static public DateTime LoginTime { get; set; } 33 static private Object jobStatLock = new Object(); 34 static private Object coreLock = new Object(); 35 36 static public int UsedCores { 37 get { 38 lock (coreLock) { 39 return usedCores; 40 } 41 } 42 } 43 44 static public int JobsFinished { 45 get { 46 lock (jobStatLock) { 47 return jobsFinished; 48 } 49 } 50 } 51 52 static public int JobsAborted { 53 get { 54 lock (jobStatLock) { 55 return jobsAborted; 56 } 57 } 58 } 59 60 static public int JobsFetched { 61 get { 62 lock (jobStatLock) { 63 return jobsFetched; 64 } 65 } 66 } 67 68 static public int JobsFailed { 69 get { 70 lock (jobStatLock) { 71 return jobsFailed; 72 } 73 } 74 } 75 76 public static void IncrementJobsFinished() { 77 lock (jobStatLock) { 78 jobsFinished++; 79 } 80 } 81 82 public static void IncrementJobsFailed() { 83 lock (jobStatLock) { 84 jobsFailed++; 85 } 86 } 87 88 public static void IncrementJobsAborted() { 89 lock (jobStatLock) { 90 jobsAborted++; 91 } 92 } 93 94 public static void IncrementJobsFetched() { 95 lock (jobStatLock) { 96 jobsFetched++; 97 } 98 } 99 100 public static void IncrementUsedCores(int val) { 101 lock (coreLock) { 102 usedCores += val; 103 } 104 } 105 106 public static void DecrementUsedCores(int val) { 107 lock (coreLock) { 108 usedCores -= val; 109 } 110 } 30 111 } 31 112 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/StatusCommons.cs
r6033 r6203 43 43 public int JobsFetched { get; set; } 44 44 [DataMember] 45 public int Jobs Done{ get; set; }45 public int JobsFinished { get; set; } 46 46 [DataMember] 47 47 public int JobsAborted { get; set; } 48 [DataMember] 49 public int JobsFailed { get; set; } 48 50 [DataMember] 49 51 public List<JobStatus> Jobs { get; set; } 50 52 51 53 public override string ToString() { 52 return string.Format("Status: {0}, Fetched/ Done/Aborted: {1},{2},{3}", Status, JobsFetched, JobsDone, JobsAborted);54 return string.Format("Status: {0}, Fetched/Finished/Aborted/Failed: {1},{2},{3},{4}", Status, JobsFetched, JobsFinished, JobsAborted, JobsFailed); 53 55 } 56 57 54 58 } 55 59 }
Note: See TracChangeset
for help on using the changeset viewer.