Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6112

Last change on this file since 6112 was 6112, checked in by ascheibe, 13 years ago

#1233

  • HeartbeatManager: don't sleep while starting jobs
  • Executor: make Start() blocking
  • shutdown properly if an uncaught exception is thrown
File size: 8.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive;
29using HeuristicLab.PluginInfrastructure;
30
31
32namespace HeuristicLab.Clients.Hive.SlaveCore {
33  public class Executor : MarshalByRefObject, IDisposable {
34    public Guid JobId { get; set; }
35    public IJob Job { get; set; }
36    private bool wasJobAborted = false;
37    public Core Core { get; set; }
38    private Semaphore pauseStopSem = new Semaphore(0, 1);
39    private Semaphore startJobSem = new Semaphore(0, 1);
40
41    public bool SendHeartbeatForExecutor { get; set; }
42
43    public bool Aborted { get; set; }
44
45    public DateTime CreationTime { get; set; }
46
47    private Exception currentException;
48    public String CurrentException {
49      get {
50        if (currentException != null) {
51          return currentException.ToString();
52        } else {
53          return string.Empty;
54        }
55      }
56    }
57
58    public ExecutionState ExecutionState {
59      get {
60        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
61      }
62    }
63
64    public TimeSpan ExecutionTime {
65      get {
66        return Job != null ? Job.ExecutionTime : new TimeSpan(0, 0, 0);
67      }
68    }
69
70    public Executor() {
71      SendHeartbeatForExecutor = true;
72    }
73
74    /// <param name="serializedJob"></param>
75    /// <param name="collectChildJobs">if true, all child-jobs are downloaded and the job will be resumed.</param>
76    public void Start(byte[] serializedJob) {
77      try {
78        CreationTime = DateTime.Now;
79        Aborted = false;
80        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);
81
82        RegisterJobEvents();
83
84        if (Job.CollectChildJobs) {
85          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
86          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
87        } else {
88          Job.Start();
89          startJobSem.WaitOne();
90        }
91      }
92      catch (Exception e) {
93        this.currentException = e;
94        Job_JobFailed(this, new HeuristicLab.Common.EventArgs<Exception>(e));
95      }
96    }
97
98    public void Pause() {
99      SendHeartbeatForExecutor = false;
100      if (Job == null) {
101        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
102        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
103      }
104
105      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
106        try {
107          Job.Pause();
108          //we need to block the pause...
109          pauseStopSem.WaitOne();
110        }
111        catch (Exception ex) {
112          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
113        }
114      }
115    }
116
117    public void Stop() {
118      SendHeartbeatForExecutor = false;
119      if (Job == null) {
120        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
121        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
122      }
123      wasJobAborted = true;
124
125      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
126        try {
127          Job.Stop();
128          pauseStopSem.WaitOne();
129        }
130        catch (Exception ex) {
131          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
132        }
133      }
134    }
135
136    private void RegisterJobEvents() {
137      Job.JobStopped += new EventHandler(Job_JobStopped);
138      Job.JobFailed += new EventHandler(Job_JobFailed);
139      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
140      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
141      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
142      Job.JobPaused += new EventHandler(Job_JobPaused);
143      Job.JobStarted += new EventHandler(Job_JobStarted);
144    }
145
146    private void DeregisterJobEvents() {
147      Job.JobStopped -= new EventHandler(Job_JobStopped);
148      Job.JobFailed -= new EventHandler(Job_JobFailed);
149      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
150      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
151      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
152      Job.JobPaused -= new EventHandler(Job_JobPaused);
153      Job.JobStarted -= new EventHandler(Job_JobStarted);
154    }
155
156    private List<Guid> FindPluginsNeeded(IJob obj) {
157      List<Guid> guids = new List<Guid>();
158      foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {
159      }
160      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
161
162      return guids;
163    }
164
165    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
166      JobData childJobData = new JobData();
167      childJobData.Data = PersistenceUtil.Serialize(e.Value);
168
169      Job childJob = new Job();
170      childJob.CoresNeeded = 1;
171      childJob.MemoryNeeded = 0;
172      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);
173
174      //TODO: is return value needed?
175      WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData);
176    }
177
178    private void Job_WaitForChildJobs(object sender, EventArgs e) {
179      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
180      this.Job.CollectChildJobs = true;
181
182      JobData jdata = new JobData();
183      jdata.Data = PersistenceUtil.Serialize(Job);
184      jdata.JobId = this.JobId;
185
186      Core.PauseWaitJob(jdata);
187    }
188
189    private void Job_DeleteChildJobs(object sender, EventArgs e) {
190      WcfService.Instance.DeleteChildJobs(JobId);
191    }
192
193    private void Job_JobFailed(object sender, EventArgs e) {
194      HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e;
195      currentException = ex.Value;
196      Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
197      Aborted = true;
198    }
199
200    private void Job_JobStopped(object sender, EventArgs e) {
201      if (wasJobAborted) {
202        pauseStopSem.Release();
203        Aborted = true;
204      } else {
205        //it's a clean and finished job, so send it
206        Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
207      }
208    }
209
210    public JobData GetFinishedJob() {
211      if (Job == null) {
212        if (currentException == null) {
213          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
214        }
215        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
216      }
217
218      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
219        try {
220          Job.Stop();
221          wasJobAborted = true;
222          pauseStopSem.WaitOne();
223        }
224        catch (Exception ex) {
225          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
226        }
227      }
228
229      return GetJob();
230    }
231
232
233    public JobData GetPausedJob() {
234      if (Job.ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
235        throw new Exception("Executor: Job has to be paused before fetching results.");
236      }
237      return GetJob();
238    }
239
240    private void Job_JobPaused(object sender, EventArgs e) {
241      pauseStopSem.Release();
242    }
243
244    void Job_JobStarted(object sender, EventArgs e) {
245      startJobSem.Release();
246    }
247
248    private JobData GetJob() {
249      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
250        throw new InvalidStateException("Job is still running");
251      } else {
252        JobData jdata = new JobData();
253        jdata.Data = PersistenceUtil.Serialize(Job);
254        jdata.JobId = JobId;
255        return jdata;
256      }
257    }
258
259    public void Dispose() {
260      if (Job != null)
261        DeregisterJobEvents();
262      Job = null;
263    }
264  }
265}
Note: See TracBrowser for help on using the repository browser.