#region License Information
/* HeuristicLab
* Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using HeuristicLab.Core;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.ResponseObjects;
using HeuristicLab.Hive.Slave.Common;
using HeuristicLab.Hive.Slave.Communication;
using HeuristicLab.Hive.Slave.Communication.ServerService;
using HeuristicLab.Hive.Slave.Core.ClientConsoleService;
using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
using HeuristicLab.Hive.Slave.Core.JobStorage;
using HeuristicLab.Hive.Slave.ExecutionEngine;
using HeuristicLab.Tracing;
namespace HeuristicLab.Hive.Slave.Core {
/// <summary>
/// The core component of the Hive Client: runs the main message loop, owns the
/// per-job sandbox AppDomains and Executors, and reacts to events raised by the
/// WCF communication layer.
/// </summary>
public class Core : MarshalByRefObject {
  /// <summary>
  /// When set to true, the main loop in <see cref="Start"/> exits after the
  /// message currently being handled.
  /// </summary>
  public static bool abortRequested { get; set; }

  private bool _currentlyFetching;
  /// <summary>
  /// True while a GetJob request is outstanding, or while fetching is deliberately
  /// suppressed (see ShutdownRunningJobsAndSubmitSnapshots). Every change is logged.
  /// </summary>
  private bool CurrentlyFetching {
    get {
      return _currentlyFetching;
    }
    set {
      _currentlyFetching = value;
      Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
    }
  }

  // Per-job state keyed by job id. Inside this class every access to these three
  // dictionaries is synchronized with lock (engines), because they are touched by
  // the main loop, thread-pool workers and WCF callback threads.
  private Dictionary engines = new Dictionary();
  private Dictionary appDomains = new Dictionary();
  private Dictionary jobs = new Dictionary();
  private WcfService wcfService;
  private Heartbeat beat;

  /// <summary>
  /// Main Method for the client: starts the console server, wires up the WCF
  /// events, restores the server address, starts the heartbeat and then handles
  /// MessageQueue messages until <see cref="abortRequested"/> becomes true.
  /// </summary>
  public void Start() {
    abortRequested = false;
    Logger.Info("Hive Slave started");

    SlaveConsoleServer server = new SlaveConsoleServer();
    server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/SlaveConsole/"));

    ConfigManager manager = ConfigManager.Instance;
    manager.Core = this;

    //Register all Wcf Service references
    wcfService = WcfService.Instance;
    wcfService.LoginCompleted += wcfService_LoginCompleted;
    wcfService.GetJobCompleted += wcfService_GetJobCompleted;
    wcfService.GetFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
    wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
    wcfService.ConnectionRestored += wcfService_ConnectionRestored;
    wcfService.ServerChanged += wcfService_ServerChanged;
    wcfService.Connected += wcfService_Connected;

    //Recover Server IP and Port from the Settings Framework
    ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
    if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0)
      wcfService.SetIPAndPort(cc.IPAdress, cc.Port);

    //Initialize the heartbeat
    beat = new Heartbeat { Interval = new TimeSpan(0, 0, 10) };
    beat.StartHeartbeat();

    MessageQueue queue = MessageQueue.GetInstance();
    // Main processing loop: messages are handled one at a time on this thread.
    // TODO: consider moving message handling to its own thread.
    while (!abortRequested) {
      MessageContainer container = queue.GetMessage();
      DetermineAction(container);
    }
    Logger.Info("Program shutdown");
  }

  /// <summary>
  /// Looks up the executor of a job while holding the engines lock.
  /// </summary>
  /// <param name="jobId">the GUID of the job</param>
  /// <returns>the job's executor, or null if none is registered</returns>
  private Executor TryGetEngine(Guid jobId) {
    Executor engine;
    lock (engines) {
      engines.TryGetValue(jobId, out engine);
    }
    return engine;
  }

  /// <summary>
  /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
  /// </summary>
  /// <param name="container">The Container, containing the message</param>
  private void DetermineAction(MessageContainer container) {
    Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
    switch (container.Message) {
      //Server requests to abort a job
      case MessageContainer.MessageType.AbortJob: {
          Executor engine = TryGetEngine(container.JobId);
          if (engine != null)
            engine.Abort();
          else
            Logger.Error("AbortJob: Engine doesn't exist");
          break;
        }
      //Job has been successfully aborted
      case MessageContainer.MessageType.JobAborted:
        KillAppDomain(container.JobId);
        break;
      //Request a Snapshot from the Execution Engine
      case MessageContainer.MessageType.RequestSnapshot: {
          Executor engine = TryGetEngine(container.JobId);
          if (engine != null)
            engine.RequestSnapshot();
          else
            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
          break;
        }
      //Snapshot is ready and can be sent back to the Server
      case MessageContainer.MessageType.SnapshotReady:
        ThreadPool.QueueUserWorkItem(GetSnapshot, container.JobId);
        break;
      //Pull a Job from the Server
      case MessageContainer.MessageType.FetchJob:
        if (!CurrentlyFetching) {
          // Raise the flag BEFORE issuing the request: wcfService_GetJobCompleted clears
          // it and may run on another thread as soon as the request completes.
          CurrentlyFetching = true;
          try {
            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
          }
          catch {
            // The request never went out, so nothing will clear the flag for us.
            CurrentlyFetching = false;
            throw;
          }
        } else {
          Logger.Info("Currently fetching, won't fetch this time!");
        }
        break;
      //A Job has finished and can be sent back to the server
      case MessageContainer.MessageType.FinishedJob:
        ThreadPool.QueueUserWorkItem(GetFinishedJob, container.JobId);
        break;
      //When the timeslice is up
      case MessageContainer.MessageType.UptimeLimitDisconnect:
        Logger.Info("Uptime Limit reached, storing jobs and sending them back");
        ShutdownRunningJobsAndSubmitSnapshots();
        break;
      //Fetch or Force Fetch Calendar!
      case MessageContainer.MessageType.FetchOrForceFetchCalendar:
        Logger.Info("Fetch Calendar from Server");
        FetchCalendarFromServer();
        break;
      //Hard shutdown of the client
      case MessageContainer.MessageType.Shutdown:
        Logger.Info("Shutdown Signal received");
        lock (engines) {
          Logger.Debug("engines locked");
          foreach (var kvp in appDomains) {
            Logger.Debug("Shutting down Appdomain for " + kvp.Key);
            kvp.Value.UnhandledException -= appDomain_UnhandledException;
            try {
              AppDomain.Unload(kvp.Value);
            }
            catch (Exception ex) {
              // Keep going: the heartbeat still has to be stopped and the logout sent.
              Logger.Error("Exception when unloading the appdomain: ", ex);
            }
          }
        }
        Logger.Debug("Stopping heartbeat");
        abortRequested = true;
        beat.StopHeartBeat();
        Logger.Debug("Logging out");
        WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
        break;
    }
  }

  /// <summary>
  /// Asks every running executor for a snapshot (used when the uptime limit is
  /// reached) and blocks further job fetching while the snapshots are processed.
  /// </summary>
  private void ShutdownRunningJobsAndSubmitSnapshots() {
    // Copy under the lock; RequestSnapshot is called outside it.
    List<Executor> running;
    lock (engines) {
      running = new List<Executor>(engines.Values);
    }
    //check if there are running jobs
    if (running.Count == 0)
      return;
    //make sure there is no more fetching of jobs while the snapshots get processed
    CurrentlyFetching = true;
    //request a snapshot of each running job
    foreach (Executor engine in running) {
      engine.RequestSnapshot();
    }
  }

  //Asynchronous Threads for interaction with the Execution Engine
  #region Async Threads for the EE

  /// <summary>
  /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
  /// once the connection gets reestablished, the job gets submitted
  /// </summary>
  /// <param name="jobId">the job's GUID, boxed (thread-pool / thread-start callback)</param>
  private void GetFinishedJob(object jobId) {
    Guid jId = (Guid)jobId;
    Logger.Info("Getting the finished job with id: " + jId);
    try {
      // Take a single reference so a concurrent KillAppDomain cannot make a
      // second dictionary lookup fail halfway through.
      Executor engine = TryGetEngine(jId);
      if (engine == null) {
        Logger.Info("Engine doesn't exist");
        return;
      }
      byte[] sJob = engine.GetFinishedJob();
      if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
        Logger.Info("Sending the finished job with id: " + jId);
        // The AppDomain is killed later, in wcfService_StoreFinishedJobResultCompleted.
        wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engine.CurrentException, true);
      } else {
        Logger.Info("Storing the finished job with id: " + jId + " to hdd");
        JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
        KillAppDomain(jId);
      }
    }
    catch (InvalidStateException ise) {
      Logger.Error("Invalid State while getting the finished job:", ise);
    }
  }

  /// <summary>
  /// Sends a ready snapshot of a job to the server. If the slave is outside its
  /// uptime window afterwards, the job's AppDomain is killed, and once no jobs are
  /// left the slave disconnects. Otherwise the job is restarted.
  /// </summary>
  /// <param name="jobId">the job's GUID, boxed (thread-pool callback)</param>
  private void GetSnapshot(object jobId) {
    Logger.Info("Fetching a snapshot for job " + jobId);
    Guid jId = (Guid)jobId;
    Executor engine = TryGetEngine(jId);
    if (engine == null) {
      // The job may have been killed between SnapshotReady and this worker running;
      // an unchecked lookup here would throw on a thread-pool thread.
      Logger.Error("GetSnapshot: Engine with Job doesn't exist");
      return;
    }
    byte[] obj = engine.GetSnapshot();
    Logger.Debug("BEGIN: Sending snapshot sync");
    wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
      jId,
      obj,
      engine.Progress,
      null);
    Logger.Debug("END: Sended snapshot sync");
    //Uptime Limit reached, now is a good time to destroy this jobs.
    Logger.Debug("Checking if uptime limit is reached");
    if (!UptimeManager.Instance.IsOnline()) {
      Logger.Debug("Uptime limit reached");
      Logger.Debug("Killing Appdomain");
      KillAppDomain(jId);
      //Still anything running?
      int remaining;
      lock (engines) {
        remaining = engines.Count;
      }
      if (remaining == 0) {
        Logger.Info("All jobs snapshotted and sent back, disconnecting");
        WcfService.Instance.Disconnect();
      } else {
        Logger.Debug("There are still active Jobs in the Field, not disconnecting");
      }
    } else {
      Logger.Debug("Restarting the job" + jobId);
      engine.StartOnlyJob();
      Logger.Info("Restarted the job" + jobId);
    }
  }

  #endregion

  //Eventhandlers for the communication with the wcf Layer
  #region wcfService Events

  /// <summary>
  /// Login has returned. On success, job fetching is re-enabled.
  /// </summary>
  /// <param name="sender">the event source</param>
  /// <param name="e">the login response</param>
  void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
    if (e.Result.StatusMessage == ResponseStatus.Ok) {
      CurrentlyFetching = false;
      Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
    } else {
      Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
    }
  }

  /// <summary>
  /// A new Job from the wcfService has been received and will be started within its
  /// own sandbox AppDomain. On failure the job is cleaned up and reported to the
  /// server together with the exception.
  /// </summary>
  /// <param name="sender">the event source</param>
  /// <param name="e">the GetJob response (status and job) plus the job data</param>
  void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
    try {
      if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
        Logger.Info("No more jobs left!");
        return;
      }
      var job = e.Result.Obj;
      if (job == null) {
        // Previously this threw a NullReferenceException and left CurrentlyFetching
        // set to true, which stopped all further fetching.
        Logger.Error("GetJob returned no job, status: " + e.Result.StatusMessage);
        return;
      }
      Guid jobId = job.Id;
      Logger.Info("Received new job with id " + jobId);
      try {
        // Plugin preparation runs inside the try so a failure here is reported to
        // the server instead of the job being silently dropped.
        Logger.Debug("Fetching plugins for job " + jobId);
        PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
        PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, jobId);
        Logger.Debug("Plugins fetched for job " + jobId);

        String pluginDir = Path.Combine(PluginCache.PLUGIN_REPO, jobId.ToString());
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
        appDomain.UnhandledException += appDomain_UnhandledException;
        bool registered = false;
        lock (engines) {
          if (!jobs.ContainsKey(jobId)) {
            jobs.Add(jobId, job);
            appDomains.Add(jobId, appDomain);
            // From here on KillAppDomain is responsible for unloading the domain.
            registered = true;
            Logger.Debug("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            Logger.Debug("Created AppDomain");
            engine.JobId = jobId;
            engine.Queue = MessageQueue.GetInstance();
            Logger.Debug("Starting Engine for job " + jobId);
            engine.Start(e.Data);
            engines.Add(jobId, engine);
            SlaveStatusInfo.JobsFetched++;
            Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        if (!registered) {
          // The job is already running here. The sandbox created above is not tracked
          // anywhere, so unload it now instead of leaking it. Errors are handled
          // locally so the running job is not killed by the catch below.
          Logger.Info("Job " + jobId + " is already running, unloading the surplus AppDomain");
          appDomain.UnhandledException -= appDomain_UnhandledException;
          try {
            AppDomain.Unload(appDomain);
          }
          catch (Exception ex) {
            Logger.Error("Exception when unloading the appdomain: ", ex);
          }
        }
      }
      catch (Exception exception) {
        Logger.Error("Creating the Appdomain and loading the job failed for job " + jobId);
        Logger.Error("Error thrown is: ", exception);
        KillAppDomain(jobId);
        wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, jobId, new byte[] { }, 1, exception, true);
      }
    }
    finally {
      // Whatever happened, the request is over: allow the next FetchJob to fetch.
      CurrentlyFetching = false;
    }
  }

  /// <summary>
  /// A finished job has been stored on the server. Its AppDomain is killed
  /// regardless of the response status.
  /// </summary>
  /// <param name="sender">the event source</param>
  /// <param name="e">the store response, including the job id</param>
  void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
    Logger.Info("Job submitted with id " + e.Result.JobId);
    KillAppDomain(e.Result.JobId);
    if (e.Result.StatusMessage == ResponseStatus.Ok) {
      SlaveStatusInfo.JobsProcessed++;
      Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
    } else {
      Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
    }
  }

  /// <summary>
  /// A snapshot has been stored on the server; only logged.
  /// </summary>
  /// <param name="sender">the event source</param>
  /// <param name="e">the snapshot response, including the job id</param>
  void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
    Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
  }

  /// <summary>
  /// The server has been changed. All running jobs are asked to abort.
  /// </summary>
  /// <remarks>
  /// Only Abort is requested here. Presumably each executor then posts a JobAborted
  /// message, whose handling (DetermineAction → KillAppDomain) tears the AppDomain
  /// down — confirm against Executor.
  /// </remarks>
  /// <param name="sender">the event source</param>
  /// <param name="e">unused</param>
  void wcfService_ServerChanged(object sender, EventArgs e) {
    Logger.Info("ServerChanged has been called");
    lock (engines) {
      foreach (Executor engine in engines.Values) {
        engine.Abort();
      }
    }
  }

  /// <summary>
  /// Connnection to the server has been estabilshed => fetch the calendar if none is
  /// available locally; then, if there is still no calendar or the slave is inside
  /// its uptime window, log in and submit the jobs persisted on the hard disk.
  /// </summary>
  /// <param name="sender">the event source</param>
  /// <param name="e">unused</param>
  void wcfService_Connected(object sender, EventArgs e) {
    Logger.Info("WCF Service got a connection");
    if (!UptimeManager.Instance.CalendarAvailable) {
      Logger.Info("No local calendar available, fetch it");
      FetchCalendarFromServer();
    }
    //if the fetching from the server failed - still set the client online... maybe we get
    //a result within the next few heartbeats
    if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
      Logger.Info("Setting client online");
      wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
      JobStorageManager.CheckAndSubmitJobsFromDisc();
      CurrentlyFetching = false;
    }
  }

  /// <summary>
  /// Fetches the calendar from the server and tries to install it. The server is
  /// told CalendarState.Fetched on success and CalendarState.NotAllowedToFetch if
  /// either the request or the installation failed.
  /// </summary>
  private void FetchCalendarFromServer() {
    ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
    // && short-circuits, so SetAppointments is only attempted for an Ok response.
    if (calres.StatusMessage == ResponseStatus.Ok && UptimeManager.Instance.SetAppointments(false, calres)) {
      Logger.Info("Remote calendar installed");
      wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
    } else {
      Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
      wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
    }
  }

  /// <summary>
  /// Reconnected to the previous server. For every executor whose ExecutionState is
  /// not Started and that has no pending message (presumably a job that finished
  /// while offline), GetFinishedJob runs on a new thread to submit the result.
  /// </summary>
  /// <param name="sender">the event source</param>
  /// <param name="e">unused</param>
  void wcfService_ConnectionRestored(object sender, EventArgs e) {
    Logger.Info("Reconnected to old server - checking currently running appdomains");
    // Iterate over a copy: the spawned threads may call KillAppDomain, which removes
    // entries from engines while we would otherwise still be enumerating it.
    List<Executor> executors;
    lock (engines) {
      executors = new List<Executor>(engines.Values);
    }
    foreach (Executor executor in executors) {
      if (executor.ExecutionState != ExecutionState.Started && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
        Logger.Info("Checking for JobId: " + executor.JobId);
        Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
        finThread.Start(executor.JobId);
      }
    }
  }

  #endregion

  /// <summary>
  /// Returns the live job-id → executor dictionary (not a copy). This class
  /// synchronizes its own accesses with a lock on this dictionary.
  /// </summary>
  public Dictionary GetExecutionEngines() {
    return engines;
  }

  /// <summary>
  /// Logs unhandled exceptions raised inside a job's sandbox AppDomain.
  /// </summary>
  void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
  }

  /// <summary>
  /// Returns the live job-id → job dictionary (not a copy).
  /// </summary>
  internal Dictionary GetJobs() {
    return jobs;
  }

  /// <summary>
  /// Kill a appdomain with a specific id: disposes the job's executor, unloads its
  /// sandbox AppDomain, forgets the job and deletes the plugins copied for it.
  /// Errors are logged and the remaining steps still run.
  /// </summary>
  /// <param name="id">the GUID of the job</param>
  private void KillAppDomain(Guid id) {
    Logger.Debug("Shutting down Appdomain for Job " + id);
    lock (engines) {
      try {
        Executor engine;
        if (engines.TryGetValue(id, out engine))
          engine.Dispose();
      }
      catch (Exception ex) {
        // Still unload the domain below; a failing Dispose must not leak it.
        Logger.Error("Exception when disposing the engine: ", ex);
      }
      try {
        AppDomain appDomain;
        if (appDomains.TryGetValue(id, out appDomain)) {
          appDomain.UnhandledException -= appDomain_UnhandledException;
          AppDomain.Unload(appDomain);
        }
      }
      catch (Exception ex) {
        Logger.Error("Exception when unloading the appdomain: ", ex);
      }
      // Forget the job even if disposing or unloading failed. Stale entries would keep
      // engines.Count above zero, which GetSnapshot checks before disconnecting.
      appDomains.Remove(id);
      engines.Remove(id);
      jobs.Remove(id);
      try {
        PluginCache.Instance.DeletePluginsForJob(id);
      }
      catch (Exception ex) {
        Logger.Error("Exception when deleting the plugins for job " + id + ": ", ex);
      }
    }
    // NOTE(review): presumably forces prompt reclamation after unloading the sandbox;
    // the original called it twice, once is enough.
    GC.Collect();
  }
}
}