Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs @ 6451

Last change on this file since 6451 was 6419, checked in by cneumuel, 14 years ago

#1233

  • created events when statelog changed
  • fixed memory leak in hiveengine
  • extended timeout for long-running transactions and database contexts (when jobdata is stored)
  • replaced random GUIDs in database with sequential GUIDs for performance reasons
  • minor fixes and cleanups
File size: 6.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Threading;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Hive;
27
28namespace HeuristicLab.Clients.Hive.SlaveCore {
/// <summary>
/// The executor runs in the appdomain and handles the execution of a Hive job.
/// It deserializes the job, starts/pauses/stops it and reports every state change
/// of the job as a message on <see cref="ExecutorCommandQueue"/>.
/// </summary>
public class Executor : MarshalByRefObject, IDisposable {
  // How long Start() waits for the JobStarted event before the job is reported as failed.
  private static readonly TimeSpan StartTimeout = TimeSpan.FromSeconds(25);
  // How long Pause()/Stop() wait for the job to be started before they carry on regardless.
  private static readonly TimeSpan PauseStopStartupTimeout = TimeSpan.FromSeconds(15);

  private bool wasJobAborted = false;
  // released by the JobPaused/JobStopped handlers; Pause() and Stop() block on it
  private readonly Semaphore pauseStopSem = new Semaphore(0, 1);
  private readonly Semaphore startJobSem = new Semaphore(0, 1);
  private readonly Semaphore jobStartedSem = new Semaphore(0, 1); // make pause or stop wait until start is finished
  private readonly ExecutorQueue executorQueue;
  private bool jobDataInvalid = false; // if true, the jobdata is not sent when the job is failed
  private IJob job;
  private DateTime creationTime;

  public Guid JobId { get; set; }
  public int CoresNeeded { get; set; }
  public int MemoryNeeded { get; set; }
  public bool IsStopping { get; set; }
  public bool IsPausing { get; set; }

  private Exception currentException;

  /// <summary>
  /// Text form (<c>ToString()</c>) of the last recorded exception,
  /// or an empty string if no exception has been recorded.
  /// </summary>
  public String CurrentException {
    get {
      if (currentException != null) {
        return currentException.ToString();
      } else {
        return string.Empty;
      }
    }
  }

  /// <summary>
  /// Queue that receives a message for every job event (started, paused, stopped, failed).
  /// </summary>
  public ExecutorQueue ExecutorCommandQueue {
    get { return executorQueue; }
  }

  // Null-safe view on the job's state: as long as there is no job it is reported as Stopped.
  private ExecutionState ExecutionState {
    get { return job != null ? job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; }
  }

  /// <summary>
  /// Execution time reported by the job, or zero if there is no job.
  /// </summary>
  public TimeSpan ExecutionTime {
    get { return job != null ? job.ExecutionTime : new TimeSpan(0, 0, 0); }
  }

  public Executor() {
    IsStopping = false;
    IsPausing = false;
    executorQueue = new ExecutorQueue();
  }

  /// <summary>
  /// Deserializes the job, subscribes to its events and starts it.
  /// Any exception (including the timeout while waiting for the JobStarted event)
  /// is not rethrown; it is recorded and reported as a JobFailed message instead.
  /// </summary>
  /// <param name="serializedJob">The serialized <c>IJob</c> to execute.</param>
  public void Start(byte[] serializedJob) {
    try {
      creationTime = DateTime.Now;
      job = PersistenceUtil.Deserialize<IJob>(serializedJob);

      RegisterJobEvents();

      job.Start();
      if (!WaitForJobStart(StartTimeout)) {
        // the job never signalled that it started, so GetJobData() must not send its data
        jobDataInvalid = true;
        throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
      }
    }
    catch (Exception e) {
      this.currentException = e;
      Job_JobFailed(this, new EventArgs<Exception>(e));
    }
  }

  /// <summary>
  /// Pauses a started job and blocks until the job has signalled the pause.
  /// If there is no job, an exception is recorded in <see cref="CurrentException"/>
  /// and the method returns; errors raised while pausing are recorded as well.
  /// </summary>
  public void Pause() {
    IsPausing = true;
    // wait until job is started. if this does not happen, the Job is null and we give up
    WaitForJobStart(PauseStopStartupTimeout);
    if (job == null) {
      currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
      return;
    }

    if (job.ExecutionState == ExecutionState.Started) {
      try {
        job.Pause();
        //we need to block the pause...
        pauseStopSem.WaitOne();
      }
      catch (Exception ex) {
        currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
      }
    }
  }

  /// <summary>
  /// Stops a started or paused job and blocks until the job has signalled the stop.
  /// If there is no job, an exception is recorded in <see cref="CurrentException"/>;
  /// errors raised while stopping are recorded as well.
  /// </summary>
  public void Stop() {
    IsStopping = true;
    // wait until job is started. if this does not happen, the Job is null and we give up
    WaitForJobStart(PauseStopStartupTimeout);
    if (job == null) {
      currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
    }
    // tells Job_JobStopped that somebody is blocked on pauseStopSem and has to be released
    wasJobAborted = true;

    // ExecutionState is null-safe (Stopped without a job), so nothing happens when job is null
    if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
      try {
        job.Stop();
        pauseStopSem.WaitOne();
      }
      catch (Exception ex) {
        currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
      }
    }
  }

  /// <summary>
  /// Waits for the signal that the job has been started. When the signal is received the
  /// permit is handed straight back, so that every other caller (Start, Pause, Stop, in
  /// any order) can observe it too instead of running into its own timeout.
  /// </summary>
  /// <param name="timeout">Maximum time to wait for the signal.</param>
  /// <returns>true if the job signalled its start within the timeout, otherwise false.</returns>
  private bool WaitForJobStart(TimeSpan timeout) {
    if (!jobStartedSem.WaitOne(timeout)) {
      return false;
    }
    jobStartedSem.Release();
    return true;
  }

  private void RegisterJobEvents() {
    job.JobStopped += new EventHandler(Job_JobStopped);
    job.JobFailed += new EventHandler(Job_JobFailed);
    job.JobPaused += new EventHandler(Job_JobPaused);
    job.JobStarted += new EventHandler(Job_JobStarted);
  }

  private void DeregisterJobEvents() {
    job.JobStopped -= new EventHandler(Job_JobStopped);
    job.JobFailed -= new EventHandler(Job_JobFailed);
    job.JobPaused -= new EventHandler(Job_JobPaused);
    job.JobStarted -= new EventHandler(Job_JobStarted);
  }

  #region Job Events
  // Records the failure (when the event carries one) and queues a JobFailed message.
  private void Job_JobFailed(object sender, EventArgs e) {
    IsStopping = true;
    // The event is declared with plain EventArgs; a hard cast would throw inside the
    // handler for any other args type and the JobFailed message would never be queued.
    EventArgs<Exception> failure = e as EventArgs<Exception>;
    if (failure != null) {
      currentException = failure.Value;
    }

    executorQueue.AddMessage(ExecutorMessageType.JobFailed);
  }

  // Unblocks a pending Stop() (only if Stop() was called) and queues a JobStopped message.
  private void Job_JobStopped(object sender, EventArgs e) {
    IsStopping = true;
    if (wasJobAborted)
      pauseStopSem.Release();

    executorQueue.AddMessage(ExecutorMessageType.JobStopped);
  }

  // Unblocks a pending Pause() and queues a JobPaused message.
  private void Job_JobPaused(object sender, EventArgs e) {
    IsPausing = true;
    pauseStopSem.Release();
    executorQueue.AddMessage(ExecutorMessageType.JobPaused);
  }

  // Signals that the job is running and queues a JobStarted message.
  private void Job_JobStarted(object sender, EventArgs e) {
    jobStartedSem.Release();
    executorQueue.AddMessage(ExecutorMessageType.JobStarted);
  }
  #endregion

  /// <summary>
  /// Serializes the current job into a <c>JobData</c> object carrying <see cref="JobId"/>.
  /// If there is no job, an empty <c>JobData</c> is serialized instead and an exception
  /// is recorded in <see cref="CurrentException"/> (unless one is recorded already).
  /// </summary>
  /// <returns>The job data, or null if the job data was marked as invalid by Start().</returns>
  /// <exception cref="InvalidStateException">The job is still running.</exception>
  public JobData GetJobData() {
    if (jobDataInvalid) return null;

    // Work on a local copy and check it for null BEFORE reading its state; reading
    // job.ExecutionState first would throw a NullReferenceException when there is no job.
    IJob currentJob = job;
    if (currentJob != null && currentJob.ExecutionState == ExecutionState.Started) {
      throw new InvalidStateException("Job is still running");
    }

    JobData jobData = new JobData();
    if (currentJob == null) {
      //send empty job and save exception
      jobData.Data = PersistenceUtil.Serialize(new JobData());
      if (currentException == null) {
        currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
      }
    } else {
      jobData.Data = PersistenceUtil.Serialize(currentJob);
    }
    jobData.JobId = JobId;
    return jobData;
  }

  /// <summary>
  /// Unsubscribes from the job's events and drops the reference to the job.
  /// </summary>
  public void Dispose() {
    if (job != null)
      DeregisterJobEvents();
    job = null;
  }
}
208}
Note: See TracBrowser for help on using the repository browser.