1 |
|
---|
2 | using System;
|
---|
3 | using System.IO;
|
---|
4 | using System.Threading;
|
---|
5 | using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
|
---|
6 |
|
---|
namespace HeuristicLab.Clients.Hive.SlaveCore {

  /// <summary>
  /// Runs a single Hive job on this slave: fetches the job and its data from the
  /// server, starts an <see cref="Executor"/> inside a separate AppDomain, polls the
  /// executor for events on a monitoring thread and reports the job's state back to
  /// the server before unloading the AppDomain.
  /// </summary>
  public class SlaveJob : MarshalByRefObject {
    private Executor executor;
    private AppDomain appDomain;
    // Created with count 0 and released exactly once when the start attempt in
    // StartJobInAppDomain is over (successfully or not). KillAppDomain waits on it
    // so the AppDomain is never unloaded while the executor is still being started.
    private Semaphore waitForStartBeforeKillSem;
    private bool executorMonitoringRun;
    private Thread executorMonitoringThread;
    private Core core;
    private bool finished;
    private int coresNeeded;

    private ISlaveCommunication clientCom;
    private WcfService wcfService;

    public Guid JobId;
    public Executor JobExecutor { get { return executor; } }
    public bool Finished { get { return finished; } }

    /// <summary>
    /// Creates a job wrapper bound to the given core.
    /// </summary>
    /// <param name="core">the slave core; this job removes itself from the core's job list when it ends</param>
    public SlaveJob(Core core) {
      clientCom = SlaveClientCom.Instance.ClientCom;
      wcfService = WcfService.Instance;
      waitForStartBeforeKillSem = new Semaphore(0, 1);
      executorMonitoringRun = true;
      this.core = core;
      finished = false;
    }

    /// <summary>
    /// Looks up the job on the server and reserves the number of cores it needs.
    /// </summary>
    /// <param name="jobId">id of the job to prepare</param>
    /// <exception cref="JobNotFoundException">the server returned no job for <paramref name="jobId"/></exception>
    public void PrepareJob(Guid jobId) {
      JobId = jobId;
      Job job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      coresNeeded = job.CoresNeeded;
      SlaveStatusInfo.IncrementUsedCores(coresNeeded);
    }

    /// <summary>
    /// Downloads the data of the prepared job, sets its state to Calculating on the
    /// server and starts it in a new AppDomain.
    /// </summary>
    /// <exception cref="JobNotFoundException">the job is unknown to the server</exception>
    /// <exception cref="JobDataNotFoundException">the server returned no data for the job</exception>
    public void CalculateJob() {
      Job job = wcfService.GetJob(JobId);
      if (job == null) throw new JobNotFoundException(JobId);

      JobData jobData = wcfService.GetJobData(job.Id);
      if (jobData == null) throw new JobDataNotFoundException(JobId);
      SlaveStatusInfo.IncrementJobsFetched();
      job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
      if (job == null) throw new JobNotFoundException(JobId);
      StartJobInAppDomain(job, jobData);
    }

    /// <summary>
    /// Stops the running job and sends its current state to the server as Aborted.
    /// If the executor recorded an exception, the job is first marked as Failed.
    /// The AppDomain is unloaded afterwards, even if transmission fails.
    /// </summary>
    public void StopJob() {
      if (executor == null) {
        clientCom.LogMessage(string.Format("StopJob: job with id {0} is missing the executor", JobId));
      } else {
        Job job = wcfService.GetJob(JobId);

        // NOTE(review): if the server no longer returns the job, the executor is
        // neither stopped nor unloaded here - confirm this is intended.
        if (job != null) {
          executor.Stop();

          try {
            JobData sJob = executor.GetFinishedJob();
            job.ExecutionTime = executor.ExecutionTime;

            if (!string.IsNullOrEmpty(executor.CurrentException)) {
              wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
            }
            SlaveStatusInfo.IncrementJobsAborted();

            clientCom.LogMessage(string.Format("Sending the stopped job with id: {0}", job.Id));
            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
          }
          catch (Exception e) {
            clientCom.LogMessage(string.Format("Transmitting the stopped job with id {0} to server failed. Exception is: {1}", job.Id, e.ToString()));
          }
          finally {
            KillAppDomain(); // kill appdomain in every case
          }
        }
      }
    }

