#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.ResponseObjects;
using HeuristicLab.Hive.Slave.Common;
using HeuristicLab.Hive.Slave.Communication;
using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
using HeuristicLab.Hive.Slave.Core.JobStorage;
using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
using HeuristicLab.Hive.Slave.ExecutionEngine;

namespace HeuristicLab.Hive.Slave.Core {
  /// <summary>
  /// The core component of the Hive Client. It dispatches the messages of the
  /// <c>MessageQueue</c>, keeps one <c>Executor</c> and one <see cref="AppDomain"/> per job
  /// and reacts to the completion events of the WCF service.
  /// </summary>
  public class Core : MarshalByRefObject {
    // NOTE(review): generic type arguments in this file were reconstructed from usage.
    // <Guid, Executor>, <Guid, AppDomain> and EventArgs<Exception> follow directly from how
    // the values are used below. JobDto (value type of 'jobs', return type of AddChildJob)
    // is an assumption - confirm against HeuristicLab.Hive.Contracts.BusinessObjects.

    /// <summary>Number of attempts made to unload a job's AppDomain before giving up.</summary>
    private const int UnloadAttempts = 5;

    /// <summary>Pause between two unload attempts, in milliseconds.</summary>
    private const int UnloadRetryDelayMs = 1000;

    /// <summary>Interval handed to the heartbeat manager, in seconds.</summary>
    private const int HeartbeatIntervalSeconds = 10;

    /// <summary>Set to <c>true</c> to make the message dispatch loop in <see cref="Start"/> end.</summary>
    public static bool abortRequested { get; set; }

    public static ILog Log { get; set; }

    // All three dictionaries are keyed by job id; 'engines' doubles as the lock object
    // that guards structural changes to all of them.
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;

    private bool currentlyFetching;

    /// <summary>
    /// <c>true</c> while no new job must be requested from the server. Every change is logged.
    /// </summary>
    private bool CurrentlyFetching {
      get { return currentlyFetching; }
      set {
        currentlyFetching = value;
        Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
      }
    }

