Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/SlaveJob.cs @ 6360

Last change on this file since 6360 was 6357, checked in by cneumuel, 13 years ago

#1233

  • refactoring of slave core
  • created JobManager, which is responsible for managing jobs without knowing anything about the service; this class is easier to test than the slave core
  • lots of cleanup
  • created console test project for slave
File size: 8.8 KB
Line 
1
2using System;
3using System.IO;
4using System.Threading;
5using HeuristicLab.Common;
6using HeuristicLab.Core;
7using HeuristicLab.Hive;
8using HeuristicLab.PluginInfrastructure.Sandboxing;
9
namespace HeuristicLab.Clients.Hive.SlaveCore {

  /// <summary>
  /// Runs a single job inside its own sandbox AppDomain. The job itself is driven by an
  /// <see cref="Executor"/> living in that AppDomain; this class creates the sandbox, starts
  /// the executor, polls its message queue on a dedicated thread and translates the messages
  /// into events. It also tears the sandbox down again.
  /// </summary>
  public class SlaveJob : MarshalByRefObject {
    /// <summary>How often unloading the AppDomain is attempted before giving up.</summary>
    private const int MaxUnloadAttempts = 5;
    /// <summary>Pause between two unload attempts, in milliseconds.</summary>
    private const int UnloadRetryDelayMilliseconds = 1000;

    private readonly PluginManager pluginManager;
    private readonly ILog log;
    // Starts at 0 and is released exactly once when the start attempt has finished
    // (successfully or not), so that the AppDomain is never unloaded while it is still starting.
    private readonly Semaphore waitForStartBeforeKillSem;

    private Executor executor;
    private AppDomain appDomain;
    // Written by whoever disposes the job and read by the monitoring thread's loop condition.
    private volatile bool executorMonitoringRun;
    private Thread executorMonitoringThread;

    /// <summary>Id of the job; set when <see cref="StartJobAsync"/> is called.</summary>
    public Guid JobId { get; private set; }

    /// <summary>True once the AppDomain and the executor have been created.</summary>
    public bool IsPrepared { get; private set; }

    /// <summary>Number of cores this job occupies on the slave.</summary>
    public int CoresNeeded { get; set; }

    /// <summary>
    /// Execution time reported by the executor, or <see cref="TimeSpan.Zero"/> as long as
    /// no executor has been created.
    /// </summary>
    public TimeSpan ExecutionTime {
      get { return executor != null ? executor.ExecutionTime : TimeSpan.Zero; }
    }

    /// <summary>
    /// Creates a job wrapper that is not yet prepared; call <see cref="StartJobAsync"/> to run it.
    /// </summary>
    /// <param name="pluginManager">Used to prepare and delete the plugin files of the job.</param>
    /// <param name="coresNeeded">Initial value of <see cref="CoresNeeded"/>.</param>
    /// <param name="log">Receives the log messages of this class.</param>
    public SlaveJob(PluginManager pluginManager, int coresNeeded, ILog log) {
      this.pluginManager = pluginManager;
      this.log = log;
      CoresNeeded = coresNeeded;
      waitForStartBeforeKillSem = new Semaphore(0, 1);
      executorMonitoringRun = true;
      IsPrepared = false;
    }

    /// <summary>
    /// Prepares the plugins and the AppDomain for <paramref name="job"/>, starts the executor
    /// with <paramref name="jobData"/> and then starts the thread that monitors the executor.
    /// If anything fails, everything created so far is cleaned up and the exception is rethrown.
    /// </summary>
    /// <param name="job">Job description (id, resource requirements).</param>
    /// <param name="jobData">Serialized job; its <c>Data</c> is handed to the executor.</param>
    public void StartJobAsync(Job job, JobData jobData) {
      try {
        this.JobId = job.Id;
        try {
          Prepare(job);
          executor.Start(jobData.Data);
        }
        finally {
          // Signal "start attempt finished" even on failure. DisposeAppDomain() waits for this
          // signal, so without it the cleanup in the catch block below would block forever
          // whenever the AppDomain was created but the executor could not be started.
          waitForStartBeforeKillSem.Release();
        }
        StartExecutorMonitoringThread();
      }
      catch (Exception) {
        DisposeAppDomain(); // make sure to clean up if something went wrong
        throw;
      }
    }

    /// <summary>Asks the executor to pause, unless it is already pausing or stopping.</summary>
    /// <exception cref="AppDomainNotCreatedException">The job has not been prepared yet.</exception>
    public void PauseJob() {
      if (!IsPrepared) throw new AppDomainNotCreatedException();
      if (!executor.IsPausing && !executor.IsStopping) executor.Pause();
    }

