#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Threading;
using HeuristicLab.Clients.Hive.SlaveCore.Properties;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive;

namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The executor runs in the appdomain and handles the execution of a Hive job.
  /// </summary>
  public class Executor : MarshalByRefObject, IDisposable {
    // Set by Stop(); tells Job_JobStopped that a caller is blocked on pauseStopSem.
    private bool wasJobAborted = false;
    // Signaled by the JobPaused / JobStopped handlers to release a blocked Pause() / Stop().
    private AutoResetEvent pauseStopSem = new AutoResetEvent(false);
    // block start method call
    private AutoResetEvent startJobSem = new AutoResetEvent(false);
    // make pause or stop wait until start is finished.
    // Manual-reset on purpose: Start() signals this exactly once and both Pause() and
    // Stop() wait on it. With an auto-reset event the first waiter would consume the
    // signal and every later call would sit out the full timeout.
    private ManualResetEvent jobStartedSem = new ManualResetEvent(false);
    private ExecutorQueue executorQueue;
    // if true, the jobdata is not sent when the job is failed
    private bool jobDataInvalid = false;
    private IJob job;
    private DateTime creationTime;

    public Guid JobId { get; set; }
    public int CoresNeeded { get; set; }
    public int MemoryNeeded { get; set; }
    public bool IsStopping { get; set; }
    public bool IsPausing { get; set; }

    /// <summary>
    /// The most recent exception recorded by the executor, or null if none was recorded.
    /// </summary>
    public Exception CurrentException;

    /// <summary>
    /// String form of <see cref="CurrentException"/>, or an empty string if it is null.
    /// </summary>
    public String CurrentExceptionStr {
      get { return CurrentException != null ? CurrentException.ToString() : string.Empty; }
    }

    /// <summary>
    /// Queue to which the executor posts its status messages.
    /// </summary>
    public ExecutorQueue ExecutorCommandQueue {
      get { return executorQueue; }
    }

    // Execution state of the job; reported as Stopped while no job is set.
    private ExecutionState ExecutionState {
      get { return job != null ? job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; }
    }

    /// <summary>
    /// Execution time reported by the job, or a zero time span while no job is set.
    /// </summary>
    public TimeSpan ExecutionTime {
      get { return job != null ? job.ExecutionTime : TimeSpan.Zero; }
    }

    public Executor() {
      IsStopping = false;
      IsPausing = false;
      executorQueue = new ExecutorQueue();
    }

    /// <summary>
    /// Deserializes the job, subscribes to its events, starts it and waits for the
    /// JobStarted event. Any exception (including the start timeout) is recorded and
    /// reported through the JobFailed path instead of being propagated to the caller.
    /// </summary>
    /// <param name="serializedJob">The serialized job to execute.</param>
    public void Start(byte[] serializedJob) {
      try {
        creationTime = DateTime.Now;
        job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        job.Start();
        if (!startJobSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
          // The job never reported that it started, so its data must not be sent back.
          jobDataInvalid = true;
          throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
        }
      }
      catch (Exception e) {
        this.CurrentException = e;
        Job_JobFailed(this, new EventArgs<Exception>(e));
      }
      finally {
        // Release Pause() / Stop() callers regardless of how the start attempt ended.
        jobStartedSem.Set();
      }
    }

    /// <summary>
    /// Pauses the job and blocks until the JobPaused event has been received.
    /// If no job is available an exception is recorded and ExceptionOccured is posted.
    /// </summary>
    public void Pause() {
      IsPausing = true;
      // wait until job is started. if this does not happen, the Job is null and we give up
      jobStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);
      if (job == null) {
        CurrentException = new Exception("Pausing job " + this.JobId + ": Job is null");
        executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
        return;
      }

      if (job.ExecutionState == ExecutionState.Started) {
        try {
          job.Pause();
          // we need to block the pause...
          // NOTE(review): only Job_JobPaused (or Job_JobStopped after Stop()) signals this
          // handle; if the job fails here instead of pausing, this wait looks unbounded.
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          CurrentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
          executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
        }
      }
    }

    /// <summary>
    /// Stops the job and blocks until the JobStopped event has been received.
    /// If no job is available an exception is recorded and ExceptionOccured is posted.
    /// </summary>
    public void Stop() {
      IsStopping = true;
      // wait until job is started. if this does not happen, the Job is null and we give up
      jobStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);
      if (job == null) {
        CurrentException = new Exception("Stopping job " + this.JobId + ": Job is null");
        executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
        // NOTE(review): unlike Pause() there is no return here. Execution falls through,
        // which is harmless because ExecutionState reports Stopped while job is null.
      }
      wasJobAborted = true;

      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
        try {
          job.Stop();
          // NOTE(review): released by Job_JobStopped; if the job fails instead of
          // stopping, this wait looks unbounded.
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          CurrentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
          executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured);
        }
      }
    }

    // Subscribes the executor's handlers to the job's lifecycle events.
    private void RegisterJobEvents() {
      job.JobStopped += new EventHandler(Job_JobStopped);
      job.JobFailed += new EventHandler(Job_JobFailed);
      job.JobPaused += new EventHandler(Job_JobPaused);
      job.JobStarted += new EventHandler(Job_JobStarted);
    }

    // Removes the subscriptions added by RegisterJobEvents.
    private void DeregisterJobEvents() {
      job.JobStopped -= new EventHandler(Job_JobStopped);
      job.JobFailed -= new EventHandler(Job_JobFailed);
      job.JobPaused -= new EventHandler(Job_JobPaused);
      job.JobStarted -= new EventHandler(Job_JobStarted);
    }

    #region Job Events
    // Records the exception carried by the event args and posts JobFailed.
    // Also invoked directly by Start() when starting the job throws or times out.
    private void Job_JobFailed(object sender, EventArgs e) {
      IsStopping = true;
      EventArgs<Exception> ex = (EventArgs<Exception>)e;
      CurrentException = ex.Value;
      executorQueue.AddMessage(ExecutorMessageType.JobFailed);
    }

    // Posts JobStopped; releases a blocked Stop() call only if Stop() requested the abort.
    private void Job_JobStopped(object sender, EventArgs e) {
      IsStopping = true;
      if (wasJobAborted) {
        pauseStopSem.Set();
      }
      executorQueue.AddMessage(ExecutorMessageType.JobStopped);
    }

    // Releases a blocked Pause() call and posts JobPaused.
    private void Job_JobPaused(object sender, EventArgs e) {
      IsPausing = true;
      pauseStopSem.Set();
      executorQueue.AddMessage(ExecutorMessageType.JobPaused);
    }

    // Releases the wait in Start() and posts JobStarted.
    private void Job_JobStarted(object sender, EventArgs e) {
      startJobSem.Set();
      executorQueue.AddMessage(ExecutorMessageType.JobStarted);
    }
    #endregion

    /// <summary>
    /// Serializes the current job into a <see cref="JobData"/> tagged with <see cref="JobId"/>.
    /// </summary>
    /// <returns>
    /// null if the job data was marked invalid (start timeout); otherwise the job data.
    /// If no job is set, the data of an empty job is returned and an exception is
    /// recorded in <see cref="CurrentException"/> unless one is already present.
    /// </returns>
    /// <exception cref="InvalidStateException">The job is still running.</exception>
    public JobData GetJobData() {
      if (jobDataInvalid) return null;

      // Snapshot the field and null-check it before touching ExecutionState; otherwise a
      // missing job raises NullReferenceException and the empty-job branch is never reached.
      IJob currentJob = job;
      if (currentJob != null && currentJob.ExecutionState == ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      }

      JobData jobData = new JobData();
      if (currentJob == null) {
        // send empty job and save exception
        jobData.Data = PersistenceUtil.Serialize(new JobData());
        if (CurrentException == null) {
          CurrentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
        }
      } else {
        jobData.Data = PersistenceUtil.Serialize(currentJob);
      }
      jobData.JobId = JobId;
      return jobData;
    }

    /// <summary>
    /// Unsubscribes from the job's events and drops the reference to the job.
    /// </summary>
    public void Dispose() {
      // NOTE(review): the wait handles are not disposed here; a Pause()/Stop() call may
      // still be blocked on them. Confirm caller ordering before adding Dispose() calls.
      if (job != null)
        DeregisterJobEvents();
      job = null;
    }
  }
}