#region License Information
/* HeuristicLab
* Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Threading;
using HeuristicLab.Clients.Hive.SlaveCore.Properties;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive;
namespace HeuristicLab.Clients.Hive.SlaveCore {
/// <summary>
/// The executor runs in the appdomain and handles the execution of a Hive task.
/// </summary>
/// <remarks>
/// Start, Pause and Stop block the calling thread until the task has raised the matching
/// event or until Settings.Default.ExecutorSemTimeouts has elapsed. Every state change
/// reported by the task is additionally posted to <see cref="ExecutorCommandQueue"/>.
/// </remarks>
public class Executor : MarshalByRefObject, IDisposable {
  // set by Stop(); Task_TaskStopped only signals pauseStopSem when this flag is set
  private bool wasTaskAborted = false;
  // signaled by the paused/stopped event handlers to release a blocked Pause()/Stop() call
  private readonly AutoResetEvent pauseStopSem = new AutoResetEvent(false);
  private readonly AutoResetEvent startTaskSem = new AutoResetEvent(false); // block start method call
  // make pause or stop wait until start is finished.
  // This has to be a manual-reset event: Start() signals it exactly once (in its finally block)
  // and every later Pause()/Stop() call waits on it. With an auto-reset event the first waiter
  // consumed the signal and each following call blocked for the whole timeout.
  private readonly ManualResetEvent taskStartedSem = new ManualResetEvent(false);
  private readonly ExecutorQueue executorQueue;
  // if true, the jobdata is not sent when the task is failed
  // (GetTaskData and GetTaskDataSnapshot return null); set by HandleStartStopPauseError
  private bool taskDataInvalid = false;
  private ITask task;
  // time at which Start() was called; not read anywhere inside this class
  private DateTime creationTime;

  public Guid TaskId { get; set; }
  public int CoresNeeded { get; set; }
  public int MemoryNeeded { get; set; }
  /// <summary>Set by Stop() and by the stopped/failed event handlers.</summary>
  public bool IsStopping { get; set; }
  /// <summary>Set by Pause() and by the paused event handler.</summary>
  public bool IsPausing { get; set; }

  /// <summary>
  /// The exception reported by a failed task or by a failed start/pause/stop request.
  /// </summary>
  public Exception CurrentException;

  /// <summary>
  /// String representation of <see cref="CurrentException"/>, or an empty string if there is none.
  /// </summary>
  public String CurrentExceptionStr {
    get { return CurrentException != null ? CurrentException.ToString() : string.Empty; }
  }

  /// <summary>
  /// Queue to which the task event handlers post their ExecutorMessageType messages.
  /// </summary>
  public ExecutorQueue ExecutorCommandQueue {
    get { return executorQueue; }
  }

  // execution state of the task; Stopped is reported while no task is present
  private ExecutionState ExecutionState {
    get { return task != null ? task.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; }
  }

  /// <summary>
  /// Execution time reported by the task, or zero while no task is present.
  /// </summary>
  public TimeSpan ExecutionTime {
    get { return task != null ? task.ExecutionTime : TimeSpan.Zero; }
  }

  public Executor() {
    IsStopping = false;
    IsPausing = false;
    executorQueue = new ExecutorQueue();
  }

  /// <summary>
  /// Deserializes the task, subscribes to its events and starts it. Blocks until the task has
  /// raised TaskStarted or the timeout has elapsed.
  /// </summary>
  /// <remarks>
  /// Exceptions are not propagated to the caller. They are stored in
  /// <see cref="CurrentException"/> and reported as a TaskFailed message.
  /// </remarks>
  /// <param name="serializedJob">The serialized task.</param>
  public void Start(byte[] serializedJob) {
    try {
      creationTime = DateTime.Now;
      task = PersistenceUtil.Deserialize<ITask>(serializedJob);
      RegisterJobEvents();
      task.Start();
      if (!startTaskSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
        throw new TimeoutException("Timeout when starting the task. TaskStarted event was not fired.");
      }
    }
    catch (Exception e) {
      HandleStartStopPauseError(e);
    }
    finally {
      // release pending and future Pause()/Stop() calls, no matter whether starting succeeded
      taskStartedSem.Set();
    }
  }

  /// <summary>
  /// Pauses a started task and blocks until the task has raised TaskPaused or the timeout has
  /// elapsed. Errors are reported as a TaskFailed message instead of being thrown.
  /// </summary>
  public void Pause() {
    IsPausing = true;
    // wait until task is started. if this does not happen, the task is null and we give up
    taskStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);

    if (task == null) {
      HandleStartStopPauseError(new Exception("Pausing task " + this.TaskId + ": Task is null"));
      return;
    }

    // only a started task can be paused; in any other state there is nothing to do
    if (ExecutionState != ExecutionState.Started) {
      return;
    }

    try {
      task.Pause();
      // we need to block the pause until Task_TaskPaused has signaled the semaphore
      if (!pauseStopSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
        throw new Exception("Pausing task " + this.TaskId + " timed out.");
      }
    }
    catch (Exception ex) {
      HandleStartStopPauseError(ex);
    }
  }

  /// <summary>
  /// Stops a started or paused task and blocks until the task has raised TaskStopped or the
  /// timeout has elapsed. Errors are reported as a TaskFailed message instead of being thrown.
  /// </summary>
  public void Stop() {
    IsStopping = true;
    // wait until task is started. if this does not happen, the task is null and we give up
    taskStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);
    // from now on Task_TaskStopped has to wake up this method
    wasTaskAborted = true;

    if (task == null) {
      HandleStartStopPauseError(new Exception("Stopping task " + this.TaskId + ": Task is null"));
      return;
    }

    if (ExecutionState != ExecutionState.Started && ExecutionState != ExecutionState.Paused) {
      return;
    }

    try {
      task.Stop();
      if (!pauseStopSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {
        throw new Exception("Stopping task " + this.TaskId + " timed out.");
      }
    }
    catch (Exception ex) {
      HandleStartStopPauseError(ex);
    }
  }

  // subscribes the handlers of this class to the events of the current task
  private void RegisterJobEvents() {
    task.TaskStopped += Task_TaskStopped;
    task.TaskFailed += Task_TaskFailed;
    task.TaskPaused += Task_TaskPaused;
    task.TaskStarted += Task_TaskStarted;
  }

  // removes the subscriptions made by RegisterJobEvents
  private void DeregisterJobEvents() {
    task.TaskStopped -= Task_TaskStopped;
    task.TaskFailed -= Task_TaskFailed;
    task.TaskPaused -= Task_TaskPaused;
    task.TaskStarted -= Task_TaskStarted;
  }

  #region Task Events
  // stores the reported exception and posts a TaskFailed message.
  // e has to be an EventArgs<Exception>; HandleStartStopPauseError calls this method directly.
  private void Task_TaskFailed(object sender, EventArgs e) {
    IsStopping = true;
    EventArgs<Exception> ex = (EventArgs<Exception>)e;
    CurrentException = ex.Value;
    executorQueue.AddMessage(ExecutorMessageType.TaskFailed);
  }

  // posts a TaskStopped message; a blocked Stop() call is only released if Stop() was requested
  private void Task_TaskStopped(object sender, EventArgs e) {
    IsStopping = true;
    if (wasTaskAborted) {
      pauseStopSem.Set();
    }
    executorQueue.AddMessage(ExecutorMessageType.TaskStopped);
  }

  // releases a blocked Pause() call and posts a TaskPaused message
  private void Task_TaskPaused(object sender, EventArgs e) {
    IsPausing = true;
    pauseStopSem.Set();
    executorQueue.AddMessage(ExecutorMessageType.TaskPaused);
  }

  // releases the blocked Start() call and posts a TaskStarted message
  private void Task_TaskStarted(object sender, EventArgs e) {
    startTaskSem.Set();
    executorQueue.AddMessage(ExecutorMessageType.TaskStarted);
  }
  #endregion

  /// <summary>
  /// Pauses the task, serializes it and starts it again. While doing so the regular
  /// paused/started handlers are replaced, so neither a TaskPaused nor a TaskStarted message is
  /// posted for the snapshot.
  /// </summary>
  /// <returns>
  /// The serialized task together with the time at which it was serialized, or null if the
  /// task data is invalid or no task is present.
  /// </returns>
  public Tuple<TaskData, DateTime> GetTaskDataSnapshot() {
    if (taskDataInvalid) {
      return null;
    }

    // work on a local reference so that the temporary handlers keep addressing the same task
    ITask currentTask = task;
    if (currentTask == null) {
      if (CurrentException == null) {
        CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task");
      }
      return null;
    }

    var taskData = new TaskData();
    PauseForSnapshot(currentTask);
    taskData.Data = PersistenceUtil.Serialize(currentTask);
    var timestamp = DateTime.Now;
    ResumeAfterSnapshot(currentTask);
    taskData.TaskId = TaskId;
    return Tuple.Create(taskData, timestamp);
  }

  // pauses the task and waits for its TaskPaused event without involving Task_TaskPaused
  private void PauseForSnapshot(ITask currentTask) {
    using (var pausedTrigger = new EventWaitHandle(false, EventResetMode.ManualReset)) {
      EventHandler pausedHandler = null;
      pausedHandler = (s, e) => {
        // one-shot handler: put the regular handler back before waking up the waiting thread
        currentTask.TaskPaused -= pausedHandler;
        currentTask.TaskPaused += Task_TaskPaused;
        pausedTrigger.Set();
      };
      currentTask.TaskPaused -= Task_TaskPaused;
      currentTask.TaskPaused += pausedHandler;
      try {
        currentTask.Pause();
      }
      catch {
        // the temporary handler may never run now and must not outlive the disposed trigger;
        // removing Task_TaskPaused first guarantees that it ends up subscribed exactly once
        currentTask.TaskPaused -= pausedHandler;
        currentTask.TaskPaused -= Task_TaskPaused;
        currentTask.TaskPaused += Task_TaskPaused;
        throw;
      }
      // NOTE(review): this wait has no timeout; if the task never raises TaskPaused the
      // calling thread blocks forever
      pausedTrigger.WaitOne();
    }
  }

  // starts the task again; the first TaskStarted event only restores the regular handler
  private void ResumeAfterSnapshot(ITask currentTask) {
    EventHandler startedHandler = null;
    startedHandler = (s, e) => {
      currentTask.TaskStarted -= startedHandler;
      currentTask.TaskStarted += Task_TaskStarted;
    };
    currentTask.TaskStarted -= Task_TaskStarted;
    currentTask.TaskStarted += startedHandler;
    try {
      currentTask.Start();
    }
    catch {
      // same clean-up as in PauseForSnapshot: never leave the temporary handler subscribed
      currentTask.TaskStarted -= startedHandler;
      currentTask.TaskStarted -= Task_TaskStarted;
      currentTask.TaskStarted += Task_TaskStarted;
      throw;
    }
  }

  /// <summary>
  /// Serializes the task in its current state.
  /// </summary>
  /// <returns>
  /// The serialized task, or null if the task data is invalid or no task is present.
  /// </returns>
  /// <exception cref="InvalidStateException">Thrown if the task is still running.</exception>
  public TaskData GetTaskData() {
    if (taskDataInvalid) {
      return null;
    }
    if (task != null && task.ExecutionState == ExecutionState.Started) {
      throw new InvalidStateException("Task is still running");
    }

    if (task == null) {
      if (CurrentException == null) {
        CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task");
      }
      return null;
    }

    var taskData = new TaskData();
    taskData.Data = PersistenceUtil.Serialize(task);
    taskData.TaskId = TaskId;
    return taskData;
  }

  /// <summary>
  /// Unsubscribes from the task events and releases the reference to the task.
  /// </summary>
  public void Dispose() {
    // NOTE(review): the wait handles are not closed here; Start() signals taskStartedSem in
    // its finally block and would fail on a handle that has already been closed
    if (task != null) {
      DeregisterJobEvents();
    }
    task = null;
  }

  // marks the task data as invalid and reports the error through the regular failure handler
  private void HandleStartStopPauseError(Exception e) {
    taskDataInvalid = true;
    Task_TaskFailed(this, new EventArgs<Exception>(e));
  }
}
}