#region License Information
/* HeuristicLab
* Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.ResponseObjects;
using HeuristicLab.Hive.Slave.Common;
using HeuristicLab.Hive.Slave.Communication;
using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
using HeuristicLab.Hive.Slave.Core.JobStorage;
using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
using HeuristicLab.Hive.Slave.ExecutionEngine;
namespace HeuristicLab.Hive.Slave.Core {
/// <summary>
/// The core component of the Hive Client
/// </summary>
public class Core : MarshalByRefObject {
// Loop flag for DispatchMessageQueue: cleared in Start, set when a Shutdown message is handled.
public static bool abortRequested { get; set; }
// Not read or written anywhere in this class.
public static ILog Log { get; set; }
// NOTE(review): the generic type arguments of the three dictionaries below appear to have been
// lost from this copy of the file — confirm against the original declaration.
// Execution engines keyed by job id. This object also serves as the lock that guards
// changes to engines, appDomains and jobs.
private Dictionary engines = new Dictionary();
// Sandbox AppDomain created for each job, keyed by job id.
private Dictionary appDomains = new Dictionary();
// Job objects as received from the server, keyed by job id.
private Dictionary jobs = new Dictionary();
// Service used for all communication with the server; assigned in Start.
private WcfService wcfService;
// Created and started in StartHeartbeats, stopped when a Shutdown message is handled.
private HeartbeatManager heartbeatManager;
// Backing field of CurrentlyFetching.
private bool currentlyFetching;

/// <summary>
/// While true, FetchJob messages are skipped instead of requesting another job.
/// Every assignment is written to the debug log.
/// </summary>
private bool CurrentlyFetching {
  get { return currentlyFetching; }
  set {
    currentlyFetching = value;
    Logger.Debug("Set CurrentlyFetching to " + value);
  }
}
/// <summary>
/// Gets the dictionary of execution engines, keyed by job id.
/// The internal instance itself is returned, not a copy.
/// </summary>
// NOTE(review): the generic type arguments of the property type appear to be missing — confirm.
public Dictionary ExecutionEngines {
get { return engines; }
}
/// <summary>
/// Gets the dictionary of jobs received from the server, keyed by job id.
/// The internal instance itself is returned, not a copy.
/// </summary>
// NOTE(review): the generic type arguments of the property type appear to be missing — confirm.
internal Dictionary Jobs {
get { return jobs; }
}
/// <summary>
/// Main method of the slave. Starts the console server, wires up the WCF service events,
/// restores the stored server address, starts the heartbeat and then processes queued
/// messages until <see cref="abortRequested"/> is set. Afterwards the event handlers are
/// removed again and the console server is closed.
/// </summary>
public void Start() {
  abortRequested = false;
  Logger.Info("Hive Slave started");

  var consoleServer = new SlaveConsoleServer();
  consoleServer.Start();

  ConfigManager.Instance.Core = this;
  wcfService = WcfService.Instance;
  RegisterServiceEvents();

  // Restore the server address from the stored settings.
  RecoverSettings();
  // Begin sending heartbeats.
  StartHeartbeats();
  // Returns only once abortRequested has been set.
  DispatchMessageQueue();

  DeRegisterServiceEvents();
  consoleServer.Close();
  Logger.Info("Program shutdown");
}
/// <summary>
/// Copies the server address kept by the configuration manager to the WCF service,
/// unless the stored value is the empty string.
/// </summary>
private void RecoverSettings() {
  var storedAddress = ConfigManager.Instance.GetServerIP().IPAdress;
  if (storedAddress == String.Empty) {
    return;
  }
  wcfService.ServerIp = storedAddress;
}
/// <summary>
/// Creates the heartbeat manager with an interval of ten seconds and starts it.
/// </summary>
private void StartHeartbeats() {
  heartbeatManager = new HeartbeatManager();
  heartbeatManager.Interval = TimeSpan.FromSeconds(10);
  heartbeatManager.StartHeartbeat();
}
/// <summary>
/// Takes messages from the message queue one at a time and hands each to
/// <see cref="DetermineAction"/> until <see cref="abortRequested"/> becomes true.
/// </summary>
private void DispatchMessageQueue() {
  var messages = MessageQueue.GetInstance();
  while (!abortRequested) {
    // presumably GetMessage blocks until a message is available — TODO confirm
    DetermineAction(messages.GetMessage());
  }
}
/// <summary>
/// Subscribes this class' handlers to the completion and connection events of the WCF service.
/// </summary>
private void RegisterServiceEvents() {
  wcfService.GetJobCompleted += wcfService_GetJobCompleted;
  wcfService.GetFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
  wcfService.Connected += wcfService_Connected;
}
/// <summary>
/// Removes the handlers that <see cref="RegisterServiceEvents"/> attached to the WCF service.
/// </summary>
private void DeRegisterServiceEvents() {
  wcfService.GetJobCompleted -= wcfService_GetJobCompleted;
  wcfService.GetFinishedJobResultCompleted -= wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted -= wcfService_ProcessSnapshotCompleted;
  wcfService.Connected -= wcfService_Connected;
}
/// <summary>
/// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
/// Message types without a case below are ignored.
/// </summary>
/// <param name="container">The container holding the message type and the id of the job it refers to.</param>
private void DetermineAction(MessageContainer container) {
  Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
  switch (container.Message) {
    // Server requests to abort a job
    case MessageContainer.MessageType.AbortJob:
      if (engines.ContainsKey(container.JobId)) {
        try {
          engines[container.JobId].Abort();
        }
        catch (AppDomainUnloadedException) {
          // appdomain already unloaded. Finishing job probably ongoing
        }
      } else {
        Logger.Error("AbortJob: Engine doesn't exist");
      }
      break;

    // Job has been successfully aborted
    case MessageContainer.MessageType.JobAborted:
      // JobId is passed on directly; converting it to a string and back added nothing.
      KillAppDomain(container.JobId);
      break;

    // Request a snapshot from the execution engine
    case MessageContainer.MessageType.RequestSnapshot:
      if (engines.ContainsKey(container.JobId)) {
        engines[container.JobId].RequestSnapshot();
      } else {
        Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
      }
      break;

    // Snapshot is ready and can be sent back to the server
    case MessageContainer.MessageType.SnapshotReady:
      GetSnapshot(container.JobId);
      break;

    // Pull a job from the server
    case MessageContainer.MessageType.FetchJob:
      if (!CurrentlyFetching) {
        wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
        CurrentlyFetching = true;
      } else {
        Logger.Info("Currently fetching, won't fetch this time!");
      }
      break;

    // A job has finished and can be sent back to the server
    case MessageContainer.MessageType.FinishedJob:
      SendFinishedJob(container.JobId);
      break;

    // The timeslice is up
    case MessageContainer.MessageType.UptimeLimitDisconnect:
      Logger.Info("Uptime Limit reached, storing jobs and sending them back");
      ShutdownRunningJobsAndSubmitSnapshots();
      break;

    // Fetch or force fetch the calendar
    case MessageContainer.MessageType.FetchOrForceFetchCalendar:
      Logger.Info("Fetch Calendar from Server");
      FetchCalendarFromServer();
      break;

    // Hard shutdown of the client
    case MessageContainer.MessageType.Shutdown:
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        foreach (var kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          // A sandbox that cannot be unloaded must not prevent the remaining sandboxes from
          // being unloaded, nor the heartbeat stop and the logout below.
          try {
            AppDomain.Unload(kvp.Value);
          }
          catch (CannotUnloadAppDomainException ex) {
            Logger.Error("Could not unload AppDomain for " + kvp.Key + " during shutdown: ", ex);
          }
          catch (AppDomainUnloadedException) {
            Logger.Debug("Appdomain for " + kvp.Key + " was already unloaded");
          }
        }
      }
      Logger.Debug("Stopping heartbeat");
      abortRequested = true; // ends the loop in DispatchMessageQueue
      heartbeatManager.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
      break;

    case MessageContainer.MessageType.AddChildJob:
      AddChildJob((MessageContainerWithJob)container);
      break;

    case MessageContainer.MessageType.PauseJob:
      // send the job back to hive
      PauseJob((MessageContainerWithJob)container);
      break;

    case MessageContainer.MessageType.GetChildJobs:
      GetChildJobs((MessageContainerWithCallback)container);
      break;

    case MessageContainer.MessageType.DeleteChildJobs:
      wcfService.DeleteChildJobs(container.JobId);
      break;
  }
}
/// <summary>
/// Asks the server for the child jobs of the message's job and passes the returned object
/// to the message's callback. A missing response or a status other than Ok is only logged.
/// </summary>
/// <param name="mc">Message carrying the job id and the callback that receives the result.</param>
private void GetChildJobs(MessageContainerWithCallback mc) {
  var response = wcfService.GetChildJobs(mc.JobId);
  if (response == null) {
    Logger.Error("GetChildJobs failed.");
    return;
  }
  if (response.StatusMessage != ResponseStatus.Ok) {
    Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
    return;
  }
  mc.Callback(response.Obj);
}
/// <summary>
/// Sends the serialized job back to the server as paused and then tears down the job's
/// AppDomain. A missing response or a status other than Ok is logged as an error.
/// </summary>
/// <param name="mc">Message carrying the job id and the serialized job.</param>
private void PauseJob(MessageContainerWithJob mc) {
  var response = wcfService.PauseJob(mc.SerializedJob);
  KillAppDomain(mc.JobId);
  if (response == null) {
    // A null response has no status to report; reading StatusMessage here would throw.
    Logger.Error("PauseJob failed: no response received");
  } else if (response.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("PauseJob failed: " + response.StatusMessage);
  }
}
/// <summary>
/// Submits the serialized job to the server as a child of the message's job.
/// A missing response or a status other than Ok is logged as an error.
/// </summary>
/// <param name="mc">Message carrying the parent job id and the serialized child job.</param>
/// <returns>The server response exactly as received; may be null.</returns>
private ResponseObject AddChildJob(MessageContainerWithJob mc) {
  var response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
  if (response == null) {
    // A null response has no status to report; reading StatusMessage here would throw.
    Logger.Error("AddChildJob failed: no response received");
  } else if (response.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("AddChildJob failed: " + response.StatusMessage);
  }
  return response;
}
/// <summary>
/// If any engines are registered, blocks further job fetching and requests a snapshot
/// from each of them. Does nothing when no engine is registered.
/// </summary>
private void ShutdownRunningJobsAndSubmitSnapshots() {
  if (engines.Count == 0) {
    return;
  }

  // No more jobs may be fetched while the snapshots are being processed.
  CurrentlyFetching = true;

  foreach (var engine in engines.Values) {
    engine.RequestSnapshot();
  }
}
//Asynchronous Threads for interaction with the Execution Engine
#region Async Threads for the EE
/// <summary>
/// Retrieves the finished job from its engine and submits it to the server asynchronously.
/// If starting the transmission throws, the serialized job is written to disk instead.
/// The job's AppDomain is torn down in either case. Any other exception is reported via
/// <see cref="OnExceptionOccured"/>.
/// </summary>
/// <param name="jobId">The job id as a boxed Guid (object-typed so the method can serve as a thread start).</param>
private void SendFinishedJob(object jobId) {
  try {
    Guid finishedId = (Guid)jobId;
    Logger.Info("Getting the finished job with id: " + finishedId);

    bool engineKnown = engines.ContainsKey(finishedId);
    if (!engineKnown) {
      Logger.Info("Engine doesn't exist");
      return;
    }

    byte[] serializedJob = engines[finishedId].GetFinishedJob();
    try {
      Logger.Info("Sending the finished job with id: " + finishedId);
      var slaveId = ConfigManager.Instance.GetClientInfo().Id;
      var executionTime = engines[finishedId].ExecutionTime;
      var currentException = engines[finishedId].CurrentException;
      wcfService.GetFinishedJobResultAsync(slaveId, finishedId, serializedJob, executionTime, currentException, true);
    }
    catch (Exception sendError) {
      Logger.Info("Transmitting to server failed. Storing the finished job with id: " + finishedId + " to hdd (" + sendError.ToString() + ")");
      // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming).
      // Also the port is now specified only in app.config. Use port 0 for the moment.
      JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, finishedId, serializedJob);
    }
    finally {
      // kill app-domain in every case
      KillAppDomain(finishedId);
    }
  }
  catch (Exception unexpected) {
    OnExceptionOccured(unexpected);
  }
}
/// <summary>
/// Obtains a snapshot from the job's engine and sends it to the server synchronously.
/// If calculating is still allowed the job is restarted; otherwise its AppDomain is torn
/// down and, once no engines remain, the service is disconnected. Exceptions are reported
/// via <see cref="OnExceptionOccured"/>.
/// </summary>
/// <param name="jobId">The job id as a boxed Guid.</param>
private void GetSnapshot(object jobId) {
  try {
    Logger.Info("Fetching a snapshot for job " + jobId);
    Guid snapshotJobId = (Guid)jobId;

    byte[] snapshot = engines[snapshotJobId].GetSnapshot();
    wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, snapshotJobId, snapshot, engines[snapshotJobId].ExecutionTime, null);

    Logger.Debug("Checking if uptime limit is reached");
    if (UptimeManager.Instance.IsAllowedToCalculate()) {
      // Still inside the allowed time: continue the job.
      Logger.Debug("Restarting the job" + jobId);
      engines[snapshotJobId].StartOnlyJob();
      Logger.Info("Restarted the job" + jobId);
      return;
    }

    // Uptime limit reached, now is a good time to destroy this job.
    Logger.Debug("Uptime limit reached");
    Logger.Debug("Killing Appdomain");
    KillAppDomain(snapshotJobId);

    // Still anything running?
    if (engines.Count != 0) {
      Logger.Debug("There are still active Jobs in the Field, not disconnecting");
    } else {
      Logger.Info("All jobs snapshotted and sent back, disconnecting");
      WcfService.Instance.Disconnect();
    }
  }
  catch (Exception unexpected) {
    OnExceptionOccured(unexpected);
  }
}
#endregion
//Eventhandlers for the communication with the wcf Layer
#region wcfService Events
/// <summary>
/// Login has returned. On status Ok job fetching is enabled again; any other status is
/// logged as an error.
/// </summary>
/// <param name="sender">Source of the event (not used).</param>
/// <param name="e">Event data holding the login result.</param>
void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
  if (e.Result.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
    return;
  }

  CurrentlyFetching = false;
  Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
}
/// <summary>
/// A new job from the wcfService has been received and will be started within an AppDomain.
/// The required plugins are prepared and copied, a sandbox AppDomain is created, and an
/// Executor is instantiated inside it and started with the job data. If any of this fails,
/// the job's AppDomain is torn down and the failure is reported to the server as the job's
/// result. <see cref="CurrentlyFetching"/> is reset when the handler exits, whatever the outcome.
/// </summary>
/// <param name="sender">Source of the event (not used).</param>
/// <param name="e">Event data holding the server response and the job data.</param>
void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
  try {
    if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
      Logger.Info("No more jobs left!");
      return;
    }

    var job = e.Result.Obj;
    if (job == null) {
      // Without a job object nothing can be started (and nothing can be reported back).
      Logger.Error("GetJob returned no job. Message: " + e.Result.StatusMessage);
      return;
    }

    Logger.Info("Received new job with id " + job.Id);
    Logger.Debug("Fetching plugins for job " + job.Id);
    try {
      PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
      PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, job.Id);
      Logger.Debug("Plugins fetched for job " + job.Id);

      String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, job.Id.ToString());
      AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
      appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

      lock (engines) {
        // NOTE(review): when the job id is already known, the sandbox created above is
        // neither registered nor unloaded — confirm whether that case can occur.
        if (!jobs.ContainsKey(job.Id)) {
          jobs.Add(job.Id, job);
          appDomains.Add(job.Id, appDomain);
          Logger.Debug("Creating AppDomain");
          Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
          Logger.Debug("Created AppDomain");
          engine.JobId = job.Id;
          engine.Queue = MessageQueue.GetInstance();
          Logger.Debug("Starting Engine for job " + job.Id);
          engine.Start(e.Data);
          engines.Add(job.Id, engine);
          SlaveStatusInfo.JobsFetched++;
          Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
        }
      }
      heartbeatManager.AwakeHeartBeatThread();
    }
    catch (Exception exception) {
      Logger.Error("Creating the Appdomain and loading the job failed for job " + job.Id);
      Logger.Error("Error thrown is: ", exception);
      CurrentlyFetching = false;
      KillAppDomain(job.Id);
      // Report the failure to the server as the (empty) result of the job.
      wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, job.Id, new byte[] { }, job.ExecutionTime, exception.ToString(), true);
    }
  }
  finally {
    // Must run on every exit path; otherwise an exception escaping this handler would leave
    // the flag set and every later FetchJob message would be skipped.
    CurrentlyFetching = false;
  }
}
/// <summary>
/// The server has answered the submission of a finished job. The job's AppDomain is torn
/// down; on status Ok the processed-jobs counter is incremented and the heartbeat thread is
/// woken, otherwise the failure is logged.
/// </summary>
/// <param name="sender">Source of the event (not used).</param>
/// <param name="e">Event data holding the server's answer.</param>
void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
  var result = e.Result;
  Logger.Info("Job submitted with id " + result.JobId);
  KillAppDomain(result.JobId);

  if (result.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("Sending of job " + result.JobId + " failed, job has been wasted. Message: " + result.StatusMessage);
    return;
  }

  SlaveStatusInfo.JobsProcessed++;
  Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
  heartbeatManager.AwakeHeartBeatThread();
}
/// <summary>
/// A snapshot has been stored on the server; the event is only logged.
/// </summary>
/// <param name="sender">Source of the event (not used).</param>
/// <param name="e">Event data holding the id of the job whose snapshot was stored.</param>
void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
  var snapshotJobId = e.Result.JobId;
  Logger.Info("Snapshot " + snapshotJobId + " has been transmitted according to plan.");
}
/// <summary>
/// Connection to the server has been established: fetches the calendar if none is available
/// locally, enables job fetching, submits jobs whose engines are no longer running and
/// submits the jobs persisted on the hard disk.
/// </summary>
/// <param name="sender">Source of the event (not used).</param>
/// <param name="e">Event data (not used).</param>
void wcfService_Connected(object sender, EventArgs e) {
  Logger.Info("WCF Service got a connection");

  bool calendarMissing = !UptimeManager.Instance.CalendarAvailable;
  if (calendarMissing) {
    Logger.Info("No local calendar available, fetch it");
    FetchCalendarFromServer();
  }
  Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());

  CurrentlyFetching = false;
  CheckRunningAppDomains();
  JobStorageManager.CheckAndSubmitJobsFromDisc();
}
/// <summary>
/// Requests the calendar from the server and installs it in the uptime manager. The server
/// is told afterwards whether the calendar was fetched (state Fetched) or not (state
/// NotAllowedToFetch); the latter covers both a non-Ok response and a failed installation.
/// </summary>
private void FetchCalendarFromServer() {
  ResponseCalendar calendarResponse = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);

  // Installation is attempted only when the server answered with Ok.
  bool installed = calendarResponse.StatusMessage == ResponseStatus.Ok
                   && UptimeManager.Instance.SetAppointments(false, calendarResponse);

  if (installed) {
    Logger.Info("Remote calendar installed");
    wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
    return;
  }

  Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
  wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
}
/// <summary>
/// Starts a <see cref="SendFinishedJob"/> thread for every engine that is not in state
/// Started and has no current message.
/// </summary>
private void CheckRunningAppDomains() {
  // Collect the ids first, under the same lock KillAppDomain uses. The threads started
  // below end up removing entries from 'engines', which must not happen while the
  // dictionary is still being enumerated.
  List<Guid> finishedJobIds = new List<Guid>();
  lock (engines) {
    foreach (var entry in engines) {
      var engine = entry.Value;
      if (engine.ExecutionState != ExecutionState.Started && engine.CurrentMessage == MessageContainer.MessageType.NoMessage) {
        Logger.Info("Checking for JobId: " + engine.JobId);
        finishedJobIds.Add(engine.JobId);
      }
    }
  }

  foreach (Guid finishedJobId in finishedJobIds) {
    Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
    finThread.Start(finishedJobId);
  }
}
#endregion
/// <summary>
/// Raised by OnExceptionOccured with the exception that was caught.
/// </summary>
// NOTE(review): the delegate type below looks truncated (generic arguments apparently lost
// from this copy of the file) — confirm against the original declaration.
public event EventHandler> ExceptionOccured;
/// <summary>
/// Logs the exception as an error and raises the ExceptionOccured event if it has subscribers.
/// </summary>
/// <param name="e">The exception to report.</param>
private void OnExceptionOccured(Exception e) {
Logger.Error("Error: " + e.ToString());
// Copy the delegate to a local before the null check and the invocation.
var handler = ExceptionOccured;
// NOTE(review): the EventArgs type below appears to have lost its generic argument — confirm.
if (handler != null) handler(this, new EventArgs(e));
}
/// <summary>
/// Logs an unhandled exception reported by one of the job AppDomains.
/// </summary>
/// <param name="sender">Source of the event (not used).</param>
/// <param name="e">Event data holding the exception object.</param>
void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  object failure = e.ExceptionObject;
  Logger.Error("Exception in AppDomain: " + failure.ToString());
}
/// <summary>
/// Kills the AppDomain of a specific job: disposes the job's engine, unloads its AppDomain
/// (up to five attempts, one second apart), removes the job from the internal dictionaries
/// and deletes the plugins copied for it. Exceptions are logged and not propagated; if one
/// occurs, the steps after the failing one are skipped.
/// </summary>
/// <param name="id">The GUID of the job.</param>
private void KillAppDomain(Guid id) {
  const int maxUnloadAttempts = 5;

  Logger.Debug("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      if (engines.ContainsKey(id)) {
        engines[id].Dispose();
      }

      if (appDomains.ContainsKey(id)) {
        var appDomain = appDomains[id];
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        for (int attempt = 1; ; attempt++) {
          try {
            AppDomain.Unload(appDomain);
            break;
          }
          catch (CannotUnloadAppDomainException) {
            if (attempt >= maxUnloadAttempts) {
              // Give up immediately rather than announcing and waiting for a retry that
              // will not happen. The rethrow is caught and logged by the outer catch.
              Logger.Error("Could not unload AppDomain, giving up after " + maxUnloadAttempts + " attempts.");
              throw;
            }
            Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
            Thread.Sleep(1000);
          }
        }
        appDomains.Remove(id);
      }

      engines.Remove(id);
      jobs.Remove(id);
      PluginCache.Instance.DeletePluginsForJob(id);
    }
    catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
  }
  // A single collection after the lock is released; the second one inside the lock was redundant.
  GC.Collect();
}
}
}