Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6269

Last change on this file since 6269 was 6203, checked in by ascheibe, 13 years ago

#1233

  • dropped the Executor's dependency on Core
  • enabled sandboxing
  • moved most parts of Job handling from Core to SlaveJob to simplify locking
  • optimized how UsedCores is handled
  • SlaveStatusInfo is now thread-safe and counts jobs more correctly
File size: 9.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive;
29using HeuristicLab.PluginInfrastructure;
30
31
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// Runs a single deserialized <see cref="IJob"/> on the slave. It forwards the job's
  /// events as <see cref="ExecutorMessage"/>s to <see cref="executorQueue"/>. It also hands
  /// the serialized job back once the job has been paused, stopped or has finished.
  /// </summary>
  public class Executor : MarshalByRefObject, IDisposable {
    /// <summary>Upper bound for waiting on the job's JobStarted event.</summary>
    private static readonly TimeSpan JobStartTimeout = TimeSpan.FromSeconds(15);

    public Guid JobId { get; set; }
    public IJob Job { get; set; }
    public int CoresNeeded { get; set; }
    public int MemoryNeeded { get; set; }
    // True once this executor has asked the job to stop. Job_JobStopped uses it to tell
    // a requested stop (release the waiter) from a job that finished on its own.
    private bool wasJobAborted = false;
    public Core Core { get; set; }
    // Released by Job_JobPaused and by Job_JobStopped (requested stop only).
    // Pause, Stop and GetFinishedJob block on it until the job has reacted.
    private Semaphore pauseStopSem = new Semaphore(0, 1);
    // Released by Job_JobStarted. Start, Pause and Stop wait on it so that
    // pause/stop do not act before the job has actually been started.
    private Semaphore jobStartedSem = new Semaphore(0, 1);

    public ExecutorQueue executorQueue;

    public bool SendHeartbeatForExecutor { get; set; }

    public bool Aborted { get; set; }

    public DateTime CreationTime { get; set; }

    private Exception currentException;
    /// <summary>
    /// Text of the last recorded exception, or an empty string if none was recorded.
    /// </summary>
    public String CurrentException {
      get {
        if (currentException != null) {
          return currentException.ToString();
        } else {
          return string.Empty;
        }
      }
    }

    /// <summary>State of the job, or Stopped if there is no job.</summary>
    public ExecutionState ExecutionState {
      get {
        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
      }
    }

    /// <summary>Execution time of the job, or zero if there is no job.</summary>
    public TimeSpan ExecutionTime {
      get {
        return Job != null ? Job.ExecutionTime : TimeSpan.Zero;
      }
    }

    public Executor() {
      SendHeartbeatForExecutor = true;
      executorQueue = new ExecutorQueue();
    }

    /// <summary>
    /// Deserializes the job, subscribes to its events and starts it.
    /// If the job has <c>CollectChildJobs</c> set, its child jobs are downloaded and the
    /// job is resumed with them instead. Failures are not thrown. They are recorded and
    /// reported through <see cref="Job_JobFailed"/>.
    /// </summary>
    /// <param name="serializedJob">The persisted <see cref="IJob"/>.</param>
    public void Start(byte[] serializedJob) {
      try {
        CreationTime = DateTime.Now;
        Aborted = false;
        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        if (Job.CollectChildJobs) {
          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
        } else {
          Job.Start();
          if (!jobStartedSem.WaitOne(JobStartTimeout)) {
            throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
          }
          // put the token back so Pause/Stop can pass their own start check
          jobStartedSem.Release();
        }
      }
      catch (Exception e) {
        this.currentException = e;
        Job_JobFailed(this, new EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Pauses a running job and blocks until the job reports that it has paused.
    /// Errors are recorded in <see cref="CurrentException"/> instead of being thrown.
    /// </summary>
    public void Pause() {
      SendHeartbeatForExecutor = false;
      // wait until the job is started; if it never is and Job is still null, give up
      WaitForJobStarted();
      if (Job == null) {
        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
        return;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          Job.Pause();
          // block until Job_JobPaused signals that the job has actually paused
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Stops a running or paused job and blocks until the job reports that it has stopped.
    /// Errors are recorded in <see cref="CurrentException"/> instead of being thrown.
    /// </summary>
    public void Stop() {
      SendHeartbeatForExecutor = false;
      // wait until the job is started; if it never is and Job is still null, give up
      WaitForJobStarted();
      // mark the stop as requested before the job can raise JobStopped
      wasJobAborted = true;
      if (Job == null) {
        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
        return;
      }

      HeuristicLab.Core.ExecutionState state = Job.ExecutionState;
      if (state == HeuristicLab.Core.ExecutionState.Started || state == HeuristicLab.Core.ExecutionState.Paused) {
        try {
          Job.Stop();
          // block until Job_JobStopped (abort branch) releases the semaphore
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Waits (at most <see cref="JobStartTimeout"/>) for the JobStarted signal. If the signal
    /// arrived, it is put back so later Pause/Stop calls (and a concurrent Start) are not
    /// left waiting for a token that was already consumed.
    /// </summary>
    private void WaitForJobStarted() {
      if (jobStartedSem.WaitOne(JobStartTimeout)) {
        jobStartedSem.Release();
      }
    }

    private void RegisterJobEvents() {
      Job.JobStopped += new EventHandler(Job_JobStopped);
      Job.JobFailed += new EventHandler(Job_JobFailed);
      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused += new EventHandler(Job_JobPaused);
      Job.JobStarted += new EventHandler(Job_JobStarted);
    }

    private void DeregisterJobEvents() {
      Job.JobStopped -= new EventHandler(Job_JobStopped);
      Job.JobFailed -= new EventHandler(Job_JobFailed);
      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused -= new EventHandler(Job_JobPaused);
      Job.JobStarted -= new EventHandler(Job_JobStarted);
    }

    /// <summary>
    /// Intended to return the ids of the plugins needed to run <paramref name="obj"/>.
    /// Not implemented yet.
    /// </summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    private List<Guid> FindPluginsNeeded(IJob obj) {
      // TODO: map PluginUtil.GetDeclaringPlugins(obj) to the plugin ids used by the server
      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
    }

    /// <summary>
    /// Serializes a child job raised by the running job and queues a NewChildJob message.
    /// NOTE: currently always throws, because <see cref="FindPluginsNeeded"/> is not implemented.
    /// </summary>
    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
      JobData childJobData = new JobData();
      childJobData.Data = PersistenceUtil.Serialize(e.Value);

      Job childJob = new Job();
      childJob.CoresNeeded = 1;
      childJob.MemoryNeeded = 0;
      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);

      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.NewChildJob);
      msg.MsgData = childJobData;
      msg.MsgJob = childJob;

      executorQueue.AddMessage(msg);
    }

    /// <summary>
    /// Marks the job to collect its child jobs on the next start, serializes it and queues
    /// a WaitForChildJobs message. The server wakes the job once all child jobs are finished.
    /// </summary>
    private void Job_WaitForChildJobs(object sender, EventArgs e) {
      this.Job.CollectChildJobs = true;

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = this.JobId;

      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.WaitForChildJobs);
      msg.MsgData = jdata;
      executorQueue.AddMessage(msg);
    }

    private void Job_DeleteChildJobs(object sender, EventArgs e) {
      executorQueue.AddMessage(ExecutorMessageType.DeleteChildJobs);
    }

    /// <summary>
    /// Records the failure, marks the executor as aborted and queues a JobFailed message.
    /// The exception is expected in an <c>EventArgs&lt;Exception&gt;</c>. If it is missing,
    /// an already recorded exception is kept, or a generic one is created.
    /// </summary>
    private void Job_JobFailed(object sender, EventArgs e) {
      EventArgs<Exception> failure = e as EventArgs<Exception>;
      if (failure != null && failure.Value != null) {
        currentException = failure.Value;
      } else if (currentException == null) {
        currentException = new Exception("Job " + this.JobId + " failed without reporting an exception");
      }
      Aborted = true;

      executorQueue.AddMessage(ExecutorMessageType.JobFailed);
    }

    /// <summary>
    /// If the stop was requested by this executor, this releases the waiting
    /// Stop/GetFinishedJob call. Otherwise the job finished by itself and a JobStopped
    /// message is queued.
    /// </summary>
    private void Job_JobStopped(object sender, EventArgs e) {
      if (wasJobAborted) {
        pauseStopSem.Release();
        Aborted = true;
      } else {
        //it's a clean and finished job, so send it
        executorQueue.AddMessage(ExecutorMessageType.JobStopped);
      }
    }

    /// <summary>
    /// Returns the serialized job. A job that is still running is stopped first, and the
    /// call blocks until it has stopped. If there is no job, an empty job is returned and
    /// an exception is recorded.
    /// </summary>
    public JobData GetFinishedJob() {
      if (Job == null) {
        if (currentException == null) {
          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
        }
        return GetJob();
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          // set before Stop() so Job_JobStopped sees it even if JobStopped is raised
          // before Stop() returns; otherwise the semaphore is never released
          wasJobAborted = true;
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
        }
      }

      return GetJob();
    }

    /// <summary>Returns the serialized job. The job must be paused.</summary>
    /// <exception cref="Exception">If the job is missing or not paused.</exception>
    public JobData GetPausedJob() {
      // ExecutionState is null-safe, so a missing job reports "not paused" instead of NRE
      if (ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
        throw new Exception("Executor: Job has to be paused before fetching results.");
      }
      return GetJob();
    }

    private void Job_JobPaused(object sender, EventArgs e) {
      pauseStopSem.Release();
    }

    private void Job_JobStarted(object sender, EventArgs e) {
      try {
        jobStartedSem.Release();
      }
      catch (SemaphoreFullException) {
        // the "started" signal is already set; nothing more to signal
      }
    }

    /// <summary>
    /// Packs the current job into a <see cref="JobData"/>. If there is no job, an empty
    /// job is packed and an exception is recorded.
    /// </summary>
    /// <exception cref="InvalidStateException">If the job is still running.</exception>
    private JobData GetJob() {
      JobData jdata = new JobData();
      jdata.JobId = JobId;

      // check for a missing job first; the state check below dereferences Job
      if (Job == null) {
        //send empty job and save exception
        jdata.Data = PersistenceUtil.Serialize(new JobData());
        if (currentException == null) {
          currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
        }
        return jdata;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      }

      jdata.Data = PersistenceUtil.Serialize(Job);
      return jdata;
    }

    /// <summary>Unsubscribes from the job's events and drops the job reference.</summary>
    public void Dispose() {
      // NOTE(review): pauseStopSem and jobStartedSem are not closed here
      if (Job != null)
        DeregisterJobEvents();
      Job = null;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.