#region License Information
/* HeuristicLab
* Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Client (slave).
  /// It hosts the slave/GUI communication service, drives the heartbeat and processes the
  /// messages of the <see cref="MessageQueue"/> on the thread that called <see cref="Start"/>
  /// (the "core thread"). Every job is executed by an <see cref="Executor"/> that lives in its
  /// own sandbox <see cref="AppDomain"/>.
  /// </summary>
  public class Core : MarshalByRefObject {
    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    /// <summary>
    /// Optional event log used by <see cref="Start"/> to report a fatal exception instead of
    /// rethrowing it (presumably set when the slave runs as a Windows service).
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    /// <summary>When set to true, the message loop started by <see cref="Start"/> ends after the current message.</summary>
    public static bool abortRequested { get; set; }

    // Released when Start() returns; Shutdown() blocks on it.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // Executors, sandbox AppDomains and job descriptions, keyed by job id. They are used by the core
    // thread, the job-fetching tasks and executor callbacks; this class guards them with lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;
    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>The executors of the running jobs, keyed by job id (live, unsynchronized view).</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The descriptions of the running jobs, keyed by job id (live, unsynchronized view).</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main method of the slave. Opens the slave/GUI communication service, starts the heartbeat
    /// and then processes queued messages on the calling thread until <see cref="abortRequested"/>
    /// is set. A fatal exception is written to <see cref="ServiceEventLog"/> if one is set,
    /// otherwise it is rethrown.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();

        ClientCom = SlaveClientCom.Instance.ClientCom;
        ClientCom.LogMessage("Hive Slave started");

        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) {
            // best effort: if the event log itself fails there is no channel left to report to
          }
        } else {
          throw; // 'throw;' keeps the original stack trace ('throw ex;' would reset it)
        }
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat (10 second interval) unless one is already running.
    /// <see cref="Sleep"/> resets the heartbeat to null, so a later restart creates a new one.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>Stops the heartbeat if one is running.</summary>
    private void StopHeartbeat() {
      HeartbeatManager manager = heartbeatManager; // local copy: the field may be reset concurrently
      if (manager != null) {
        manager.StopHeartBeat();
      }
    }

    /// <summary>Wakes up the heartbeat thread if a heartbeat is running (it is null while the slave sleeps).</summary>
    private void AwakeHeartbeat() {
      HeartbeatManager manager = heartbeatManager; // local copy: the field may be reset concurrently
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    /// <summary>Takes messages from the <see cref="MessageQueue"/> and handles them until <see cref="abortRequested"/> is set.</summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Handles one message from the <see cref="MessageQueue"/>: executor callbacks are executed
    /// directly, all other messages are dispatched by their <see cref="MessageContainer.MessageType"/>.
    /// </summary>
    /// <param name="container">The container holding the message; null is logged and ignored.</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        ClientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          Guid jobId = container.JobId;
          // fetching the job involves several server round trips; do not block the message loop
          Task.Factory.StartNew(() => FetchAndStartJob(jobId));
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Downloads a job and its data, registers the job, marks it as calculating on the server and
    /// starts it in a new AppDomain. Runs on a task, so every exception is caught and reported via
    /// <see cref="ExceptionOccured"/>; otherwise it would surface only as an unobserved task exception.
    /// </summary>
    /// <param name="jobId">Id of the job to fetch.</param>
    private void FetchAndStartJob(Guid jobId) {
      try {
        Job job = wcfService.GetJob(jobId);
        lock (engines) {
          if (!jobs.ContainsKey(job.Id)) {
            jobs.Add(job.Id, job);
          }
        }
        JobData jobData = wcfService.GetJobData(job.Id);
        job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
        StartJobInAppDomain(job, jobData);
      }
      catch (Exception e) {
        // NOTE(review): a job registered before the failure stays in 'jobs'; consider cleaning it up
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Looks up a registered job and its executor.
    /// </summary>
    /// <returns>true if both exist; otherwise false (the reason is logged).</returns>
    private bool TryGetJobAndEngine(Guid jobId, out Job job, out Executor engine) {
      bool jobFound;
      lock (engines) {
        engine = null;
        jobFound = jobs.TryGetValue(jobId, out job) && job != null;
        if (jobFound) {
          engines.TryGetValue(job.Id, out engine);
        }
      }
      if (!jobFound) {
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return false;
      }
      if (engine == null) {
        ClientCom.LogMessage("Engine doesn't exist");
        return false;
      }
      return true;
    }

    /// <summary>
    /// Pauses a job, sends its paused state to the server (state <see cref="JobState.Paused"/>)
    /// and kills its AppDomain. Unknown job ids are logged and ignored.
    /// </summary>
    private void DoPauseJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, out job, out engine)) {
        return;
      }

      engine.Pause();
      JobData sJob = engine.GetPausedJob();
      job.ExecutionTime = engine.ExecutionTime;
      try {
        ClientCom.LogMessage("Sending the paused job with id: " + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        SlaveStatusInfo.JobsProcessed++; //TODO: count or not count, thats the question
      }
      catch (Exception e) {
        ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Stops a job, sends its final state to the server and kills its AppDomain.
    /// Unknown job ids are logged and ignored.
    /// </summary>
    private void DoStopJob(Guid guid) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(guid, out job, out engine)) {
        return;
      }

      engine.Stop();
      JobData sJob = engine.GetFinishedJob();
      job.ExecutionTime = engine.ExecutionTime;
      try {
        ClientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
        // NOTE(review): a stopped job is reported with JobState.Paused - confirm this is intended
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        SlaveStatusInfo.JobsProcessed++; //TODO: count or not count, thats the question
      }
      catch (Exception e) {
        ClientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Returns a copy of the ids of all registered jobs, so callers can iterate while jobs are
    /// removed (and while job-fetching tasks add new ones).
    /// </summary>
    private List<Guid> SnapshotJobIds() {
      lock (engines) {
        return new List<Guid>(jobs.Keys);
      }
    }

    /// <summary>
    /// Aborts all running jobs; no results are sent back.
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid jobId in SnapshotJobIds()) {
        KillAppDomain(jobId);
      }
      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every running job, sends it to the server and kills its AppDomain (see DoPauseJob).
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");
      foreach (Guid jobId in SnapshotJobIds()) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// Stops every running job, sends it to the server and kills its AppDomain (see DoStopJob).
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");
      foreach (Guid jobId in SnapshotJobIds()) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// Completely shuts down the slave: queues a shutdown message for the core thread and blocks
    /// until <see cref="Start"/> has returned.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, executed on the core thread: stops the heartbeat, ends the message loop,
    /// unloads all job AppDomains and closes the server and GUI communication channels.
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      StopHeartbeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // keep going: the remaining domains and the communication channels still need to be closed
            ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
      }

      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed) {
        slaveComm.Close();
      }
    }

    /// <summary>
    /// Resumes operation by (re)starting the heartbeat; can be called after <see cref="Sleep"/>.
    /// </summary>
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// Stops the slave except for the client GUI communication: stops the heartbeat, stops all
    /// jobs and disconnects from the server. Primarily used by the GUI if the core runs as a
    /// Windows service.
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      StopHeartbeat();
      heartbeatManager = null; // DoStartSlave() creates a new heartbeat
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when the executor is waiting for child jobs.
    /// The job's AppDomain is killed even if the server call fails.
    /// </summary>
    /// <param name="data">The serialized job; its JobId identifies the job.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      try {
        Job job;
        lock (engines) {
          jobs.TryGetValue(data.JobId, out job);
        }
        if (job == null) {
          ClientCom.LogMessage("Can't find job with id " + data.JobId);
        } else {
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
        }
      }
      finally {
        KillAppDomain(data.JobId);
      }
    }

    /// <summary>
    /// Retrieves the finished job from its executor and submits it to the server with state
    /// <see cref="JobState.Finished"/>. A failed transmission is only logged; the job's AppDomain
    /// is killed in every case. Any other exception is reported via <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">Id of the finished job.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        ClientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        Job cJob;
        lock (engines) {
          engines.TryGetValue(jobId, out engine);
          jobs.TryGetValue(jobId, out cJob);
        }
        if (engine == null) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        if (cJob == null) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        JobData sJob = engine.GetFinishedJob();
        // read the execution time after the finished job has been retrieved
        cJob.ExecutionTime = engine.ExecutionTime;
        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jobId);
          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          // NOTE(review): the message mentions hdd, but nothing in this method stores the job locally
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jobId); // kill app-domain in every case
          AwakeHeartbeat();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Prepares the plugins for a job, creates a sandbox AppDomain for it and starts an
    /// <see cref="Executor"/> inside that domain. On failure the job's AppDomain is killed.
    /// </summary>
    /// <param name="myJob">The job description.</param>
    /// <param name="jobData">The serialized job that is passed to the executor.</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);
      string pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
      }

      if (pluginsPrepared) {
        try {
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          lock (engines) {
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        catch (Exception exception) {
          ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
          ClientCom.LogMessage("Error thrown is: " + exception.ToString());
          KillAppDomain(myJob.Id);
        }
      }
      AwakeHeartbeat();
    }

    /// <summary>Raised (after logging) when SendFinishedJob or a job-fetching task catches an exception.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Logs an unhandled exception of a job AppDomain and requests that domain to be killed.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // NOTE(review): assumes the event's sender is the AppDomain the handler was registered on - confirm
      Guid jobId;
      if (TryFindJobIdOfAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Could not determine the job of the failing AppDomain");
      }
    }

    /// <summary>
    /// Finds the id of the job whose registered AppDomain is <paramref name="domain"/>.
    /// </summary>
    /// <returns>true if a registered AppDomain matches; otherwise false.</returns>
    private bool TryFindJobIdOfAppDomain(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) {
        return false;
      }
      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          try {
            if (object.ReferenceEquals(kvp.Value, domain) || kvp.Value.Id == domain.Id) {
              jobId = kvp.Key;
              return true;
            }
          }
          catch (AppDomainUnloadedException) {
            // an already unloaded domain cannot be compared; skip it
          }
        }
      }
      return false;
    }

    /// <summary>
    /// Enqueues a callback from the executor to the message queue, so that it is executed on the
    /// core thread. This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">Type of the callback parameter.</typeparam>
    /// <param name="action">The callback to execute on the core thread.</param>
    /// <param name="parameter">The argument passed to <paramref name="action"/>.</param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kills the AppDomain of a job: disposes its executor, unloads the domain (up to 5 attempts,
    /// one second apart), unregisters the job and deletes its plugins. When called from another
    /// thread, the request is queued and executed on the core thread.
    /// </summary>
    /// <param name="id">The id of the job.</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            const int maxUnloadAttempts = 5;
            for (int attempt = 1; ; attempt++) {
              try {
                AppDomain.Unload(appDomain);
                break;
              }
              catch (CannotUnloadAppDomainException) {
                ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
                if (attempt >= maxUnloadAttempts) {
                  throw; // handled by the catch below; the job then stays registered
                }
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }
  }
}