#region License Information
/* HeuristicLab
* Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see .
*/
#endregion
using System;
using System.Diagnostics;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.Properties;
using HeuristicLab.Common;
using HeuristicLab.Core;
using TS = System.Threading.Tasks;
namespace HeuristicLab.Clients.Hive.SlaveCore {
///
/// The core component of the Hive Slave.
/// Handles commands sent from the Hive Server and does all webservice calls for jobs.
///
public class Core : MarshalByRefObject {
private static HeartbeatManager heartbeatManager;
public static HeartbeatManager HeartbeatManager {
get { return heartbeatManager; }
}
public EventLog ServiceEventLog { get; set; }
private Semaphore waitShutdownSem = new Semaphore(0, 1);
private bool abortRequested;
private ServiceHost slaveComm;
private WcfService wcfService;
private TaskManager taskManager;
private ConfigManager configManager;
private PluginManager pluginManager;
private bool startClientComService;
public Core()
: this(true) {
}
public Core(bool startClientComService) {
var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
this.pluginManager = new PluginManager(WcfService.Instance, log);
this.taskManager = new TaskManager(pluginManager, log);
log.MessageAdded += new EventHandler>(log_MessageAdded);
RegisterTaskManagerEvents();
this.configManager = new ConfigManager(taskManager);
ConfigManager.Instance = this.configManager;
this.startClientComService = startClientComService;
}
///
/// Main method for the client
///
public void Start() {
abortRequested = false;
EventLogManager.ServiceEventLog = ServiceEventLog;
try {
if (startClientComService) {
//start the client communication service (pipe between slave and slave gui)
slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
slaveComm.Open();
}
// delete all left over task directories
pluginManager.CleanPluginTemp();
SlaveClientCom.Instance.LogMessage("Hive Slave started");
wcfService = WcfService.Instance;
RegisterServiceEvents();
StartHeartbeats(); // Start heartbeats thread
DispatchMessageQueue(); // dispatch messages until abortRequested
}
catch (Exception ex) {
if (ServiceEventLog != null) {
EventLogManager.LogException(ex);
} else {
//try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
//else an exception will be thrown anyways.
SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
}
ShutdownCore();
}
finally {
DeregisterServiceEvents();
waitShutdownSem.Release();
}
}
private void StartHeartbeats() {
//Initialize the heartbeat
if (heartbeatManager == null) {
heartbeatManager = new HeartbeatManager();
heartbeatManager.StartHeartbeat();
}
}
private void DispatchMessageQueue() {
MessageQueue queue = MessageQueue.GetInstance();
while (!abortRequested) {
MessageContainer container = queue.GetMessage();
DetermineAction(container);
if (!abortRequested) {
SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
}
}
}
private void RegisterServiceEvents() {
WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
WcfService.Instance.ExceptionOccured += new EventHandler>(WcfService_ExceptionOccured);
}
private void DeregisterServiceEvents() {
WcfService.Instance.Connected -= WcfService_Connected;
WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
}
private void WcfService_ExceptionOccured(object sender, EventArgs e) {
SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
}
private void WcfService_Connected(object sender, EventArgs e) {
SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
}
///
/// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
///
/// The container, containing the message
private void DetermineAction(MessageContainer container) {
SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
switch (container.Message) {
case MessageContainer.MessageType.CalculateTask:
CalculateTaskAsync(container.TaskId);
break;
case MessageContainer.MessageType.AbortTask:
AbortTaskAsync(container.TaskId);
break;
case MessageContainer.MessageType.StopTask:
StopTaskAsync(container.TaskId);
break;
case MessageContainer.MessageType.PauseTask:
PauseTaskAsync(container.TaskId);
break;
case MessageContainer.MessageType.StopAll:
DoStopAll();
break;
case MessageContainer.MessageType.PauseAll:
DoPauseAll();
break;
case MessageContainer.MessageType.AbortAll:
DoAbortAll();
break;
case MessageContainer.MessageType.ShutdownSlave:
ShutdownCore();
break;
case MessageContainer.MessageType.Restart:
DoStartSlave();
break;
case MessageContainer.MessageType.Sleep:
Sleep();
break;
case MessageContainer.MessageType.SayHello:
wcfService.Connect(configManager.GetClientInfo());
break;
case MessageContainer.MessageType.NewHBInterval:
int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
if (interval != -1) {
HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
}
break;
}
}
private void CalculateTaskAsync(Guid jobId) {
TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
.ContinueWith((t) => {
SlaveStatusInfo.IncrementExceptionOccured();
SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);
}
private void StopTaskAsync(Guid jobId) {
TS.Task.Factory.StartNew(HandleStopTask, jobId)
.ContinueWith((t) => {
SlaveStatusInfo.IncrementExceptionOccured();
SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);
}
private void PauseTaskAsync(Guid jobId) {
TS.Task.Factory.StartNew(HandlePauseTask, jobId)
.ContinueWith((t) => {
SlaveStatusInfo.IncrementExceptionOccured();
SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);
}
private void AbortTaskAsync(Guid jobId) {
TS.Task.Factory.StartNew(HandleAbortTask, jobId)
.ContinueWith((t) => {
SlaveStatusInfo.IncrementExceptionOccured();
SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);
}
private void HandleCalculateTask(object taskIdObj) {
Guid taskId = (Guid)taskIdObj;
Task task = null;
int usedCores = 0;
try {
task = wcfService.GetTask(taskId);
if (task == null) throw new TaskNotFoundException(taskId);
if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
TaskData taskData = wcfService.GetTaskData(taskId);
if (taskData == null) throw new TaskDataNotFoundException(taskId);
task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
if (task == null) throw new TaskNotFoundException(taskId);
taskManager.StartTaskAsync(task, taskData);
}
catch (TaskNotFoundException) {
SlaveStatusInfo.DecrementUsedCores(usedCores);
throw;
}
catch (TaskDataNotFoundException) {
SlaveStatusInfo.DecrementUsedCores(usedCores);
throw;
}
catch (TaskAlreadyRunningException) {
SlaveStatusInfo.DecrementUsedCores(usedCores);
throw;
}
catch (OutOfCoresException) {
wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
throw;
}
catch (OutOfMemoryException) {
wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
throw;
}
catch (Exception e) {
SlaveStatusInfo.DecrementUsedCores(usedCores);
wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
throw;
}
}
private void HandleStopTask(object taskIdObj) {
Guid taskId = (Guid)taskIdObj;
try {
Task task = wcfService.GetTask(taskId);
if (task == null) throw new TaskNotFoundException(taskId);
taskManager.StopTaskAsync(taskId);
}
catch (TaskNotFoundException) {
throw;
}
catch (TaskNotRunningException) {
throw;
}
catch (AppDomainNotCreatedException) {
throw;
}
}
private void HandlePauseTask(object taskIdObj) {
Guid taskId = (Guid)taskIdObj;
try {
Task task = wcfService.GetTask(taskId);
if (task == null) throw new TaskNotFoundException(taskId);
taskManager.PauseTaskAsync(taskId);
}
catch (TaskNotFoundException) {
throw;
}
catch (TaskNotRunningException) {
throw;
}
catch (AppDomainNotCreatedException) {
throw;
}
}
private void HandleAbortTask(object taskIdObj) {
Guid taskId = (Guid)taskIdObj;
try {
taskManager.AbortTask(taskId);
}
catch (TaskNotFoundException) {
throw;
}
}
#region TaskManager Events
private void RegisterTaskManagerEvents() {
this.taskManager.TaskStarted += new EventHandler>(taskManager_TaskStarted);
this.taskManager.TaskPaused += new EventHandler>(taskManager_TaskPaused);
this.taskManager.TaskStopped += new EventHandler>(taskManager_TaskStopped);
this.taskManager.TaskFailed += new EventHandler>>(taskManager_TaskFailed);
this.taskManager.ExceptionOccured += new EventHandler>(taskManager_ExceptionOccured);
this.taskManager.TaskAborted += new EventHandler>(taskManager_TaskAborted);
}
private void taskManager_TaskStarted(object sender, EventArgs e) {
// successfully started, everything is good
}
private void taskManager_TaskPaused(object sender, EventArgs e) {
try {
SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
heartbeatManager.AwakeHeartBeatThread();
Task task = wcfService.GetTask(e.Value.TaskId);
if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
task.ExecutionTime = e.Value.ExecutionTime;
TaskData taskData = e.Value2;
wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
}
catch (TaskNotFoundException ex) {
SlaveClientCom.Instance.LogMessage(ex.ToString());
}
catch (Exception ex) {
SlaveClientCom.Instance.LogMessage(ex.ToString());
}
}
private void taskManager_TaskStopped(object sender, EventArgs e) {
try {
SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
heartbeatManager.AwakeHeartBeatThread();
Task task = wcfService.GetTask(e.Value.TaskId);
if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
task.ExecutionTime = e.Value.ExecutionTime;
TaskData taskData = e.Value2;
wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
}
catch (TaskNotFoundException ex) {
SlaveClientCom.Instance.LogMessage(ex.ToString());
}
catch (Exception ex) {
SlaveClientCom.Instance.LogMessage(ex.ToString());
}
}
private void taskManager_TaskFailed(object sender, EventArgs> e) {
try {
SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
heartbeatManager.AwakeHeartBeatThread();
SlaveTask slaveTask = e.Value.Item1;
TaskData taskData = e.Value.Item2;
Exception exception = e.Value.Item3;
Task task = wcfService.GetTask(slaveTask.TaskId);
if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
task.ExecutionTime = slaveTask.ExecutionTime;
if (taskData != null) {
wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
} else {
wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
}
SlaveClientCom.Instance.LogMessage(exception.Message);
}
catch (TaskNotFoundException ex) {
SlaveStatusInfo.IncrementExceptionOccured();
SlaveClientCom.Instance.LogMessage(ex.ToString());
}
catch (Exception ex) {
SlaveStatusInfo.IncrementExceptionOccured();
SlaveClientCom.Instance.LogMessage(ex.ToString());
}
}
private void taskManager_ExceptionOccured(object sender, EventArgs e) {
SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
SlaveStatusInfo.IncrementExceptionOccured();
heartbeatManager.AwakeHeartBeatThread();
SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
}
private void taskManager_TaskAborted(object sender, EventArgs e) {
SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
}
#endregion
#region Log Events
private void log_MessageAdded(object sender, EventArgs e) {
try {
SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
}
catch { }
}
#endregion
///
/// aborts all running jobs, no results are sent back
///
private void DoAbortAll() {
SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
foreach (Guid taskId in taskManager.TaskIds) {
AbortTaskAsync(taskId);
}
}
///
/// wait for jobs to finish, then pause client
///
private void DoPauseAll() {
SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
foreach (Guid taskId in taskManager.TaskIds) {
PauseTaskAsync(taskId);
}
}
///
/// pause slave immediately
///
private void DoStopAll() {
SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
foreach (Guid taskId in taskManager.TaskIds) {
StopTaskAsync(taskId);
}
}
#region Slave Lifecycle Methods
///
/// completly shudown slave
///
public void Shutdown() {
MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
MessageQueue.GetInstance().AddMessage(mc);
waitShutdownSem.WaitOne();
}
///
/// complete shutdown, should be called before the the application is exited
///
private void ShutdownCore() {
SlaveClientCom.Instance.LogMessage("Shutdown signal received");
SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
heartbeatManager.StopHeartBeat();
abortRequested = true;
DoAbortAll();
SlaveClientCom.Instance.LogMessage("Logging out");
WcfService.Instance.Disconnect();
SlaveClientCom.Instance.ClientCom.Shutdown();
SlaveClientCom.Close();
if (startClientComService) {
if (slaveComm.State != CommunicationState.Closed)
slaveComm.Close();
}
}
///
/// reinitializes everything and continues operation,
/// can be called after Sleep()
///
private void DoStartSlave() {
SlaveClientCom.Instance.LogMessage("Restart received");
configManager.Asleep = false;
}
///
/// stop slave, except for client gui communication,
/// primarily used by gui if core is running as windows service
///
private void Sleep() {
SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
configManager.Asleep = true;
DoPauseAll();
}
#endregion
}
}