    /// <summary>Asks the executor to stop, unless it is already pausing or stopping.</summary>
    /// <exception cref="AppDomainNotCreatedException">The job has not been prepared yet.</exception>
    public void StopJob() {
      if (!IsPrepared) throw new AppDomainNotCreatedException();
      if (!executor.IsPausing && !executor.IsStopping) executor.Stop();
    }

    /// <summary>Returns the job data as currently provided by the executor.</summary>
    /// <exception cref="AppDomainNotCreatedException">The job has not been prepared yet.</exception>
    public JobData GetJobData() {
      // same guard as PauseJob/StopJob instead of a NullReferenceException on a missing executor
      if (!IsPrepared) throw new AppDomainNotCreatedException();
      return executor.GetJobData();
    }

    /// <summary>
    /// Lets the plugin manager prepare the plugins of the job and creates the AppDomain in the
    /// job specific plugin directory. Sets <see cref="IsPrepared"/> on success.
    /// </summary>
    private void Prepare(Job job) {
      string pluginDir = Path.Combine(pluginManager.PluginTempBaseDir, job.Id.ToString());
      string configFileName;
      pluginManager.PreparePlugins(job, out configFileName);
      appDomain = CreateAppDomain(job, pluginDir, configFileName);
      IsPrepared = true;
    }

    /// <summary>
    /// Creates the sandbox AppDomain and the executor inside it and copies the job's id and
    /// resource requirements to the executor.
    /// </summary>
    /// <returns>The newly created AppDomain (also stored in the <c>appDomain</c> field).</returns>
    private AppDomain CreateAppDomain(Job job, String pluginDir, string configFileName) {
      log.LogMessage("Creating AppDomain");
      // The field is assigned right away (not only by the caller) so that DisposeAppDomain()
      // can unload the domain even if one of the following statements throws.
      appDomain = SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
      appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

      executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);

      executor.JobId = job.Id;
      executor.CoresNeeded = job.CoresNeeded;
      executor.MemoryNeeded = job.MemoryNeeded;
      return appDomain;
    }

    /// <summary>
    /// Shuts the job down: stops the monitoring thread, disposes the executor, unloads the
    /// AppDomain (waiting for a pending start attempt first), deletes the plugins of the job
    /// and triggers a garbage collection.
    /// </summary>
    /// <exception cref="CannotUnloadAppDomainException">
    /// The AppDomain could not be unloaded after several attempts.
    /// </exception>
    public void DisposeAppDomain() {
      log.LogMessage(string.Format("Shutting down Appdomain for Job {0}", JobId));
      StopExecutorMonitoringThread();

      if (executor != null) {
        executor.Dispose();
      }

      if (appDomain != null) {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
        // Wait exactly once: the semaphore is released only once, so waiting again on a retry
        // would block forever.
        waitForStartBeforeKillSem.WaitOne();
        UnloadAppDomainWithRetry();
        waitForStartBeforeKillSem.Dispose();
      }
      pluginManager.DeletePluginsForJob(JobId);
      GC.Collect();
    }

