Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6168

Last change on this file since 6168 was 6168, checked in by cneumuel, 13 years ago

#1233

  • removed Job-dto objects from slave core (since it stores outdated objects)
  • added command textbox to HiveJobView
  • improved the way the control buttons behave in HiveJobView
  • improved job control (pause and stop is also possible when job is not currently calculating)
  • improved gantt chart view (last state log entry is also displayed)
  • unified code for downloading jobs between experiment manager and hive engine
File size: 8.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive;
29using HeuristicLab.PluginInfrastructure;
30
31
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// Hosts a single deserialized <see cref="IJob"/> and offers operations to start, pause and
  /// stop it and to fetch its serialized state. Job events are translated into messages for the
  /// owning <see cref="Core"/> or into signals for callers blocked in this class.
  /// </summary>
  /// <remarks>
  /// Derives from <see cref="MarshalByRefObject"/>; presumably the slave core talks to it across
  /// an application-domain boundary (see the KillAppDomain messages below) - confirm with the caller.
  /// </remarks>
  public class Executor : MarshalByRefObject, IDisposable {
    /// <summary>Id of the job handled by this executor; used in every message sent to the core.</summary>
    public Guid JobId { get; set; }

    /// <summary>The deserialized job; null until <see cref="Start"/> succeeded and after <see cref="Dispose"/>.</summary>
    public IJob Job { get; set; }

    public int CoresNeeded { get; set; }
    public int MemoryNeeded { get; set; }

    // Set when a stop was requested through Stop()/GetFinishedJob(), so that Job_JobStopped can
    // tell a requested stop apart from a job that finished on its own.
    private bool wasJobAborted = false;

    /// <summary>The slave core that receives the executor messages.</summary>
    public Core Core { get; set; }

    // Released by Job_JobPaused and, for requested stops, by Job_JobStopped;
    // Pause(), Stop() and GetFinishedJob() block on it.
    private Semaphore pauseStopSem = new Semaphore(0, 1);

    // Released by Job_JobStarted; Start() blocks on it after calling Job.Start().
    private Semaphore startJobSem = new Semaphore(0, 1);

    /// <summary>Initialized to true; set to false as soon as a pause or stop is requested.</summary>
    public bool SendHeartbeatForExecutor { get; set; }

    /// <summary>True once the job failed or was stopped on request.</summary>
    public bool Aborted { get; set; }

    /// <summary>Time at which <see cref="Start"/> was called.</summary>
    public DateTime CreationTime { get; set; }

    private Exception currentException;

    /// <summary>Text of the last recorded exception, or an empty string if there is none.</summary>
    public String CurrentException {
      get {
        return currentException != null ? currentException.ToString() : string.Empty;
      }
    }

    /// <summary>Execution state of the job; <c>Stopped</c> while no job is loaded.</summary>
    public ExecutionState ExecutionState {
      get {
        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
      }
    }

    /// <summary>Execution time of the job; zero while no job is loaded.</summary>
    public TimeSpan ExecutionTime {
      get {
        return Job != null ? Job.ExecutionTime : TimeSpan.Zero;
      }
    }

    public Executor() {
      SendHeartbeatForExecutor = true;
    }

    /// <summary>
    /// Deserializes the job, subscribes to its events and runs it. If the job has
    /// <c>CollectChildJobs</c> set, its child jobs are downloaded and the job is resumed with them;
    /// otherwise the job is started and the call blocks until the job raises <c>JobStarted</c>.
    /// Any exception is recorded and reported as a failed job instead of being propagated.
    /// </summary>
    /// <param name="serializedJob">The serialized <see cref="IJob"/> to execute.</param>
    public void Start(byte[] serializedJob) {
      try {
        CreationTime = DateTime.Now;
        Aborted = false;
        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        if (Job.CollectChildJobs) {
          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
        } else {
          Job.Start();
          startJobSem.WaitOne();
        }
      }
      catch (Exception e) {
        this.currentException = e;
        Job_JobFailed(this, new HeuristicLab.Common.EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Pauses a started job and blocks until the job raises <c>JobPaused</c>. If no job is loaded,
    /// the problem is recorded and the core is asked to kill the application domain.
    /// </summary>
    public void Pause() {
      SendHeartbeatForExecutor = false;
      if (Job == null) {
        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        // there is nothing to pause; going on would only dereference the missing job
        return;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          Job.Pause();
          //we need to block the pause...
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Stops a started or paused job and blocks until the job raises <c>JobStopped</c>. If no job
    /// is loaded, the problem is recorded and the core is asked to kill the application domain.
    /// </summary>
    public void Stop() {
      SendHeartbeatForExecutor = false;
      if (Job == null) {
        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
      }
      // must be set before Job.Stop() so that Job_JobStopped treats the stop as requested
      wasJobAborted = true;

      // the ExecutionState property is null-safe, so a missing job simply skips this branch
      HeuristicLab.Core.ExecutionState state = ExecutionState;
      if (state == HeuristicLab.Core.ExecutionState.Started || state == HeuristicLab.Core.ExecutionState.Paused) {
        try {
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    private void RegisterJobEvents() {
      Job.JobStopped += new EventHandler(Job_JobStopped);
      Job.JobFailed += new EventHandler(Job_JobFailed);
      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused += new EventHandler(Job_JobPaused);
      Job.JobStarted += new EventHandler(Job_JobStarted);
    }

    private void DeregisterJobEvents() {
      Job.JobStopped -= new EventHandler(Job_JobStopped);
      Job.JobFailed -= new EventHandler(Job_JobFailed);
      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused -= new EventHandler(Job_JobPaused);
      Job.JobStarted -= new EventHandler(Job_JobStarted);
    }

    /// <summary>
    /// Placeholder for determining the plugins a child job needs. Not implemented yet.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    private List<Guid> FindPluginsNeeded(IJob obj) {
      // TODO: map the plugins declaring obj to their ids once that lookup is available
      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
    }

    /// <summary>Serializes a newly created child job and registers it with the server.</summary>
    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
      JobData childJobData = new JobData();
      childJobData.Data = PersistenceUtil.Serialize(e.Value);

      Job childJob = new Job();
      childJob.CoresNeeded = 1;
      childJob.MemoryNeeded = 0;
      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);

      //TODO: is return value needed?
      WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData);
    }

    private void Job_WaitForChildJobs(object sender, EventArgs e) {
      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
      this.Job.CollectChildJobs = true;

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = this.JobId;

      Core.PauseWaitJob(jdata);
    }

    private void Job_DeleteChildJobs(object sender, EventArgs e) {
      WcfService.Instance.DeleteChildJobs(JobId);
    }

    /// <summary>
    /// Records the failure (if the event carries an exception), marks the executor as aborted and
    /// asks the core to send the job back.
    /// </summary>
    private void Job_JobFailed(object sender, EventArgs e) {
      // The handler is subscribed as a plain EventHandler, so the argument is not guaranteed to
      // carry an exception; an already recorded exception is kept in that case.
      HeuristicLab.Common.EventArgs<Exception> failure = e as HeuristicLab.Common.EventArgs<Exception>;
      if (failure != null && failure.Value != null) {
        currentException = failure.Value;
      }
      // set the flag before notifying the core so that the consumer of the message sees it
      Aborted = true;
      Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
    }

    /// <summary>
    /// For a requested stop, marks the executor as aborted and wakes the caller waiting in
    /// Stop()/GetFinishedJob(); otherwise asks the core to send the finished job.
    /// </summary>
    private void Job_JobStopped(object sender, EventArgs e) {
      if (wasJobAborted) {
        // set the flag before waking the waiting caller so that it observes the final state
        Aborted = true;
        pauseStopSem.Release();
      } else {
        //it's a clean and finished job, so send it
        Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
      }
    }

    /// <summary>
    /// Returns the serialized job, stopping it first if it is still running.
    /// </summary>
    /// <returns>
    /// The serialized job, or null if no job is loaded; in that case the problem is recorded
    /// (see <see cref="CurrentException"/>) and the core is asked to kill the application domain.
    /// </returns>
    public JobData GetFinishedJob() {
      if (Job == null) {
        if (currentException == null) {
          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
        }
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        // there is no job that could be serialized
        return null;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        // As in Stop(): the flag has to be set before Job.Stop(), otherwise a JobStopped event
        // raised right away would be treated as a regular finish and never release pauseStopSem.
        wasJobAborted = true;
        try {
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
        }
      }

      return GetJob();
    }


    /// <summary>Returns the serialized job; the job has to be paused.</summary>
    /// <exception cref="Exception">The job is not paused (this includes the case that no job is loaded).</exception>
    public JobData GetPausedJob() {
      // uses the null-safe property, so a missing job is reported as "not paused"
      if (ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
        throw new Exception("Executor: Job has to be paused before fetching results.");
      }
      return GetJob();
    }

    private void Job_JobPaused(object sender, EventArgs e) {
      pauseStopSem.Release();
    }

    void Job_JobStarted(object sender, EventArgs e) {
      startJobSem.Release();
    }

    /// <summary>Serializes the current job together with its id.</summary>
    /// <exception cref="InvalidStateException">The job is still running.</exception>
    private JobData GetJob() {
      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      }

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = JobId;
      return jdata;
    }

    /// <summary>Unsubscribes from the job events and releases the reference to the job.</summary>
    public void Dispose() {
      if (Job != null) {
        DeregisterJobEvents();
      }
      Job = null;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.