#region License Information
/* HeuristicLab
* Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see .
*/
#endregion
using System;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive;
namespace HeuristicLab.Clients.Hive.SlaveCore {
///
/// The executor runs in the appdomain and handles the execution of an Hive job.
///
public class Executor : MarshalByRefObject, IDisposable {
private bool wasJobAborted = false;
private Semaphore pauseStopSem = new Semaphore(0, 1);
private Semaphore startJobSem = new Semaphore(0, 1);
private Semaphore jobStartedSem = new Semaphore(0, 1); // make pause or stop wait until start is finished
private ExecutorQueue executorQueue;
private bool jobDataInvalid = false; // if true, the jobdata is not sent when the job is failed
private IJob job;
private DateTime creationTime;
public Guid JobId { get; set; }
public int CoresNeeded { get; set; }
public int MemoryNeeded { get; set; }
public bool IsStopping { get; set; }
public bool IsPausing { get; set; }
private Exception currentException;
public String CurrentException {
get {
if (currentException != null) {
return currentException.ToString();
} else {
return string.Empty;
}
}
}
public ExecutorQueue ExecutorCommandQueue {
get { return executorQueue; }
}
private ExecutionState ExecutionState {
get { return job != null ? job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; }
}
public TimeSpan ExecutionTime {
get { return job != null ? job.ExecutionTime : new TimeSpan(0, 0, 0); }
}
public Executor() {
IsStopping = false;
IsPausing = false;
executorQueue = new ExecutorQueue();
}
public void Start(byte[] serializedJob) {
try {
creationTime = DateTime.Now;
job = PersistenceUtil.Deserialize(serializedJob);
RegisterJobEvents();
job.Start();
if (!jobStartedSem.WaitOne(TimeSpan.FromSeconds(25))) {
jobDataInvalid = true;
throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
}
jobStartedSem.Release();
}
catch (Exception e) {
this.currentException = e;
Job_JobFailed(this, new EventArgs(e));
}
}
public void Pause() {
IsPausing = true;
// wait until job is started. if this does not happen, the Job is null an we give up
jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
if (job == null) {
currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
return;
}
if (job.ExecutionState == ExecutionState.Started) {
try {
job.Pause();
//we need to block the pause...
pauseStopSem.WaitOne();
}
catch (Exception ex) {
currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
}
}
}
public void Stop() {
IsStopping = true;
// wait until job is started. if this does not happen, the Job is null an we give up
jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
if (job == null) {
currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
}
wasJobAborted = true;
if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
try {
job.Stop();
pauseStopSem.WaitOne();
}
catch (Exception ex) {
currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
}
}
}
private void RegisterJobEvents() {
job.JobStopped += new EventHandler(Job_JobStopped);
job.JobFailed += new EventHandler(Job_JobFailed);
job.JobPaused += new EventHandler(Job_JobPaused);
job.JobStarted += new EventHandler(Job_JobStarted);
}
private void DeregisterJobEvents() {
job.JobStopped -= new EventHandler(Job_JobStopped);
job.JobFailed -= new EventHandler(Job_JobFailed);
job.JobPaused -= new EventHandler(Job_JobPaused);
job.JobStarted -= new EventHandler(Job_JobStarted);
}
#region Job Events
private void Job_JobFailed(object sender, EventArgs e) {
IsStopping = true;
EventArgs ex = (EventArgs)e;
currentException = ex.Value;
executorQueue.AddMessage(ExecutorMessageType.JobFailed);
}
private void Job_JobStopped(object sender, EventArgs e) {
IsStopping = true;
if (wasJobAborted)
pauseStopSem.Release();
executorQueue.AddMessage(ExecutorMessageType.JobStopped);
}
private void Job_JobPaused(object sender, EventArgs e) {
IsPausing = true;
pauseStopSem.Release();
executorQueue.AddMessage(ExecutorMessageType.JobPaused);
}
private void Job_JobStarted(object sender, EventArgs e) {
jobStartedSem.Release();
executorQueue.AddMessage(ExecutorMessageType.JobStarted);
}
#endregion
public JobData GetJobData() {
if (jobDataInvalid) return null;
if (job.ExecutionState == ExecutionState.Started) {
throw new InvalidStateException("Job is still running");
} else {
JobData jobData = new JobData();
if (job == null) {
//send empty job and save exception
jobData.Data = PersistenceUtil.Serialize(new JobData());
if (currentException == null) {
currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
}
} else {
jobData.Data = PersistenceUtil.Serialize(job);
}
jobData.JobId = JobId;
return jobData;
}
}
public void Dispose() {
if (job != null)
DeregisterJobEvents();
job = null;
}
}
}