Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6004

Last change on this file since 6004 was 6004, checked in by ascheibe, 13 years ago

#1233

  • fix pause/stop bug when serializing big experiments
  • use proper newlines
  • use GetPlugin(..) instead of GetPlugins()
File size: 8.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive;
29using HeuristicLab.PluginInfrastructure;
30
31
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// Hosts a single <see cref="IJob"/>: deserializes and starts it, forwards
  /// pause/stop requests to it, and notifies <see cref="Core"/> about the outcome
  /// through executor messages.
  /// </summary>
  public class Executor : MarshalByRefObject, IDisposable {
    /// <summary>Id of the job handled by this executor.</summary>
    public Guid JobId { get; set; }

    /// <summary>The deserialized job; <c>null</c> until <see cref="Start"/> succeeded and after <see cref="Dispose"/>.</summary>
    public IJob Job { get; set; }

    // Set when a stop was requested from outside (Stop / GetFinishedJob), so that
    // Job_JobStopped can tell a requested stop apart from a regular finish.
    private bool wasJobAborted = false;

    /// <summary>The slave core that receives the executor messages.</summary>
    public Core Core { get; set; }

    // Released by the JobPaused / JobStopped handlers; Pause(), Stop() and
    // GetFinishedJob() block on it until the job has actually paused/stopped.
    private Semaphore pauseStopSem = new Semaphore(0, 1);

    /// <summary>Initialized to true; set to false as soon as a pause or stop is requested.</summary>
    public bool SendHeartbeatForExecutor { get; set; }

    /// <summary>True once the job failed or was stopped on request.</summary>
    public bool Aborted { get; set; }

    /// <summary>Local time at which <see cref="Start"/> was called.</summary>
    public DateTime CreationTime { get; set; }

    private Exception currentException;

    /// <summary>
    /// String representation of the last recorded exception, or an empty string
    /// if no exception has been recorded.
    /// </summary>
    public String CurrentException {
      get {
        if (currentException != null) {
          return currentException.ToString();
        } else {
          return string.Empty;
        }
      }
    }

    /// <summary>Execution state of the job; <c>Stopped</c> if there is no job.</summary>
    public ExecutionState ExecutionState {
      get {
        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
      }
    }

    /// <summary>Execution time of the job; zero if there is no job.</summary>
    public TimeSpan ExecutionTime {
      get {
        return Job != null ? Job.ExecutionTime : new TimeSpan(0, 0, 0);
      }
    }

    public Executor() {
      SendHeartbeatForExecutor = true;
    }

    /// <summary>
    /// Deserializes the job and starts it. If the job has
    /// <c>CollectChildJobs</c> set, its child jobs are fetched from the
    /// server and the job is resumed with them instead of being started.
    /// Any exception is recorded and reported like a job failure.
    /// </summary>
    /// <param name="serializedJob">The serialized <see cref="IJob"/>.</param>
    public void Start(byte[] serializedJob) {
      try {
        CreationTime = DateTime.Now;
        Aborted = false;
        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        if (Job.CollectChildJobs) {
          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
        } else {
          // do NOT call Job.Prepare() here, otherwise paused jobs get restarted
          Job.Start();
        }
      }
      catch (Exception e) {
        this.currentException = e;
        Job_JobFailed(this, new HeuristicLab.Common.EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Pauses a started job and blocks until the job signals that it is paused.
    /// If there is no job, the app domain is scheduled to be killed instead.
    /// </summary>
    public void Pause() {
      SendHeartbeatForExecutor = false;
      if (Job == null) {
        SlaveClientCom.Instance.ClientCom.LogMessage("Pausing job: Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        // nothing to pause; falling through would dereference the null job
        return;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          Job.Pause();
          // block until Job_JobPaused releases the semaphore
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          SlaveClientCom.Instance.ClientCom.LogMessage("Error pausing job:" + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Stops a started or paused job and blocks until the job signals that it
    /// is stopped. If there is no job, the app domain is scheduled to be killed instead.
    /// </summary>
    public void Stop() {
      SendHeartbeatForExecutor = false;
      // must be set before Job.Stop() so that Job_JobStopped releases the semaphore
      wasJobAborted = true;

      if (Job == null) {
        SlaveClientCom.Instance.ClientCom.LogMessage("Stopping job: Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        return;
      }

      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
        try {
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          SlaveClientCom.Instance.ClientCom.LogMessage("Error stopping job:" + ex.ToString());
        }
      }
    }

    private void RegisterJobEvents() {
      Job.JobStopped += new EventHandler(Job_JobStopped);
      Job.JobFailed += new EventHandler(Job_JobFailed);
      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused += new EventHandler(Job_JobPaused);
    }

    private void DeregisterJobEvents() {
      Job.JobStopped -= new EventHandler(Job_JobStopped);
      Job.JobFailed -= new EventHandler(Job_JobFailed);
      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused -= new EventHandler(Job_JobPaused);
    }

    /// <summary>
    /// Not implemented yet: is meant to determine the ids of the plugins a
    /// child job needs. Always throws <see cref="NotImplementedException"/>.
    /// </summary>
    private List<Guid> FindPluginsNeeded(IJob obj) {
      // TODO: map the plugins returned by PluginUtil.GetDeclaringPlugins(obj) to their ids
      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
    }

    /// <summary>Serializes the new child job and registers it at the server.</summary>
    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
      JobData childJobData = new JobData();
      childJobData.Data = PersistenceUtil.Serialize(e.Value);

      Job childJob = new Job();
      childJob.CoresNeeded = 1;
      childJob.MemoryNeeded = 0;
      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);

      //TODO: is return value needed?
      WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData);
    }

    private void Job_WaitForChildJobs(object sender, EventArgs e) {
      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
      this.Job.CollectChildJobs = true;

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = this.JobId;

      Core.PauseWaitJob(jdata);
    }

    private void Job_DeleteChildJobs(object sender, EventArgs e) {
      WcfService.Instance.DeleteChildJobs(JobId);
    }

    /// <summary>
    /// Records the failure (if the event args carry an exception), marks the
    /// executor as aborted and asks the core to send the finished job.
    /// </summary>
    private void Job_JobFailed(object sender, EventArgs e) {
      // The event is a plain EventHandler, so the args type is not guaranteed;
      // a hard cast would throw here and the failure would never be reported.
      HeuristicLab.Common.EventArgs<Exception> failure = e as HeuristicLab.Common.EventArgs<Exception>;
      if (failure != null) {
        currentException = failure.Value;
      }
      // set the state before the message is enqueued so the consumer never sees a stale value
      Aborted = true;
      Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
    }

    private void Job_JobStopped(object sender, EventArgs e) {
      if (wasJobAborted) {
        // the stop was requested: update the state first, then wake up the waiting caller
        Aborted = true;
        pauseStopSem.Release();
      } else {
        //it's a clean and finished job, so send it
        Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
      }
    }

    /// <summary>
    /// Returns the serialized job. A job that is still started is stopped first
    /// and the call blocks until the job signals that it is stopped.
    /// </summary>
    /// <exception cref="InvalidStateException">
    /// There is no job (the app domain is scheduled to be killed in that case),
    /// or the job is still running after the stop attempt.
    /// </exception>
    public JobData GetFinishedJob() {
      if (Job == null) {
        SlaveClientCom.Instance.ClientCom.LogMessage("Getting finished job: Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        // fail explicitly instead of running into a NullReferenceException below
        throw new InvalidStateException("Getting finished job: Job is null");
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          // The flag has to be set BEFORE stopping: if JobStopped is raised before
          // it is set, the semaphore is never released and WaitOne() blocks forever.
          wasJobAborted = true;
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          SlaveClientCom.Instance.ClientCom.LogMessage("Error stopping job:" + ex.ToString());
        }
      }

      return GetJob();
    }


    /// <summary>Returns the serialized job; the job has to be paused.</summary>
    /// <exception cref="Exception">The job is not paused (or there is no job).</exception>
    public JobData GetPausedJob() {
      // use the null-safe property: a missing job reports Stopped and is rejected here
      if (ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
        throw new Exception("Executor: Job has to be paused before fetching results.");
      }
      return GetJob();
    }

    private void Job_JobPaused(object sender, EventArgs e) {
      // wakes up a caller blocked in Pause()
      pauseStopSem.Release();
    }

    /// <summary>Serializes the job together with its id.</summary>
    /// <exception cref="InvalidStateException">The job is still running.</exception>
    private JobData GetJob() {
      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      } else {
        JobData jdata = new JobData();
        jdata.Data = PersistenceUtil.Serialize(Job);
        jdata.JobId = JobId;
        return jdata;
      }
    }

    /// <summary>Detaches the event handlers from the job and drops the reference to it.</summary>
    public void Dispose() {
      if (Job != null)
        DeregisterJobEvents();
      Job = null;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.