Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6269

Last change on this file since 6269 was 6203, checked in by ascheibe, 13 years ago

#1233

  • dropped dependency of Core from Executor
  • enabled sandboxing
  • moved most parts of Job handling from Core to SlaveJob to simplify locking
  • optimized how UsedCores is handled
  • SlaveStatusInfo is now thread-safe and counts jobs more correctly
File size: 9.8 KB
RevLine 
[5105]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[5137]23using System.Collections.Generic;
[5105]24using System.Linq;
[5782]25using System.Threading;
[5105]26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive;
[5137]29using HeuristicLab.PluginInfrastructure;
[5105]30
[5137]31
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// Hosts a single <see cref="IJob"/>: deserializes and starts (or resumes) it, pauses or
  /// stops it on request and forwards the job's events as messages to <see cref="executorQueue"/>.
  /// </summary>
  public class Executor : MarshalByRefObject, IDisposable {
    // Maximum time Start/Pause/Stop wait for the JobStarted event of the job.
    private static readonly TimeSpan jobStartTimeout = TimeSpan.FromSeconds(15);

    public Guid JobId { get; set; }
    public IJob Job { get; set; }
    public int CoresNeeded { get; set; }
    public int MemoryNeeded { get; set; }

    // True once a stop was requested through Stop() or GetFinishedJob(); tells Job_JobStopped
    // that the stop is not a regular finish.
    private bool wasJobAborted = false;
    public Core Core { get; set; }

    // Released by Job_JobPaused / Job_JobStopped; Pause(), Stop() and GetFinishedJob() block on it.
    private Semaphore pauseStopSem = new Semaphore(0, 1);

    // Makes pause or stop wait until start is finished. A ManualResetEvent is used because the
    // signal has to stay set: Start(), Pause() and Stop() may all have to observe it, and the
    // JobStarted event must be allowed to fire more than once without throwing.
    private ManualResetEvent jobStartedEvent = new ManualResetEvent(false);

    /// <summary>Queue that receives the messages created by the job event handlers.</summary>
    public ExecutorQueue executorQueue;

    /// <summary>Set to true on construction and to false as soon as Pause() or Stop() is called.</summary>
    public bool SendHeartbeatForExecutor { get; set; }

    /// <summary>
    /// Reset to false by Start(); set to true when the job failed or when it stopped
    /// after a stop request.
    /// </summary>
    public bool Aborted { get; set; }

    /// <summary>Local time at which Start() was called.</summary>
    public DateTime CreationTime { get; set; }

    private Exception currentException;

    /// <summary>String form of the last recorded exception, or an empty string if there is none.</summary>
    public String CurrentException {
      get {
        return currentException != null ? currentException.ToString() : string.Empty;
      }
    }

    /// <summary>Execution state of the job; Stopped when no job is present.</summary>
    public ExecutionState ExecutionState {
      get {
        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
      }
    }

    /// <summary>Execution time of the job; zero when no job is present.</summary>
    public TimeSpan ExecutionTime {
      get {
        return Job != null ? Job.ExecutionTime : TimeSpan.Zero;
      }
    }

    public Executor() {
      SendHeartbeatForExecutor = true;
      executorQueue = new ExecutorQueue();
    }

    /// <summary>
    /// Deserializes the job and starts it. If the deserialized job has CollectChildJobs set,
    /// its child jobs are downloaded and the job is resumed with them instead.
    /// Any exception is recorded and reported through the JobFailed message; nothing is thrown.
    /// </summary>
    /// <param name="serializedJob">The serialized <see cref="IJob"/> to execute.</param>
    public void Start(byte[] serializedJob) {
      try {
        // a previous run of this executor must not make the new job look started
        jobStartedEvent.Reset();
        CreationTime = DateTime.Now;
        Aborted = false;
        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        if (Job.CollectChildJobs) {
          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
        } else {
          Job.Start();
          if (!jobStartedEvent.WaitOne(jobStartTimeout)) {
            throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
          }
        }
      }
      catch (Exception e) {
        this.currentException = e;
        Job_JobFailed(this, new EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Pauses a started job and blocks until the job reports that it is paused.
    /// Errors are recorded in <see cref="CurrentException"/> instead of being thrown.
    /// </summary>
    public void Pause() {
      SendHeartbeatForExecutor = false;
      // wait until job is started. if this does not happen, the Job is null and we give up
      jobStartedEvent.WaitOne(jobStartTimeout);
      if (Job == null) {
        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
        return;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          Job.Pause();
          // block until Job_JobPaused signals that the job is really paused
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Stops a started or paused job and blocks until the job reports that it is stopped.
    /// Errors are recorded in <see cref="CurrentException"/> instead of being thrown.
    /// </summary>
    public void Stop() {
      SendHeartbeatForExecutor = false;
      // wait until job is started. if this does not happen, the Job is null and we give up
      jobStartedEvent.WaitOne(jobStartTimeout);
      if (Job == null) {
        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
      }
      // has to be set before Job.Stop() so that Job_JobStopped treats the stop as requested
      wasJobAborted = true;

      // the ExecutionState property is null-safe, so a missing job simply skips this branch
      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
        try {
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    private void RegisterJobEvents() {
      Job.JobStopped += new EventHandler(Job_JobStopped);
      Job.JobFailed += new EventHandler(Job_JobFailed);
      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused += new EventHandler(Job_JobPaused);
      Job.JobStarted += new EventHandler(Job_JobStarted);
    }

    private void DeregisterJobEvents() {
      Job.JobStopped -= new EventHandler(Job_JobStopped);
      Job.JobFailed -= new EventHandler(Job_JobFailed);
      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
      Job.JobPaused -= new EventHandler(Job_JobPaused);
      Job.JobStarted -= new EventHandler(Job_JobStarted);
    }

    /// <summary>
    /// Not implemented yet: always throws. Job_NewChildJob therefore cannot complete
    /// until the plugin lookup is provided.
    /// </summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    private List<Guid> FindPluginsNeeded(IJob obj) {
      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
    }

    // Wraps the new child job into a NewChildJob message and queues it.
    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
      JobData childJobData = new JobData();
      childJobData.Data = PersistenceUtil.Serialize(e.Value);

      Job childJob = new Job();
      childJob.CoresNeeded = 1;
      childJob.MemoryNeeded = 0;
      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);

      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.NewChildJob);
      msg.MsgData = childJobData;
      msg.MsgJob = childJob;

      executorQueue.AddMessage(msg);
    }

    private void Job_WaitForChildJobs(object sender, EventArgs e) {
      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
      this.Job.CollectChildJobs = true;

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = this.JobId;

      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.WaitForChildJobs);
      msg.MsgData = jdata;
      executorQueue.AddMessage(msg);
    }

    private void Job_DeleteChildJobs(object sender, EventArgs e) {
      executorQueue.AddMessage(ExecutorMessageType.DeleteChildJobs);
    }

    // Records the failure and queues a JobFailed message.
    private void Job_JobFailed(object sender, EventArgs e) {
      // JobFailed is a plain EventHandler, so the args are not guaranteed to carry an exception;
      // in that case keep whatever exception was recorded before and still report the failure.
      var failure = e as HeuristicLab.Common.EventArgs<Exception>;
      if (failure != null) {
        currentException = failure.Value;
      }
      Aborted = true;

      executorQueue.AddMessage(ExecutorMessageType.JobFailed);
    }

    private void Job_JobStopped(object sender, EventArgs e) {
      if (wasJobAborted) {
        // the stop was requested: wake up the thread blocked in Stop()/GetFinishedJob()
        pauseStopSem.Release();
        Aborted = true;
      } else {
        //it's a clean and finished job, so send it
        executorQueue.AddMessage(ExecutorMessageType.JobStopped);
      }
    }

    /// <summary>
    /// Returns the serialized job. A job that is still running is stopped first; if no job
    /// is present an empty job is returned and an exception is recorded.
    /// </summary>
    public JobData GetFinishedJob() {
      if (Job == null) {
        if (currentException == null) {
          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
        }
        // without a job there is nothing to stop; GetJob() produces the empty result
        return GetJob();
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          // set the flag first: otherwise a fast JobStopped event takes the "clean finish"
          // branch, never releases pauseStopSem and the WaitOne() below blocks forever
          wasJobAborted = true;
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
        }
      }

      return GetJob();
    }

    /// <summary>Returns the serialized job; the job has to be in the Paused state.</summary>
    /// <exception cref="Exception">The job is missing or not paused.</exception>
    public JobData GetPausedJob() {
      // the ExecutionState property reports Stopped for a missing job instead of dereferencing null
      if (ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
        throw new Exception("Executor: Job has to be paused before fetching results.");
      }
      return GetJob();
    }

    private void Job_JobPaused(object sender, EventArgs e) {
      pauseStopSem.Release();
    }

    private void Job_JobStarted(object sender, EventArgs e) {
      jobStartedEvent.Set();
    }

    // Serializes the current job into a JobData carrying JobId. Throws if the job is still
    // running; sends an empty job (and records an exception) if there is no job.
    private JobData GetJob() {
      IJob job = Job;
      if (job != null && job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      }

      JobData jdata = new JobData();
      if (job == null) {
        //send empty job and save exception
        jdata.Data = PersistenceUtil.Serialize(new JobData());
        if (currentException == null) {
          currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
        }
      } else {
        jdata.Data = PersistenceUtil.Serialize(job);
      }
      jdata.JobId = JobId;
      return jdata;
    }

    /// <summary>Detaches the event handlers from the job and drops the reference to it.</summary>
    public void Dispose() {
      if (Job != null)
        DeregisterJobEvents();
      Job = null;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.