    /// <summary>The executors of the jobs held by this slave, keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The jobs held by this slave, keyed by job id.</summary>
    internal Dictionary<Guid, JobDto> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Main Method for the client. Starts the console server and the heartbeats and then
    /// dispatches queued messages until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;
      Logger.Info("Hive Slave started");
      SlaveConsoleServer server = new SlaveConsoleServer();
      server.Start();

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        RecoverSettings(); // recover server IP from the settings framework
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Release the subscriptions and the console server even when dispatching failed.
        if (wcfService != null) {
          DeRegisterServiceEvents();
        }
        server.Close();
      }
      Logger.Info("Program shutdown");
    }

    /// <summary>
    /// Applies the server address stored in the configuration, if one is stored.
    /// </summary>
    private void RecoverSettings() {
      ConnectionContainer cc = ConfigManager.Instance.GetServerIP();
      // A null address is treated like an empty one and leaves the current setting alone.
      if (!String.IsNullOrEmpty(cc.IPAdress)) {
        wcfService.ServerIp = cc.IPAdress;
      }
    }

    /// <summary>
    /// Creates the heartbeat manager and starts the heartbeat.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = TimeSpan.FromSeconds(HeartbeatIntervalSeconds) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    /// <summary>
    /// Subscribes to the events of the WCF service.
    /// </summary>
    private void RegisterServiceEvents() {
      wcfService.GetJobCompleted += wcfService_GetJobCompleted;
      wcfService.GetFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
      wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
      wcfService.Connected += wcfService_Connected;
    }

    /// <summary>
    /// Removes the subscriptions made in <see cref="RegisterServiceEvents"/>.
    /// </summary>
    private void DeRegisterServiceEvents() {
      wcfService.GetJobCompleted -= wcfService_GetJobCompleted;
      wcfService.GetFinishedJobResultCompleted -= wcfService_StoreFinishedJobResultCompleted;
      wcfService.ProcessSnapshotCompleted -= wcfService_ProcessSnapshotCompleted;
      wcfService.Connected -= wcfService_Connected;
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          AbortEngine(container.JobId);
          break;

        //Job has been successfully aborted
        case MessageContainer.MessageType.JobAborted:
          KillAppDomain(container.JobId);
          break;

        //Request a Snapshot from the Execution Engine
        case MessageContainer.MessageType.RequestSnapshot:
          RequestEngineSnapshot(container.JobId);
          break;

        //Snapshot is ready and can be sent back to the Server
        case MessageContainer.MessageType.SnapshotReady:
          GetSnapshot(container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.FetchJob:
          FetchJob();
          break;

        //A Job has finished and can be sent back to the server
        case MessageContainer.MessageType.FinishedJob:
          SendFinishedJob(container.JobId);
          break;

        //When the timeslice is up
        case MessageContainer.MessageType.UptimeLimitDisconnect:
          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
          ShutdownRunningJobsAndSubmitSnapshots();
          break;

        //Fetch or Force Fetch Calendar!
        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
          Logger.Info("Fetch Calendar from Server");
          FetchCalendarFromServer();
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.Shutdown:
          ShutdownSlave();
          break;

        case MessageContainer.MessageType.AddChildJob:
          AddChildJob((MessageContainerWithJob)container);
          break;

        case MessageContainer.MessageType.PauseJob:
          // send the job back to hive
          PauseJob((MessageContainerWithJob)container);
          break;

        case MessageContainer.MessageType.GetChildJobs:
          GetChildJobs((MessageContainerWithCallback)container);
          break;

        case MessageContainer.MessageType.DeleteChildJobs:
          wcfService.DeleteChildJobs(container.JobId);
          break;
      }
    }

    /// <summary>
    /// Aborts the executor of the given job; logs an error if there is none.
    /// </summary>
    /// <param name="jobId">the GUID of the job</param>
    private void AbortEngine(Guid jobId) {
      Executor engine;
      if (!engines.TryGetValue(jobId, out engine)) {
        Logger.Error("AbortJob: Engine doesn't exist");
        return;
      }
      try {
        engine.Abort();
      }
      catch (AppDomainUnloadedException) {
        // appdomain already unloaded. Finishing job probably ongoing
        Logger.Debug("AbortJob: AppDomain of job " + jobId + " is already unloaded");
      }
    }

    /// <summary>
    /// Asks the executor of the given job for a snapshot; logs an error if there is none.
    /// </summary>
    /// <param name="jobId">the GUID of the job</param>
    private void RequestEngineSnapshot(Guid jobId) {
      Executor engine;
      if (engines.TryGetValue(jobId, out engine)) {
        engine.RequestSnapshot();
      } else {
        Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
      }
    }

    /// <summary>
    /// Requests a job from the server unless fetching is currently blocked.
    /// </summary>
    private void FetchJob() {
      if (CurrentlyFetching) {
        Logger.Info("Currently fetching, won't fetch this time!");
        return;
      }
      // Raise the flag before the request is issued: wcfService_GetJobCompleted resets it,
      // and setting it afterwards could overwrite that reset and block fetching for good.
      CurrentlyFetching = true;
      try {
        wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
      }
      catch {
        CurrentlyFetching = false; // no request is pending, so allow the next fetch
        throw;
      }
    }

    /// <summary>
    /// Unloads every job AppDomain, stops the heartbeat, requests the end of the dispatch
    /// loop and logs out from the server.
    /// </summary>
    private void ShutdownSlave() {
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= appDomain_UnhandledException;
          AppDomain.Unload(kvp.Value);
        }
      }
      Logger.Debug("Stopping heartbeat");
      abortRequested = true;
      heartbeatManager.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
    }

    /// <summary>
    /// Asks the server for the child jobs of the job named in <paramref name="mc"/> and
    /// passes them to the callback of the message; failures are logged.
    /// </summary>
    /// <param name="mc">message carrying the job id and the callback</param>
    // NOTE(review): if MessageContainerWithCallback is a generic type its type argument
    // could not be recovered here - confirm against its declaration.
    private void GetChildJobs(MessageContainerWithCallback mc) {
      var response = wcfService.GetChildJobs(mc.JobId);
      if (response == null) {
        Logger.Error("GetChildJobs failed.");
      } else if (response.StatusMessage != ResponseStatus.Ok) {
        Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
      } else {
        mc.Callback(response.Obj);
      }
    }

    /// <summary>
    /// Sends the serialized job back to the server and removes its AppDomain locally,
    /// regardless of the server's answer; a failed answer is logged.
    /// </summary>
    /// <param name="mc">message carrying the job id and the serialized job</param>
    private void PauseJob(MessageContainerWithJob mc) {
      var response = wcfService.PauseJob(mc.SerializedJob);
      KillAppDomain(mc.JobId);
      if (response == null) {
        // no status available; reading response.StatusMessage here would throw
        Logger.Error("PauseJob failed.");
      } else if (response.StatusMessage != ResponseStatus.Ok) {
        Logger.Error("PauseJob failed: " + response.StatusMessage);
      }
    }

    /// <summary>
    /// Submits a child job for the job named in <paramref name="mc"/>; a failed answer is logged.
    /// </summary>
    /// <param name="mc">message carrying the parent job id and the serialized child job</param>
    /// <returns>the answer of the WCF service as received (may be <c>null</c>)</returns>
    private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
      ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
      if (response == null) {
        // no status available; reading response.StatusMessage here would throw
        Logger.Error("AddChildJob failed.");
      } else if (response.StatusMessage != ResponseStatus.Ok) {
        Logger.Error("AddChildJob failed: " + response.StatusMessage);
      }
      return response;
    }

    /// <summary>
    /// Blocks further fetching and requests a snapshot from every executor.
    /// Does nothing when no executor exists.
    /// </summary>
    private void ShutdownRunningJobsAndSubmitSnapshots() {
      // Iterate over a copy: other threads remove entries from 'engines' in KillAppDomain.
      List<Executor> running;
      lock (engines) {
        running = new List<Executor>(engines.Values);
      }
      if (running.Count == 0) {
        return;
      }

      //make sure there is no more fetching of jobs while the snapshots get processed
      CurrentlyFetching = true;
      //request a snapshot of each running job
      foreach (Executor engine in running) {
        engine.RequestSnapshot();
      }
    }

    //Asynchronous Threads for interaction with the Execution Engine
    #region Async Threads for the EE

    /// <summary>
    /// serializes the finished job and submits it to the server. If submitting throws, the
    /// job gets stored on the disk. The AppDomain of the job is removed in every case.
    /// Exceptions are reported through <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">the job id as a boxed <see cref="Guid"/></param>
    private void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        Logger.Info("Getting the finished job with id: " + jId);
        Executor engine;
        if (!engines.TryGetValue(jId, out engine)) {
          Logger.Info("Engine doesn't exist");
          return;
        }

        byte[] sJob = engine.GetFinishedJob();

        try {
          Logger.Info("Sending the finished job with id: " + jId);
          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engine.ExecutionTime, engine.CurrentException, true);
        }
        catch (Exception e) {
          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
          // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming).
          // also the port is now specified only in app.config. use port 0 for the moment
          JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob);
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Sends a snapshot of the given job to the server. Afterwards the job is either
    /// restarted or, when calculating is no longer allowed, removed; once no executor is
    /// left the slave disconnects. Exceptions (including an unknown job id) are reported
    /// through <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">the job id as a boxed <see cref="Guid"/></param>
    private void GetSnapshot(object jobId) {
      try {
        Logger.Info("Fetching a snapshot for job " + jobId);
        Guid jId = (Guid)jobId;
        Executor engine = engines[jId];
        byte[] obj = engine.GetSnapshot();
        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engine.ExecutionTime, null);

        //Uptime Limit reached, now is a good time to destroy this jobs.
        Logger.Debug("Checking if uptime limit is reached");
        if (UptimeManager.Instance.IsAllowedToCalculate()) {
          Logger.Debug("Restarting the job" + jobId);
          engine.StartOnlyJob();
          Logger.Info("Restarted the job" + jobId);
          return;
        }

        Logger.Debug("Uptime limit reached");
        Logger.Debug("Killing Appdomain");
        KillAppDomain(jId);
        //Still anything running?
        if (engines.Count == 0) {
          Logger.Info("All jobs snapshotted and sent back, disconnecting");
          WcfService.Instance.Disconnect();
        } else {
          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    #endregion

    //Eventhandlers for the communication with the wcf Layer
    #region wcfService Events

    /// <summary>
    /// Login has returned. On success fetching is allowed again.
    /// This handler is not subscribed in <see cref="RegisterServiceEvents"/>.
    /// </summary>
    /// <param name="sender">the event source</param>
    /// <param name="e">carries the login result</param>
    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
      if (e.Result.StatusMessage == ResponseStatus.Ok) {
        CurrentlyFetching = false;
        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
      } else {
        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// If preparing or starting the job throws, the job is removed again and reported to the
    /// server as finished with the exception text. Fetching is allowed again afterwards in
    /// every case.
    /// </summary>
    /// <param name="sender">the event source</param>
    /// <param name="e">carries the response with the job and the job data</param>
    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
      try {
        if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
          Logger.Info("No more jobs left!");
          return;
        }

        var job = e.Result.Obj;
        if (job == null) {
          // Without a job object there is nothing to start; report instead of crashing below.
          Logger.Error("GetJob failed: " + e.Result.StatusMessage);
          return;
        }

        Guid jobId = job.Id;
        Logger.Info("Received new job with id " + jobId);
        Logger.Debug("Fetching plugins for job " + jobId);
        try {
          PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
          PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, jobId);

          Logger.Debug("Plugins fetched for job " + jobId);
          String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, jobId.ToString());
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
          appDomain.UnhandledException += appDomain_UnhandledException;
          lock (engines) {
            if (!jobs.ContainsKey(jobId)) {
              // Register job and AppDomain first so that KillAppDomain in the catch block
              // below can clean up if creating or starting the executor fails.
              jobs.Add(jobId, job);
              appDomains.Add(jobId, appDomain);
              Logger.Debug("Creating AppDomain");
              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              Logger.Debug("Created AppDomain");
              engine.JobId = jobId;
              engine.Queue = MessageQueue.GetInstance();
              Logger.Debug("Starting Engine for job " + jobId);
              engine.Start(e.Data);
              engines.Add(jobId, engine);

              SlaveStatusInfo.JobsFetched++;
              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
            } else {
              // The job is already tracked, so nothing would ever unload the sandbox
              // that was just created for it.
              Logger.Debug("Job " + jobId + " is already known, discarding the newly created AppDomain");
              appDomain.UnhandledException -= appDomain_UnhandledException;
              try {
                AppDomain.Unload(appDomain);
              }
              catch (CannotUnloadAppDomainException ex) {
                // Handled here so the outer catch does not kill the job that is already running.
                Logger.Error("Exception when unloading the appdomain: ", ex);
              }
            }
          }
          heartbeatManager.AwakeHeartBeatThread();
        }
        catch (Exception exception) {
          Logger.Error("Creating the Appdomain and loading the job failed for job " + jobId);
          Logger.Error("Error thrown is: ", exception);
          CurrentlyFetching = false;
          KillAppDomain(jobId);
          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, jobId, new byte[] { }, job.ExecutionTime, exception.ToString(), true);
        }
      }
      finally {
        // Always re-enable fetching; otherwise one failed response would block it for good.
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// A finished job has been stored on the server. Removes the AppDomain of the job and
    /// updates the processed-jobs counter on success.
    /// </summary>
    /// <param name="sender">the event source</param>
    /// <param name="e">carries the result of storing the job</param>
    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
      Logger.Info("Job submitted with id " + e.Result.JobId);
      KillAppDomain(e.Result.JobId);
      if (e.Result.StatusMessage != ResponseStatus.Ok) {
        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
        return;
      }
      SlaveStatusInfo.JobsProcessed++;
      Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
      heartbeatManager.AwakeHeartBeatThread();
    }

    /// <summary>
    /// A snapshot has been stored on the server
    /// </summary>
    /// <param name="sender">the event source</param>
    /// <param name="e">carries the result of processing the snapshot</param>
    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
    }

    /// <summary>
    /// Connection to the server has been established: fetches the calendar if none is
    /// available locally, allows fetching again, checks the existing executors for jobs to
    /// send back and submits the jobs persisted on the harddisk.
    /// </summary>
    /// <param name="sender">the event source</param>
    /// <param name="e">not used</param>
    void wcfService_Connected(object sender, EventArgs e) {
      Logger.Info("WCF Service got a connection");
      if (!UptimeManager.Instance.CalendarAvailable) {
        Logger.Info("No local calendar available, fetch it");
        FetchCalendarFromServer();
      }
      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
      CurrentlyFetching = false;
      CheckRunningAppDomains();
      JobStorageManager.CheckAndSubmitJobsFromDisc();
    }

    /// <summary>
    /// Requests the calendar from the server, installs it and reports the resulting
    /// calendar state back to the server.
    /// </summary>
    private void FetchCalendarFromServer() {
      Guid clientId = ConfigManager.Instance.GetClientInfo().Id;
      ResponseCalendar calres = wcfService.GetCalendarSync(clientId);

      // Short-circuit: SetAppointments is only called for a successful response.
      bool installed = calres != null
        && calres.StatusMessage == ResponseStatus.Ok
        && UptimeManager.Instance.SetAppointments(false, calres);

      if (installed) {
        Logger.Info("Remote calendar installed");
        wcfService.SetCalendarStatus(clientId, CalendarState.Fetched);
      } else {
        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
        wcfService.SetCalendarStatus(clientId, CalendarState.NotAllowedToFetch);
      }
    }

    /// <summary>
    /// Starts a thread running <see cref="SendFinishedJob"/> for every executor that is not
    /// in state <c>Started</c> and has no current message.
    /// </summary>
    private void CheckRunningAppDomains() {
      // Iterate over a copy: the threads started below end in KillAppDomain, which removes
      // entries from 'engines' and would invalidate a live enumeration.
      List<Executor> candidates;
      lock (engines) {
        candidates = new List<Executor>(engines.Values);
      }

      foreach (Executor executor in candidates) {
        bool notStarted = executor.ExecutionState != ExecutionState.Started;
        if (notStarted && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
          Logger.Info("Checking for JobId: " + executor.JobId);
          Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
          finThread.Start(executor.JobId);
        }
      }
    }

    #endregion

    /// <summary>
    /// Raised when <see cref="SendFinishedJob"/> or <see cref="GetSnapshot"/> caught an exception.
    /// </summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>
    /// Logs the exception and raises <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="e">the exception to report</param>
    private void OnExceptionOccured(Exception e) {
      Logger.Error("Error: " + e.ToString());
      var handler = ExceptionOccured; // local copy so a concurrent unsubscribe cannot null it
      if (handler != null) {
        handler(this, new EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Logs exceptions that were not handled inside a job AppDomain.
    /// </summary>
    /// <param name="sender">the event source</param>
    /// <param name="e">carries the exception object</param>
    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
    }

    /// <summary>
    /// Kill a appdomain with a specific id. Disposes the executor, unloads the AppDomain and
    /// removes the job's entries and plugins. Any exception on the way is logged and not
    /// rethrown; the steps after the failing one are then skipped.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    private void KillAppDomain(Guid id) {
      Logger.Debug("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= appDomain_UnhandledException;
            UnloadWithRetry(appDomain);
            appDomains.Remove(id);
          }

          engines.Remove(id);
          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          Logger.Error("Exception when unloading the appdomain: ", ex);
        }
      }
      // Runs whether or not the cleanup above succeeded.
      GC.Collect();
    }

    /// <summary>
    /// Unloads the AppDomain, retrying when it cannot be unloaded.
    /// </summary>
    /// <param name="appDomain">the AppDomain to unload</param>
    /// <exception cref="CannotUnloadAppDomainException">the last attempt failed as well</exception>
    private static void UnloadWithRetry(AppDomain appDomain) {
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= UnloadAttempts) {
            throw; // out of attempts; the caller logs the failure
          }
          Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(UnloadRetryDelayMs);
        }
      }
    }
  }
}