#region License Information
/* HeuristicLab
* Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Diagnostics;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;
namespace HeuristicLab.Clients.Hive.SlaveCore {
/// <summary>
/// The core component of the Hive Slave.
/// Handles commands sent from the Hive Server and does all webservice calls for jobs.
/// </summary>
public class Core : MarshalByRefObject {
// Heartbeat manager shared by the whole process (static); created by StartHeartbeats().
private static HeartbeatManager heartbeatManager;
/// <summary>
/// Gets the shared heartbeat manager. It is null until StartHeartbeats() has created it.
/// </summary>
public static HeartbeatManager HeartbeatManager {
get { return heartbeatManager; }
}
/// <summary>
/// Optional event log. When set, Start() writes fatal exceptions to it instead of
/// sending them to the slave GUI.
/// </summary>
public EventLog ServiceEventLog { get; set; }
// Released in the finally block of Start(); Shutdown() waits on it.
private Semaphore waitShutdownSem = new Semaphore(0, 1);
// Reset by Start(), set by ShutdownCore(); ends the loop in DispatchMessageQueue().
private bool abortRequested;
// Channel to the slave GUI, taken from SlaveClientCom.Instance in Start(); used for log and status messages.
private ISlaveCommunication clientCom;
// Host for SlaveCommunicationService; opened in Start(), closed in ShutdownCore().
private ServiceHost slaveComm;
// WcfService.Instance, assigned in Start(); used for the job-related service calls.
private WcfService wcfService;
// The three managers below are created in the constructor.
private JobManager jobManager;
private ConfigManager configManager;
private PluginManager pluginManager;
/// <summary>
/// Creates the plugin, job and config managers and wires up their events.
/// No communication channel is opened here; that happens in Start().
/// </summary>
public Core() {
  var log = new ThreadSafeLog(new Log());
  this.pluginManager = new PluginManager(WcfService.Instance, log);
  this.jobManager = new JobManager(pluginManager, log);
  // Method-group subscription: the compiler supplies the delegate type declared by the event.
  log.MessageAdded += log_MessageAdded;
  RegisterJobManagerEvents();
  this.configManager = new ConfigManager(jobManager);
  // Publish the instance through the static accessor (read e.g. in HandleCalculateJob).
  ConfigManager.Instance = this.configManager;
}
/// <summary>
/// Main method for the client: opens the communication service for the slave GUI,
/// starts the heartbeats and dispatches queued messages until a shutdown is requested.
/// Any exception is reported, the core is shut down, and the shutdown semaphore is
/// released in every case so that a waiting Shutdown() call can return.
/// </summary>
public void Start() {
  abortRequested = false;
  try {
    //start the client communication service (pipe between slave and slave gui)
    slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
    slaveComm.Open();
    clientCom = SlaveClientCom.Instance.ClientCom;
    // delete all left over job directories
    pluginManager.CleanPluginTemp();
    clientCom.LogMessage("Hive Slave started");
    wcfService = WcfService.Instance;
    RegisterServiceEvents();
    StartHeartbeats(); // Start heartbeats thread
    DispatchMessageQueue(); // dispatch messages until abortRequested
  }
  catch (Exception ex) {
    if (ServiceEventLog != null) {
      try {
        ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
      }
      catch (Exception) {
        // Best effort only: a failing event log must not prevent the shutdown below.
      }
    } else if (clientCom != null) {
      // clientCom is still null when the failure happened before the GUI channel was
      // obtained; dereferencing it here would replace the real error with a
      // NullReferenceException and skip ShutdownCore().
      clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
    }
    ShutdownCore();
  }
  finally {
    DeregisterServiceEvents();
    waitShutdownSem.Release();
  }
}
/// <summary>
/// Creates and starts the shared heartbeat manager, unless one already exists.
/// </summary>
private void StartHeartbeats() {
  // The manager is static: once created it is neither replaced nor started again.
  if (heartbeatManager != null) {
    return;
  }
  heartbeatManager = new HeartbeatManager();
  heartbeatManager.StartHeartbeat();
}
/// <summary>
/// Takes messages from the message queue one at a time and hands them to DetermineAction
/// until abortRequested is set. After each handled message the current status is pushed
/// to the slave GUI, except after the message that requested the abort.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();
  while (!abortRequested) {
    DetermineAction(messages.GetMessage());
    if (abortRequested) {
      // No status update once a shutdown was requested; the loop condition ends the loop.
      continue;
    }
    clientCom.StatusChanged(configManager.GetStatusForClientConsole());
  }
}
/// <summary>
/// Subscribes to the connection events of the WcfService singleton.
/// The counterpart is DeregisterServiceEvents().
/// </summary>
private void RegisterServiceEvents() {
  // Method groups let the compiler supply the delegate types declared by the events,
  // mirroring the form used for unsubscribing.
  WcfService.Instance.Connected += WcfService_Connected;
  WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
}
/// <summary>
/// Removes the handlers added by RegisterServiceEvents(). Called from the finally block
/// of Start(), i.e. also when the handlers were never attached.
/// </summary>
private void DeregisterServiceEvents() {
WcfService.Instance.Connected -= WcfService_Connected;
WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
}
/// <summary>
/// Forwards the message of an exception reported by the WcfService to the slave GUI log.
/// </summary>
// NOTE(review): the generic argument is inferred from the use of e.Value.Message - confirm
// against the declaration of WcfService.ExceptionOccured.
private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
}
/// <summary>
/// Writes a notice to the slave GUI log when the WcfService raises its Connected event.
/// </summary>
private void WcfService_Connected(object sender, EventArgs e) {
  const string notice = "Connected successfully to Hive server";
  clientCom.LogMessage(notice);
}
/// <summary>
/// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
/// </summary>
/// <param name="container">The container, containing the message</param>
private void DetermineAction(MessageContainer container) {
  if (container == null) {
    // Without this guard the log statement below would throw before anything is reported.
    clientCom.LogMessage("Unknown MessageContainer: null");
    return;
  }
  clientCom.LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.JobId));

  // Executor containers carry their own action and are simply run.
  ExecutorMessageContainer executor = container as ExecutorMessageContainer;
  if (executor != null) {
    executor.execute();
    return;
  }

  switch (container.Message) {
    case MessageContainer.MessageType.CalculateJob:
      CalculateJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.AbortJob:
      AbortJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.StopJob:
      StopJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.PauseJob:
      PauseJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(configManager.GetClientInfo());
      break;
    default:
      // Make message types this method does not handle visible instead of dropping them silently.
      clientCom.LogMessage("Unknown MessageType: " + container.Message);
      break;
  }
}
/// <summary>
/// Runs HandleCalculateJob for the given job as a task. If the task faults, the exception
/// counter is incremented and the exception is written to the slave GUI log.
/// </summary>
/// <param name="jobId">Id of the job to calculate.</param>
private void CalculateJobAsync(Guid jobId) {
  Action<Task> reportFailure = faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  };
  Task worker = Task.Factory.StartNew(HandleCalculateJob, jobId);
  worker.ContinueWith(reportFailure, TaskContinuationOptions.OnlyOnFaulted);
}
/// <summary>
/// Runs HandleStopJob for the given job as a task. If the task faults, the exception
/// counter is incremented and the exception is written to the slave GUI log.
/// </summary>
/// <param name="jobId">Id of the job to stop.</param>
private void StopJobAsync(Guid jobId) {
  Action<Task> reportFailure = faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  };
  Task worker = Task.Factory.StartNew(HandleStopJob, jobId);
  worker.ContinueWith(reportFailure, TaskContinuationOptions.OnlyOnFaulted);
}
/// <summary>
/// Runs HandlePauseJob for the given job as a task. If the task faults, the exception
/// counter is incremented and the exception is written to the slave GUI log.
/// </summary>
/// <param name="jobId">Id of the job to pause.</param>
private void PauseJobAsync(Guid jobId) {
  Action<Task> reportFailure = faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  };
  Task worker = Task.Factory.StartNew(HandlePauseJob, jobId);
  worker.ContinueWith(reportFailure, TaskContinuationOptions.OnlyOnFaulted);
}
/// <summary>
/// Runs HandleAbortJob for the given job as a task. If the task faults, the exception
/// counter is incremented and the exception is written to the slave GUI log.
/// </summary>
/// <param name="jobId">Id of the job to abort.</param>
private void AbortJobAsync(Guid jobId) {
  Action<Task> reportFailure = faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  };
  Task worker = Task.Factory.StartNew(HandleAbortJob, jobId);
  worker.ContinueWith(reportFailure, TaskContinuationOptions.OnlyOnFaulted);
}
/// <summary>
/// Task body for CalculateJobAsync: fetches the job and its data from the server, reserves
/// the cores the job needs, marks the job as Calculating and hands it to the job manager.
/// Every failure is re-thrown so that the task faults; reserved cores are released first
/// and, except for "not found" and "already running" errors, the job is set back to
/// Waiting on the server.
/// </summary>
/// <param name="jobIdObj">The job id as a boxed Guid (task state object).</param>
private void HandleCalculateJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;
  Job job = null;
  int usedCores = 0; // cores reserved so far; stays 0 until the reservation was made
  try {
    job = wcfService.GetJob(jobId);
    if (job == null) throw new JobNotFoundException(jobId);
    if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) throw new OutOfCoresException();
    if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) throw new OutOfMemoryException();
    SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded); usedCores = job.CoresNeeded;
    JobData jobData = wcfService.GetJobData(jobId);
    if (jobData == null) throw new JobDataNotFoundException(jobId);
    job = wcfService.UpdateJobState(jobId, JobState.Calculating, null);
    if (job == null) throw new JobNotFoundException(jobId);
    jobManager.StartJobAsync(job, jobData);
  }
  catch (JobNotFoundException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (JobDataNotFoundException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (JobAlreadyRunningException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (OutOfCoresException) {
    // Releases nothing (usedCores == 0) when thrown by the check above, but gives the
    // reservation back should the exception surface after the cores were reserved.
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(jobId, JobState.Waiting, "No more cores available");
    throw;
  }
  catch (OutOfMemoryException) {
    // Same reasoning as above, e.g. when memory runs out while the job data is fetched.
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(jobId, JobState.Waiting, "No more memory available");
    throw;
  }
  catch (Exception e) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(jobId, JobState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
    throw;
  }
}
/// <summary>
/// Task body for StopJobAsync: verifies that the server knows the job and asks the job
/// manager to stop it.
/// </summary>
/// <param name="jobIdObj">The job id as a boxed Guid (task state object).</param>
private void HandleStopJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;
  if (wcfService.GetJob(jobId) == null) {
    throw new JobNotFoundException(jobId);
  }
  // Nothing is handled locally: JobNotFoundException, JobNotRunningException and
  // AppDomainNotCreatedException (like any other exception) propagate unchanged and
  // fault the task, where the continuation registered in StopJobAsync reports them.
  jobManager.StopJobAsync(jobId);
}
/// <summary>
/// Task body for PauseJobAsync: verifies that the server knows the job and asks the job
/// manager to pause it.
/// </summary>
/// <param name="jobIdObj">The job id as a boxed Guid (task state object).</param>
private void HandlePauseJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;
  if (wcfService.GetJob(jobId) == null) {
    throw new JobNotFoundException(jobId);
  }
  // Nothing is handled locally: JobNotFoundException, JobNotRunningException and
  // AppDomainNotCreatedException (like any other exception) propagate unchanged and
  // fault the task, where the continuation registered in PauseJobAsync reports them.
  jobManager.PauseJobAsync(jobId);
}
/// <summary>
/// Task body for AbortJobAsync: asks the job manager to abort the job.
/// </summary>
/// <param name="jobIdObj">The job id as a boxed Guid (task state object).</param>
private void HandleAbortJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;
  // A JobNotFoundException (like any other exception) propagates unchanged and faults
  // the task, where the continuation registered in AbortJobAsync reports it.
  jobManager.AbortJob(jobId);
}
#region JobManager Events
/// <summary>
/// Subscribes the core's handlers to the job manager's job life-cycle events.
/// Called once from the constructor.
/// </summary>
private void RegisterJobManagerEvents() {
  // Method groups let the compiler supply the delegate type declared by each event.
  this.jobManager.JobStarted += jobManager_JobStarted;
  this.jobManager.JobPaused += jobManager_JobPaused;
  this.jobManager.JobStopped += jobManager_JobStopped;
  this.jobManager.JobFailed += jobManager_JobFailed;
  this.jobManager.ExceptionOccured += jobManager_ExceptionOccured;
  this.jobManager.JobAborted += jobManager_JobAborted;
}
/// <summary>
/// Handler for the job manager's JobStarted event. Intentionally does nothing.
/// </summary>
private void jobManager_JobStarted(object sender, EventArgs e) {
  // A started job needs no further action from the core.
}
/// <summary>
/// Handler for the job manager's JobPaused event: releases the job's cores, wakes the
/// heartbeat thread and uploads execution time and job data with state Paused.
/// Failures are written to the slave GUI log and not propagated.
/// </summary>
// NOTE(review): the generic argument is inferred from the members used on e.Value
// (the same ones jobManager_JobFailed reads from its SlaveJob) - confirm against JobManager.
private void jobManager_JobPaused(object sender, EventArgs<SlaveJob> e) {
  try {
    SlaveJob slaveJob = e.Value;
    SlaveStatusInfo.DecrementUsedCores(slaveJob.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();
    Job job = wcfService.GetJob(slaveJob.JobId);
    if (job == null) throw new JobNotFoundException(slaveJob.JobId);
    job.ExecutionTime = slaveJob.ExecutionTime;
    JobData jobData = slaveJob.GetJobData();
    wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Paused);
  }
  catch (Exception ex) {
    // Also covers the JobNotFoundException thrown above; all failures are logged alike.
    clientCom.LogMessage(ex.ToString());
  }
}
/// <summary>
/// Handler for the job manager's JobStopped event: releases the job's cores, wakes the
/// heartbeat thread and uploads execution time and job data with state Finished.
/// Failures are written to the slave GUI log and not propagated.
/// </summary>
// NOTE(review): the generic argument is inferred from the members used on e.Value
// (the same ones jobManager_JobFailed reads from its SlaveJob) - confirm against JobManager.
private void jobManager_JobStopped(object sender, EventArgs<SlaveJob> e) {
  try {
    SlaveJob slaveJob = e.Value;
    SlaveStatusInfo.DecrementUsedCores(slaveJob.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();
    Job job = wcfService.GetJob(slaveJob.JobId);
    if (job == null) throw new JobNotFoundException(slaveJob.JobId);
    job.ExecutionTime = slaveJob.ExecutionTime;
    JobData jobData = slaveJob.GetJobData();
    wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Finished);
  }
  catch (Exception ex) {
    // Also covers the JobNotFoundException thrown above; all failures are logged alike.
    clientCom.LogMessage(ex.ToString());
  }
}
/// <summary>
/// Handler for the job manager's JobFailed event: releases the job's cores, wakes the
/// heartbeat thread and reports the failure to the server. Job data is uploaded as well
/// when it is available, otherwise only the state is set to Failed. Errors while
/// reporting are counted and written to the slave GUI log, not propagated.
/// </summary>
/// <param name="sender">The event source (not used).</param>
/// <param name="e">Carries the failed job, its data (may be null) and the exception.</param>
private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, JobData, Exception>> e) {
  try {
    SlaveJob slaveJob = e.Value.Item1;
    JobData jobData = e.Value.Item2;
    Exception exception = e.Value.Item3;

    SlaveStatusInfo.DecrementUsedCores(slaveJob.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Job job = wcfService.GetJob(slaveJob.JobId);
    if (job == null) throw new JobNotFoundException(slaveJob.JobId);
    job.ExecutionTime = slaveJob.ExecutionTime;
    if (jobData != null) {
      wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Failed, exception.ToString());
    } else {
      wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
    }
    clientCom.LogMessage(exception.Message);
  }
  catch (Exception ex) {
    // Also covers the JobNotFoundException thrown above; all failures are handled alike.
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(ex.ToString());
  }
}
/// <summary>
/// Handler for the job manager's ExceptionOccured event: releases the job's cores, counts
/// and logs the exception, wakes the heartbeat thread and sets the job back to Waiting on
/// the server. A failure of that last service call is logged instead of propagated,
/// as in the other job event handlers.
/// </summary>
// NOTE(review): the generic arguments are inferred from how e.Value and e.Value2 are
// used - confirm against the declaration of JobManager.ExceptionOccured.
private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
  SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
  SlaveStatusInfo.IncrementExceptionOccured();
  heartbeatManager.AwakeHeartBeatThread();
  string details = e.Value2.ToString();
  clientCom.LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, details));
  try {
    wcfService.UpdateJobState(e.Value.JobId, JobState.Waiting, details);
  }
  catch (Exception ex) {
    // The server may be unreachable; do not let that escape into the code raising the event.
    clientCom.LogMessage(ex.ToString());
  }
}
/// <summary>
/// Handler for the job manager's JobAborted event: releases the cores of the aborted job.
/// Nothing is reported to the server.
/// </summary>
// NOTE(review): the generic argument is inferred from the use of e.Value.CoresNeeded -
// confirm against the declaration of JobManager.JobAborted.
private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
  SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
}
#endregion
#region Log Events
/// <summary>
/// Forwards a message added to the managers' log to the slave GUI and then clears the
/// log so that it does not accumulate entries.
/// </summary>
/// <param name="sender">The log that raised the event; must implement ILog.</param>
/// <param name="e">Carries the log entry text.</param>
private void log_MessageAdded(object sender, EventArgs<string> e) {
  // Drop everything up to the first tab and keep the rest of the entry.
  // NOTE(review): assumes entries look like "<prefix>\t<message>" - confirm against Log.
  // Splitting into at most two parts keeps tabs inside the message, and an entry
  // without any tab is forwarded unchanged instead of throwing.
  string[] parts = e.Value.Split(new[] { '\t' }, 2);
  string text = parts.Length > 1 ? parts[1] : e.Value;
  // The subscription is made in the constructor, clientCom is only set in Start().
  if (clientCom != null) {
    clientCom.LogMessage(text);
  }
  ((ILog)sender).Clear(); // don't let the log take up memory
}
#endregion
/// <summary>
/// Requests an abort for every job id the job manager currently reports;
/// no results are sent back.
/// </summary>
private void DoAbortAll() {
  clientCom.LogMessage("Aborting all jobs.");
  foreach (Guid id in jobManager.JobIds)
    AbortJobAsync(id);
}
/// <summary>
/// Requests a pause for every job id the job manager currently reports.
/// The requests run as tasks; this method does not wait for them.
/// </summary>
private void DoPauseAll() {
  clientCom.LogMessage("Pausing all jobs.");
  foreach (Guid id in jobManager.JobIds)
    PauseJobAsync(id);
}
/// <summary>
/// Requests a stop for every job id the job manager currently reports.
/// The requests run as tasks; this method does not wait for them.
/// </summary>
private void DoStopAll() {
  clientCom.LogMessage("Stopping all jobs.");
  foreach (Guid id in jobManager.JobIds)
    StopJobAsync(id);
}
#region Slave Lifecycle Methods
/// <summary>
/// Completely shuts down the slave: queues a ShutdownSlave message for the dispatch loop
/// and blocks until Start() releases the shutdown semaphore.
/// </summary>
public void Shutdown() {
  var shutdownRequest = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(shutdownRequest);
  // Released in the finally block of Start().
  waitShutdownSem.WaitOne();
}
/// <summary>
/// Complete shutdown, should be called before the application is exited: stops the
/// heartbeat, ends the dispatch loop, aborts all jobs, disconnects from the server and
/// closes the communication with the slave GUI.
/// Also called from the catch block of Start(), so it tolerates members that were never
/// initialized because Start() failed part-way through.
/// </summary>
private void ShutdownCore() {
  if (clientCom != null) {
    clientCom.LogMessage("Shutdown signal received");
    clientCom.LogMessage("Stopping heartbeat");
  }
  if (heartbeatManager != null) {
    heartbeatManager.StopHeartBeat();
  }
  abortRequested = true;
  if (clientCom != null) {
    DoAbortAll();
    clientCom.LogMessage("Logging out");
  } else {
    // DoAbortAll() logs through clientCom, so abort the jobs directly in this case.
    foreach (Guid jobId in jobManager.JobIds) {
      AbortJobAsync(jobId);
    }
  }
  WcfService.Instance.Disconnect();
  if (clientCom != null) {
    clientCom.Shutdown();
  }
  SlaveClientCom.Close();
  if (slaveComm != null) {
    if (slaveComm.State == CommunicationState.Faulted) {
      // Close() throws on a faulted communication object; Abort() is the way to release it.
      slaveComm.Abort();
    } else if (slaveComm.State != CommunicationState.Closed) {
      slaveComm.Close();
    }
  }
}
/// <summary>
/// Handles the Restart message: logs it and clears the Asleep flag of the config manager.
/// Intended as the counterpart of Sleep().
/// </summary>
private void DoStartSlave() {
  const string notice = "Restart received";
  clientCom.LogMessage(notice);
  configManager.Asleep = false;
}
/// <summary>
/// Handles the Sleep message: logs it, sets the Asleep flag of the config manager and
/// requests a pause for all jobs. The communication with the slave GUI stays open;
/// primarily used by the GUI if the core is running as a windows service.
/// </summary>
private void Sleep() {
  const string notice = "Sleep received - not accepting any new jobs";
  clientCom.LogMessage(notice);
  configManager.Asleep = true;
  DoPauseAll();
}
#endregion
}
}