Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/SlaveJob.cs @ 6248

Last change on this file since 6248 was 6248, checked in by ascheibe, 13 years ago

#1233

  • don't set job failed if JobNotFoundException is thrown
  • disable AboutView for all items
  • avoid NullRefException in SendFinishedJob
File size: 13.5 KB
Line 
1
2using System;
3using System.IO;
4using System.Threading;
5using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
6
7namespace HeuristicLab.Clients.Hive.SlaveCore {
8
9  public class SlaveJob : MarshalByRefObject {
10    private Executor executor;
11    private AppDomain appDomain;
12    private Semaphore waitForStartBeforeKillSem;
13    private bool executorMonitoringRun;
14    private Thread executorMonitoringThread;
15    private Core core;
16    private bool finished;
17    private int coresNeeded;
18
19    private ISlaveCommunication clientCom;
20    private WcfService wcfService;
21
22    public Guid JobId;
23    public Executor JobExecutor { get { return executor; } }
24    public bool Finished { get { return finished; } }
25
26
27    public SlaveJob(Core core) {
28      clientCom = SlaveClientCom.Instance.ClientCom;
29      wcfService = WcfService.Instance;
30      waitForStartBeforeKillSem = new Semaphore(0, 1);
31      executorMonitoringRun = true;
32      this.core = core;
33      finished = false;
34    }
35
36    public void PrepareJob(Guid jobId) {
37      JobId = jobId;
38      Job job = wcfService.GetJob(jobId);
39      if (job == null) throw new JobNotFoundException(jobId);
40      coresNeeded = job.CoresNeeded;
41      SlaveStatusInfo.IncrementUsedCores(coresNeeded);
42    }
43
44    public void CalculateJob() {
45      Job job = wcfService.GetJob(JobId);
46      if (job == null) throw new JobNotFoundException(JobId);
47
48      JobData jobData = wcfService.GetJobData(job.Id);
49      if (jobData == null) throw new JobDataNotFoundException(JobId);
50      SlaveStatusInfo.IncrementJobsFetched();
51      job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
52      if (job == null) throw new JobNotFoundException(JobId);
53      StartJobInAppDomain(job, jobData);
54    }
55
56    public void StopJob() {
57      if (executor == null) {
58        clientCom.LogMessage(string.Format("StopJob: job with id {0} is missing the executor", JobId));
59      } else {
60        Job job = wcfService.GetJob(JobId);
61
62        if (job != null) {
63          executor.Stop();
64
65          try {
66            JobData sJob = executor.GetFinishedJob();
67            job.ExecutionTime = executor.ExecutionTime;
68
69            if (executor.CurrentException != string.Empty) {
70              wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
71            }
72            SlaveStatusInfo.IncrementJobsAborted();
73
74            clientCom.LogMessage(string.Format("Sending the stopped job with id: {0}", job.Id));
75            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
76          }
77          catch (Exception e) {
78            clientCom.LogMessage(string.Format("Transmitting the stopped job with id {0} to server failed. Exception is: {1}", job.Id, e.ToString()));
79          }
80          finally {
81            KillAppDomain(); // kill appdomain in every case         
82          }
83        }
84      }
85    }
86
87    public void PauseJob() {
88      if (executor == null) {
89        clientCom.LogMessage("PauseJob: Can't pause job with uninitialized executor");
90      } else {
91        Job job = wcfService.GetJob(JobId);
92
93        if (job != null) {
94          executor.Pause();
95          JobData sJob = executor.GetPausedJob();
96          job.ExecutionTime = executor.ExecutionTime;
97
98          try {
99            if (executor.CurrentException != string.Empty) {
100              wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
101              SlaveStatusInfo.IncrementJobsFailed();
102            } else {
103              SlaveStatusInfo.IncrementJobsFinished();
104            }
105            clientCom.LogMessage("Sending the paused job with id: " + job.Id);
106            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
107          }
108          catch (Exception e) {
109            clientCom.LogMessage(string.Format("Transmitting the paused job with id {0} to server failed. Exception is: {1}", job.Id, e.ToString()));
110          }
111          finally {
112            KillAppDomain(); // kill appdomain in every case         
113          }
114        }
115      }
116    }
117
118    /// <summary>
119    /// Pauses a job, which means sending it to the server and killing it locally;
120    /// atm only used when executor is waiting for child jobs
121    /// </summary>
122    public void PauseWaitJob(JobData data) {
123      try {
124        if (executor == null) {
125          clientCom.LogMessage(string.Format("PauseWaitJob: Can't pause job with id {0} with uninitialized executor", JobId));
126        } else {
127          Job job = wcfService.GetJob(data.JobId);
128          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
129          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
130        }
131      }
132      catch (Exception ex) {
133        clientCom.LogMessage(string.Format("Pausing job with id {0} failed. Exception: {1}", JobId, ex.ToString()));
134      }
135      finally {
136        KillAppDomain();
137      }
138    }
139
140    /// <summary>
141    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
142    /// once the connection gets reestablished, the job gets submitted
143    /// </summary>
144    public void SendFinishedJob() {
145      try {
146        clientCom.LogMessage(string.Format("Getting the finished job with id: {0} ", JobId));
147        if (executor == null) {
148          clientCom.LogMessage(string.Format("SendFinishedJob: Can't pause job with id {0} with uninitialized executor", JobId));
149          return;
150        }
151
152        Job job = wcfService.GetJob(JobId);
153        job.ExecutionTime = executor.ExecutionTime;
154
155        if (executor.Aborted) {
156          SlaveStatusInfo.IncrementJobsAborted();
157        } else {
158          SlaveStatusInfo.IncrementJobsFinished();
159        }
160
161        if (executor.CurrentException != string.Empty) {
162          wcfService.UpdateJobState(JobId, JobState.Failed, executor.CurrentException);
163        }
164
165        try {
166          JobData sJob = executor.GetFinishedJob();
167          if (sJob != null) {
168            clientCom.LogMessage(string.Format("Sending the finished job with id: {0}", JobId));
169            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
170          }
171        }
172        catch (Exception e) {
173          clientCom.LogMessage(string.Format("Transmitting the job with id {0} to server failed. Exception is: {1}", job.Id, e.ToString()));
174        }
175        finally {
176          KillAppDomain();
177          Core.HBManager.AwakeHeartBeatThread();
178        }
179      }
180      catch (Exception e) {
181        clientCom.LogMessage(string.Format("SendFinishedJob: The following exception has been thrown: {0}", e.ToString()));
182      }
183    }
184
185    /// <summary>
186    /// A new Job from the wcfService has been received and will be started within a AppDomain.
187    /// </summary>   
188    private void StartJobInAppDomain(Job job, JobData jobData) {
189      JobId = job.Id;
190
191      clientCom.LogMessage(string.Format("Received new job with id {0}", job.Id));
192      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
193
194      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
195      bool pluginsPrepared = false;
196      string configFileName = string.Empty;
197
198      try {
199        PluginCache.Instance.PreparePlugins(job, out configFileName);
200        clientCom.LogMessage(string.Format("Plugins fetched for job {0}", job.Id));
201        pluginsPrepared = true;
202      }
203      catch (Exception exception) {
204        clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
205        wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
206        SlaveStatusInfo.IncrementJobsFailed();
207        core.RemoveSlaveJobFromList(JobId);
208        finished = true;
209        SlaveStatusInfo.DecrementUsedCores(coresNeeded);
210      }
211
212      if (pluginsPrepared) {
213        try {
214          appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
215          appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
216
217          clientCom.LogMessage("Creating AppDomain");
218          executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
219          clientCom.LogMessage("Created AppDomain");
220
221          executor.JobId = job.Id;
222          executor.CoresNeeded = job.CoresNeeded;
223          executor.MemoryNeeded = job.MemoryNeeded;
224          clientCom.LogMessage(string.Format("Starting Executor for job {0}", job.Id));
225
226          executor.Start(jobData.Data);
227          waitForStartBeforeKillSem.Release();
228
229          StartExecutorMonitoringThread();
230        }
231        catch (Exception exception) {
232          clientCom.LogMessage(string.Format("Creating the Appdomain and loading the job failed for job {0}", job.Id));
233          clientCom.LogMessage(string.Format("Error thrown is: {0}", exception.ToString()));
234
235          if (executor != null && executor.CurrentException != string.Empty) {
236            wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
237          } else {
238            wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
239          }
240          SlaveStatusInfo.IncrementJobsFailed();
241
242          KillAppDomain();
243        }
244      }
245      Core.HBManager.AwakeHeartBeatThread();
246    }
247
248    /// <summary>
249    /// Kill a appdomain with a specific id.
250    /// </summary>
251    /// <param name="JobId">the GUID of the job</param>   
252    public void KillAppDomain() {
253      clientCom.LogMessage(string.Format("Shutting down Appdomain for Job {0}", JobId));
254
255      try {
256        StopExecutorMonitoringThread();
257        finished = true;
258        SlaveStatusInfo.DecrementUsedCores(coresNeeded);
259
260        if (executor != null) {
261          executor.Dispose();
262        }
263
264        if (appDomain != null) {
265          appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
266          int repeat = 5;
267          while (repeat > 0) {
268            try {
269              waitForStartBeforeKillSem.WaitOne();
270              AppDomain.Unload(appDomain);
271              waitForStartBeforeKillSem.Dispose();
272              repeat = 0;
273            }
274            catch (CannotUnloadAppDomainException) {
275              clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
276              Thread.Sleep(1000);
277              repeat--;
278              if (repeat == 0) {
279                clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
280                throw; // rethrow and let app crash
281              }
282            }
283          }
284        }
285
286        PluginCache.Instance.DeletePluginsForJob(JobId);
287        GC.Collect();
288      }
289      catch (Exception ex) {
290        clientCom.LogMessage(string.Format("Exception when unloading the appdomain: {0}", ex.ToString()));
291      }
292      finally {
293        core.RemoveSlaveJobFromList(JobId);
294      }
295
296      GC.Collect();
297      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
298    }
299
300    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
301      clientCom.LogMessage(string.Format("Exception in AppDomain: ", e.ExceptionObject.ToString()));
302      KillAppDomain();
303    }
304
305
306    #region ExecutorMonitorThread
307
308    private void StartExecutorMonitoringThread() {
309      executorMonitoringThread = new Thread(MonitorExecutor);
310      executorMonitoringThread.Start();
311    }
312
313    private void StopExecutorMonitoringThread() {
314      if (executorMonitoringThread != null) {
315        if (executorMonitoringRun) {
316          executorMonitoringRun = false;
317          executor.executorQueue.AddMessage(ExecutorMessageType.StopExecutorMonitoringThread);
318        }
319      }
320    }
321
322    /// <summary>
323    /// Because the executor is in an appdomain and is not able to call back
324    /// (because of security -> lease time for marshall-by-ref object is 5 min),
325    /// we have to poll the executor for events we have to react to (e.g. job finished...)   
326    /// </summary>
327    private void MonitorExecutor() {
328      while (executorMonitoringRun) {
329        //this blocks through the appdomain border, that's why the lease gets renewed
330        ExecutorMessage message = executor.executorQueue.GetMessage();
331
332        switch (message.MessageType) {
333          case ExecutorMessageType.JobStopped:
334            executorMonitoringRun = false;
335            SendFinishedJob();
336            break;
337
338          case ExecutorMessageType.JobFailed:
339            executorMonitoringRun = false;
340            SendFinishedJob();
341            break;
342
343          case ExecutorMessageType.NewChildJob:
344            WcfService.Instance.AddChildJob(JobId, message.MsgJob, message.MsgData);
345            break;
346
347          case ExecutorMessageType.WaitForChildJobs:
348            executorMonitoringRun = false;
349            PauseWaitJob(message.MsgData);
350            break;
351
352          case ExecutorMessageType.DeleteChildJobs:
353            WcfService.Instance.DeleteChildJobs(JobId);
354            break;
355
356          case ExecutorMessageType.StopExecutorMonitoringThread:
357            executorMonitoringRun = false;
358            return;
359        }
360      }
361    }
362    #endregion
363  }
364}
Note: See TracBrowser for help on using the repository browser.