Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6110

Last change on this file since 6110 was 6110, checked in by cneumuel, 13 years ago

#1233

  • renamed engines to executors
  • changed locking in StartJobInAppDomain
  • avoid destruction of proxy object after 5 minutes for Slave.Core
  • added JobStarted event and fixed ExecutionStateChanged and ExecutionTimeChanged
  • slaves which are moved to another slavegroup will pause their jobs now, if they must not calculate them
File size: 8.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive;
29using HeuristicLab.PluginInfrastructure;
30
31
32namespace HeuristicLab.Clients.Hive.SlaveCore {
33  public class Executor : MarshalByRefObject, IDisposable {
34    public Guid JobId { get; set; }
35    public IJob Job { get; set; }
36    private bool wasJobAborted = false;
37    public Core Core { get; set; }
38    private Semaphore pauseStopSem = new Semaphore(0, 1);
39
40    public bool SendHeartbeatForExecutor { get; set; }
41
42    public bool Aborted { get; set; }
43
44    public DateTime CreationTime { get; set; }
45
46    private Exception currentException;
47    public String CurrentException {
48      get {
49        if (currentException != null) {
50          return currentException.ToString();
51        } else {
52          return string.Empty;
53        }
54      }
55    }
56
57    public ExecutionState ExecutionState {
58      get {
59        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
60      }
61    }
62
63    public TimeSpan ExecutionTime {
64      get {
65        return Job != null ? Job.ExecutionTime : new TimeSpan(0, 0, 0);
66      }
67    }
68
69    public Executor() {
70      SendHeartbeatForExecutor = true;
71    }
72
73    /// <param name="serializedJob"></param>
74    /// <param name="collectChildJobs">if true, all child-jobs are downloaded and the job will be resumed.</param>
75    public void Start(byte[] serializedJob) {
76      try {
77        CreationTime = DateTime.Now;
78        Aborted = false;
79        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);
80
81        RegisterJobEvents();
82
83        if (Job.CollectChildJobs) {
84          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
85          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
86        } else {
87          Job.Start();
88
89        }
90      }
91      catch (Exception e) {
92        this.currentException = e;
93        Job_JobFailed(this, new HeuristicLab.Common.EventArgs<Exception>(e));
94      }
95    }
96
97    public void Pause() {
98      SendHeartbeatForExecutor = false;
99      if (Job == null) {
100        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
101        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
102      }
103
104      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
105        try {
106          Job.Pause();
107          //we need to block the pause...
108          pauseStopSem.WaitOne();
109        }
110        catch (Exception ex) {
111          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
112        }
113      }
114    }
115
116    public void Stop() {
117      SendHeartbeatForExecutor = false;
118      if (Job == null) {
119        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
120        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
121      }
122      wasJobAborted = true;
123
124      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
125        try {
126          Job.Stop();
127          pauseStopSem.WaitOne();
128        }
129        catch (Exception ex) {
130          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
131        }
132      }
133    }
134
135    private void RegisterJobEvents() {
136      Job.JobStopped += new EventHandler(Job_JobStopped);
137      Job.JobFailed += new EventHandler(Job_JobFailed);
138      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
139      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
140      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
141      Job.JobPaused += new EventHandler(Job_JobPaused);
142    }
143
144    private void DeregisterJobEvents() {
145      Job.JobStopped -= new EventHandler(Job_JobStopped);
146      Job.JobFailed -= new EventHandler(Job_JobFailed);
147      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
148      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
149      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
150      Job.JobPaused -= new EventHandler(Job_JobPaused);
151    }
152
153    private List<Guid> FindPluginsNeeded(IJob obj) {
154      List<Guid> guids = new List<Guid>();
155      foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {
156      }
157      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
158
159      return guids;
160    }
161
162    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
163      JobData childJobData = new JobData();
164      childJobData.Data = PersistenceUtil.Serialize(e.Value);
165
166      Job childJob = new Job();
167      childJob.CoresNeeded = 1;
168      childJob.MemoryNeeded = 0;
169      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);
170
171      //TODO: is return value needed?
172      WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData);
173    }
174
175    private void Job_WaitForChildJobs(object sender, EventArgs e) {
176      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
177      this.Job.CollectChildJobs = true;
178
179      JobData jdata = new JobData();
180      jdata.Data = PersistenceUtil.Serialize(Job);
181      jdata.JobId = this.JobId;
182
183      Core.PauseWaitJob(jdata);
184    }
185
186    private void Job_DeleteChildJobs(object sender, EventArgs e) {
187      WcfService.Instance.DeleteChildJobs(JobId);
188    }
189
190    private void Job_JobFailed(object sender, EventArgs e) {
191      HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e;
192      currentException = ex.Value;
193      Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
194      Aborted = true;
195    }
196
197    private void Job_JobStopped(object sender, EventArgs e) {
198      if (wasJobAborted) {
199        pauseStopSem.Release();
200        Aborted = true;
201      } else {
202        //it's a clean and finished job, so send it
203        Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
204      }
205    }
206
207    public JobData GetFinishedJob() {
208      if (Job == null) {
209        if (currentException == null) {
210          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
211        }
212        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
213      }
214
215      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
216        try {
217          Job.Stop();
218          wasJobAborted = true;
219          pauseStopSem.WaitOne();
220        }
221        catch (Exception ex) {
222          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
223        }
224      }
225
226      return GetJob();
227    }
228
229
230    public JobData GetPausedJob() {
231      if (Job.ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
232        throw new Exception("Executor: Job has to be paused before fetching results.");
233      }
234      return GetJob();
235    }
236
237    private void Job_JobPaused(object sender, EventArgs e) {
238      pauseStopSem.Release();
239    }
240
241    private JobData GetJob() {
242      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
243        throw new InvalidStateException("Job is still running");
244      } else {
245        JobData jdata = new JobData();
246        jdata.Data = PersistenceUtil.Serialize(Job);
247        jdata.JobId = JobId;
248        return jdata;
249      }
250    }
251
252    public void Dispose() {
253      if (Job != null)
254        DeregisterJobEvents();
255      Job = null;
256    }
257  }
258}
Note: See TracBrowser for help on using the repository browser.