    /// <summary>
    /// Tries to unload the AppDomain up to <see cref="MaxUnloadAttempts"/> times, pausing
    /// between attempts. The exception of the last failed attempt is rethrown.
    /// </summary>
    private void UnloadAppDomainWithRetry() {
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= MaxUnloadAttempts) {
            throw; // rethrow and let app crash
          }
          log.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(UnloadRetryDelayMilliseconds);
        }
      }
    }

    /// <summary>
    /// Handler for unhandled exceptions of the AppDomain: tears the job down and reports the
    /// exception through <see cref="ExceptionOccured"/>.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      DisposeAppDomain();
      OnExceptionOccured(new Exception("Unhandled exception: " + e.ExceptionObject.ToString()));
    }

    #region ExecutorMonitorThread
    /// <summary>Creates and starts the thread that runs <see cref="MonitorExecutor"/>.</summary>
    private void StartExecutorMonitoringThread() {
      executorMonitoringThread = new Thread(MonitorExecutor);
      executorMonitoringThread.Start();
    }

    /// <summary>
    /// Ends the monitoring loop if it is still running. Because the monitoring thread blocks
    /// while waiting for a message, a stop message is enqueued to wake it up.
    /// </summary>
    private void StopExecutorMonitoringThread() {
      if (executorMonitoringThread != null && executorMonitoringRun) {
        executorMonitoringRun = false;
        executor.executorQueue.AddMessage(ExecutorMessageType.StopExecutorMonitoringThread);
      }
    }

    /// <summary>
    /// Because the executor is in an appdomain and is not able to call back
    /// (because of security -> lease time for marshal-by-ref object is 5 min),
    /// we have to poll the executor for events we have to react to (e.g. job finished...).
    /// Messages that end the job (paused, stopped, failed, waiting for child jobs) raise the
    /// corresponding event first and dispose the AppDomain afterwards.
    /// </summary>
    private void MonitorExecutor() {
      while (executorMonitoringRun) {
        // this blocks through the appdomain border, that's why the lease gets renewed
        ExecutorMessage message = executor.executorQueue.GetMessage();

        switch (message.MessageType) {
          case ExecutorMessageType.JobStarted:
            OnJobStarted();
            break;

          case ExecutorMessageType.JobPaused:
            // flag is cleared first so that DisposeAppDomain() does not enqueue a stop message
            executorMonitoringRun = false;
            OnJobPaused();
            DisposeAppDomain();
            break;

          case ExecutorMessageType.JobStopped:
            executorMonitoringRun = false;
            OnJobStopped();
            DisposeAppDomain();
            break;

          case ExecutorMessageType.JobFailed:
            executorMonitoringRun = false;
            OnJobFailed(new JobFailedException(executor.CurrentException));
            DisposeAppDomain();
            break;

          case ExecutorMessageType.NewChildJob:
            OnNewChildJob(((JobExecutorMessage)message).Job);
            break;

          case ExecutorMessageType.WaitForChildJobs:
            executorMonitoringRun = false;
            OnWaitForChildJobs();
            DisposeAppDomain();
            break;

          case ExecutorMessageType.DeleteChildJobs:
            OnDeleteChildJobs();
            break;

          case ExecutorMessageType.StopExecutorMonitoringThread:
            executorMonitoringRun = false;
            return;
        }
      }
    }
    #endregion

    /// <summary>Raised when the executor reports that the job has started.</summary>
    public event EventHandler<EventArgs<Guid>> JobStarted;
    private void OnJobStarted() {
      var handler = JobStarted;
      if (handler != null) handler(this, new EventArgs<Guid>(this.JobId));
    }

    /// <summary>Raised when the executor reports that the job has stopped.</summary>
    public event EventHandler<EventArgs<Guid>> JobStopped;
    private void OnJobStopped() {
      var handler = JobStopped;
      if (handler != null) handler(this, new EventArgs<Guid>(this.JobId));
    }

    /// <summary>Raised when the executor reports that the job has paused.</summary>
    public event EventHandler<EventArgs<Guid>> JobPaused;
    private void OnJobPaused() {
      var handler = JobPaused;
      if (handler != null) handler(this, new EventArgs<Guid>(this.JobId));
    }

    /// <summary>Declared for subscribers; nothing in this class raises it at the moment.</summary>
    public event EventHandler<EventArgs<Guid>> JobAborted;
    private void OnJobAborted() {
      var handler = JobAborted;
      if (handler != null) handler(this, new EventArgs<Guid>(this.JobId));
    }

    /// <summary>Raised when the executor reports that the job has failed.</summary>
    public event EventHandler<EventArgs<Guid, Exception>> JobFailed;
    private void OnJobFailed(Exception exception) {
      var handler = JobFailed;
      if (handler != null) handler(this, new EventArgs<Guid, Exception>(this.JobId, exception));
    }

    /// <summary>Raised after an unhandled exception occurred in the AppDomain of the job.</summary>
    public event EventHandler<EventArgs<Guid, Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception exception) {
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Guid, Exception>(this.JobId, exception));
    }

    /// <summary>Raised when the executor reports a new child job.</summary>
    public event EventHandler<EventArgs<Guid, IJob>> NewChildJob;
    private void OnNewChildJob(IJob job) {
      var handler = NewChildJob;
      if (handler != null) handler(this, new EventArgs<Guid, IJob>(this.JobId, job));
    }

    /// <summary>Raised when the executor reports that the job waits for its child jobs.</summary>
    public event EventHandler<EventArgs<Guid>> WaitForChildJobs;
    private void OnWaitForChildJobs() {
      var handler = WaitForChildJobs;
      if (handler != null) handler(this, new EventArgs<Guid>(this.JobId));
    }

    /// <summary>Raised when the executor requests the deletion of the child jobs.</summary>
    public event EventHandler<EventArgs<Guid>> DeleteChildJobs;
    private void OnDeleteChildJobs() {
      var handler = DeleteChildJobs;
      if (handler != null) handler(this, new EventArgs<Guid>(this.JobId));
    }
  }
}
Note: See TracBrowser for help on using the repository browser.