    /// <summary>
    /// Pauses the running job and sends its paused state to the server.
    /// If the executor recorded an exception, the job is first marked as Failed.
    /// The AppDomain is unloaded afterwards, even if retrieving or transmitting the
    /// paused job fails.
    /// </summary>
    public void PauseJob() {
      if (executor == null) {
        clientCom.LogMessage("PauseJob: Can't pause job with uninitialized executor");
      } else {
        Job job = wcfService.GetJob(JobId);

        if (job != null) {
          executor.Pause();

          try {
            // Retrieved inside the try so that a failure here still unloads the AppDomain.
            JobData sJob = executor.GetPausedJob();
            job.ExecutionTime = executor.ExecutionTime;

            if (!string.IsNullOrEmpty(executor.CurrentException)) {
              wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
              SlaveStatusInfo.IncrementJobsFailed();
            } else {
              // NOTE(review): a successfully paused job is counted as "finished" - confirm intended.
              SlaveStatusInfo.IncrementJobsFinished();
            }
            clientCom.LogMessage("Sending the paused job with id: " + job.Id);
            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          }
          catch (Exception e) {
            clientCom.LogMessage(string.Format("Transmitting the paused job with id {0} to server failed. Exception is: {1}", job.Id, e.ToString()));
          }
          finally {
            KillAppDomain(); // kill appdomain in every case
          }
        }
      }
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when the executor is waiting for child jobs. The job data is
    /// uploaded as Paused and the job state is then set to Waiting.
    /// </summary>
    /// <param name="data">the job data reported by the executor</param>
    public void PauseWaitJob(JobData data) {
      try {
        if (executor == null) {
          clientCom.LogMessage(string.Format("PauseWaitJob: Can't pause job with id {0} with uninitialized executor", JobId));
        } else {
          Job job = wcfService.GetJob(data.JobId);
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
        }
      }
      catch (Exception ex) {
        clientCom.LogMessage(string.Format("Pausing job with id {0} failed. Exception: {1}", JobId, ex.ToString()));
      }
      finally {
        KillAppDomain();
      }
    }

    /// <summary>
    /// Serializes the finished job and submits it to the server.
    /// The doc previously stated that, without a network connection, the job is stored
    /// on disk and submitted once the connection is reestablished.
    /// NOTE(review): that storage is not visible in this class; it may happen inside
    /// WcfService - confirm.
    /// Once an executor exists, its AppDomain is always unloaded and the heartbeat
    /// thread woken, even if the job cannot be fetched or transmitted.
    /// </summary>
    public void SendFinishedJob() {
      try {
        clientCom.LogMessage(string.Format("Getting the finished job with id: {0} ", JobId));
        if (executor == null) {
          clientCom.LogMessage(string.Format("SendFinishedJob: Can't send job with id {0} with uninitialized executor", JobId));
          return;
        }

        try {
          Job job = wcfService.GetJob(JobId);
          if (job == null) throw new JobNotFoundException(JobId);
          job.ExecutionTime = executor.ExecutionTime;

          if (executor.Aborted) {
            SlaveStatusInfo.IncrementJobsAborted();
          } else {
            SlaveStatusInfo.IncrementJobsFinished();
          }

          if (!string.IsNullOrEmpty(executor.CurrentException)) {
            wcfService.UpdateJobState(JobId, JobState.Failed, executor.CurrentException);
          }

          JobData sJob = executor.GetFinishedJob();
          if (sJob != null) {
            clientCom.LogMessage(string.Format("Sending the finished job with id: {0}", JobId));
            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          }
        }
        catch (Exception e) {
          clientCom.LogMessage(string.Format("Transmitting the job with id {0} to server failed. Exception is: {1}", JobId, e.ToString()));
        }
        finally {
          // Runs even if the job could not be fetched, so the AppDomain and its cores are released.
          KillAppDomain();
          Core.HBManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        clientCom.LogMessage(string.Format("SendFinishedJob: The following exception has been thrown: {0}", e.ToString()));
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within an AppDomain.
    /// Prepares the job's plugins, creates a sandboxed AppDomain, instantiates an
    /// <see cref="Executor"/> in it, starts the job and begins monitoring the executor.
    /// On any failure, the job is reported as Failed to the server.
    /// </summary>
    /// <param name="job">the job to start</param>
    /// <param name="jobData">the serialized job data passed to the executor</param>
    private void StartJobInAppDomain(Job job, JobData jobData) {
      JobId = job.Id;

      clientCom.LogMessage(string.Format("Received new job with id {0}", job.Id));
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
      bool pluginsPrepared = false;
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(job, out configFileName);
        clientCom.LogMessage(string.Format("Plugins fetched for job {0}", job.Id));
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
        wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
        SlaveStatusInfo.IncrementJobsFailed();
        core.RemoveSlaveJobFromList(JobId);
        finished = true;
        SlaveStatusInfo.DecrementUsedCores(coresNeeded);
      }

      if (pluginsPrepared) {
        bool startSemReleased = false;
        try {
          appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

          clientCom.LogMessage("Creating AppDomain");
          executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
          clientCom.LogMessage("Created AppDomain");

          executor.JobId = job.Id;
          executor.CoresNeeded = job.CoresNeeded;
          executor.MemoryNeeded = job.MemoryNeeded;
          clientCom.LogMessage(string.Format("Starting Executor for job {0}", job.Id));

          executor.Start(jobData.Data);
          waitForStartBeforeKillSem.Release();
          startSemReleased = true;

          StartExecutorMonitoringThread();
        }
        catch (Exception exception) {
          clientCom.LogMessage(string.Format("Creating the Appdomain and loading the job failed for job {0}", job.Id));
          clientCom.LogMessage(string.Format("Error thrown is: {0}", exception.ToString()));

          if (executor != null && !string.IsNullOrEmpty(executor.CurrentException)) {
            wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
          } else {
            wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
          }
          SlaveStatusInfo.IncrementJobsFailed();

          // The start attempt is over. KillAppDomain waits on this semaphore before
          // unloading, so it must be released here if startup failed before reaching
          // the Release above; otherwise KillAppDomain would block forever.
          if (!startSemReleased) {
            waitForStartBeforeKillSem.Release();
          }
          KillAppDomain();
        }
      }
      Core.HBManager.AwakeHeartBeatThread();
    }

    /// <summary>
    /// Tears down this job locally: stops the monitoring thread, releases the
    /// reserved cores, disposes the executor, unloads the job's AppDomain and
    /// deletes its plugins. The job is always removed from the core's job list.
    /// </summary>
    public void KillAppDomain() {
      clientCom.LogMessage(string.Format("Shutting down Appdomain for Job {0}", JobId));

      try {
        StopExecutorMonitoringThread();
        finished = true;
        SlaveStatusInfo.DecrementUsedCores(coresNeeded);

        if (executor != null) {
          executor.Dispose();
        }

        if (appDomain != null) {
          appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

          // Wait once (not per retry) until the start attempt has completed.
          // The semaphore has a maximum count of 1, so waiting again inside the retry
          // loop would block forever.
          waitForStartBeforeKillSem.WaitOne();

          int repeat = 5;
          while (repeat > 0) {
            try {
              AppDomain.Unload(appDomain);
              waitForStartBeforeKillSem.Dispose();
              repeat = 0;
            }
            catch (CannotUnloadAppDomainException) {
              clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
              Thread.Sleep(1000);
              repeat--;
              if (repeat == 0) {
                clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
                // NOTE(review): this rethrow is caught and logged by the enclosing
                // catch (Exception) below; it does not crash the application.
                throw;
              }
            }
          }
        }

        PluginCache.Instance.DeletePluginsForJob(JobId);
        GC.Collect();
      }
      catch (Exception ex) {
        clientCom.LogMessage(string.Format("Exception when unloading the appdomain: {0}", ex.ToString()));
      }
      finally {
        core.RemoveSlaveJobFromList(JobId);
      }

      GC.Collect();
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    }

    /// <summary>
    /// Logs an unhandled exception raised in the job's AppDomain and tears the job down.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      // The "{0}" placeholder was missing, which silently dropped the exception text.
      clientCom.LogMessage(string.Format("Exception in AppDomain: {0}", e.ExceptionObject.ToString()));
      KillAppDomain();
    }


    #region ExecutorMonitorThread

    /// <summary>
    /// Starts the thread that polls the executor for messages (see <see cref="MonitorExecutor"/>).
    /// </summary>
    private void StartExecutorMonitoringThread() {
      executorMonitoringThread = new Thread(MonitorExecutor);
      executorMonitoringThread.Start();
    }

    /// <summary>
    /// Asks a running monitoring thread to exit by clearing the run flag and enqueueing
    /// a stop message. The message unblocks the thread's pending GetMessage call.
    /// </summary>
    private void StopExecutorMonitoringThread() {
      if (executorMonitoringThread != null) {
        if (executorMonitoringRun) {
          executorMonitoringRun = false;
          executor.executorQueue.AddMessage(ExecutorMessageType.StopExecutorMonitoringThread);
        }
      }
    }

    /// <summary>
    /// Because the executor is in an appdomain and is not able to call back
    /// (because of security -> lease time for marshall-by-ref object is 5 min),
    /// we have to poll the executor for events we have to react to (e.g. job finished...)
    /// </summary>
    private void MonitorExecutor() {
      while (executorMonitoringRun) {
        //this blocks through the appdomain border, that's why the lease gets renewed
        ExecutorMessage message = executor.executorQueue.GetMessage();

        switch (message.MessageType) {
          case ExecutorMessageType.JobStopped:
          case ExecutorMessageType.JobFailed:
            executorMonitoringRun = false;
            SendFinishedJob();
            break;

          case ExecutorMessageType.NewChildJob:
            WcfService.Instance.AddChildJob(JobId, message.MsgJob, message.MsgData);
            break;

          case ExecutorMessageType.WaitForChildJobs:
            executorMonitoringRun = false;
            PauseWaitJob(message.MsgData);
            break;

          case ExecutorMessageType.DeleteChildJobs:
            WcfService.Instance.DeleteChildJobs(JobId);
            break;

          case ExecutorMessageType.StopExecutorMonitoringThread:
            executorMonitoringRun = false;
            return;
        }
      }
    }
    #endregion
  }
}