#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Threading;
using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Services.Hive.Common;
using HeuristicLab.Services.Hive.Common.DataTransfer;

namespace HeuristicLab.Clients.Hive.Slave {
  /// <summary>
  /// The core component of the Hive Client. It runs the message dispatch loop (on the
  /// thread that calls <see cref="Start"/>, the "core thread") and manages the sandbox
  /// AppDomains in which jobs are executed.
  /// </summary>
  public class Core : MarshalByRefObject {
    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    /// <summary>
    /// Set to true by the shutdown handling; the dispatch loop stops once it is true.
    /// </summary>
    public static bool abortRequested { get; set; }

    // Released by Start() when the dispatch loop has ended; Shutdown() waits on it.
    private Semaphore waitShutdownSem = new Semaphore(0, 1);

    public static ILog Log { get; set; }

    // All three dictionaries are keyed by job id. This class adds/removes entries
    // only while holding lock (engines).
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>
    /// The executors of the currently running jobs, keyed by job id.
    /// </summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>
    /// The currently running jobs, keyed by job id.
    /// </summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main Method for the client. The calling thread becomes the core thread and is
    /// blocked in the message dispatch loop until a shutdown has been processed.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();

        ClientCom = SlaveClientCom.Instance.ClientCom;
        ClientCom.LogMessage("Hive Slave started");

        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      } finally {
        try {
          DeRegisterServiceEvents();
        } finally {
          // release in every case, otherwise Shutdown() would wait forever when Start() failed
          waitShutdownSem.Release();
        }
      }
    }

    /// <summary>
    /// Creates a new heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until abortRequested is set.
    /// A failure while handling one message is reported and does not end the loop.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        try {
          DetermineAction(container);
        } catch (Exception e) {
          // e.g. an unreachable server while fetching a job must not kill the core thread
          OnExceptionOccured(e);
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(wcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(wcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= wcfService_Connected;
      WcfService.Instance.ExceptionOccured -= wcfService_ExceptionOccured;
    }

    void wcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interrupted with exception: " + e.Value.Message);
    }

    void wcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      // Executor messages carry a callback that has to run on the core thread.
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        //Pull a Job from the Server
        case MessageContainer.MessageType.CalculateJob: {
            Job myJob = wcfService.GetJob(container.JobId);
            //TODO: handle in own thread!!
            JobData jobData = wcfService.GetJobData(myJob.Id);
            StartJobInAppDomain(myJob, jobData);
            break;
          }
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        // other message types are ignored here
      }
    }

    /// <summary>
    /// Pauses the job, sends its current state to the server and kills its AppDomain.
    /// </summary>
    private void DoPauseJob(Guid guid) {
      InterruptAndSendJob(guid, delegate(Executor engine) { engine.Pause(); }, "paused");
    }

    /// <summary>
    /// Stops the job, sends its current state to the server and kills its AppDomain.
    /// </summary>
    private void DoStopJob(Guid guid) {
      InterruptAndSendJob(guid, delegate(Executor engine) { engine.Stop(); }, "stopped");
    }

    /// <summary>
    /// Shared implementation of pausing/stopping a job: applies <paramref name="interrupt"/>
    /// to the job's executor, uploads the job to the server and kills the job's AppDomain,
    /// also when interrupting or uploading fails.
    /// </summary>
    /// <param name="jobId">id of the job; unknown ids are logged and ignored</param>
    /// <param name="interrupt">the action applied to the executor (pause or stop)</param>
    /// <param name="state">the state name used in log messages, e.g. "paused"</param>
    private void InterruptAndSendJob(Guid jobId, Action<Executor> interrupt, string state) {
      Job job;
      Executor engine;
      if (!jobs.TryGetValue(jobId, out job) || !engines.TryGetValue(jobId, out engine)) {
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return;
      }

      try {
        interrupt(engine);
        JobData sJob = engine.GetFinishedJob();
        job.Exception = engine.CurrentException;
        job.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the " + state + " job with id: " + job.Id);
          wcfService.UpdateJob(job, sJob);
          SlaveStatusInfo.JobsProcessed++; //TODO: count or not count, thats the question
        }
        catch (Exception e) {
          ClientCom.LogMessage("Transmitting to server failed. Storing the " + state + " job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      //copy guids because KillAppDomain removes items from 'Jobs'
      foreach (Guid g in new List<Guid>(Jobs.Keys)) {
        KillAppDomain(g);
      }

      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every running job, sends it to the server and kills its AppDomain
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");

      //copy guids because there will be removed items from 'Jobs'
      foreach (Guid g in new List<Guid>(Jobs.Keys)) {
        DoPauseJob(g);
      }
    }

    /// <summary>
    /// stops every running job, sends it to the server and kills its AppDomain
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");

      //copy guids because there will be removed items from 'Jobs'
      foreach (Guid g in new List<Guid>(Jobs.Keys)) {
        DoStopJob(g);
      }
    }

    /// <summary>
    /// completely shuts down the slave: enqueues a shutdown message and blocks until
    /// <see cref="Start"/> has returned from its dispatch loop
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the the application is exited.
    /// Runs on the core thread in response to a ShutdownSlave message.
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");

      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          try {
            AppDomain.Unload(kvp.Value);
          }
          catch (CannotUnloadAppDomainException ex) {
            // keep going: the remaining domains and the service connections still have to be closed
            ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// Restarts the heartbeats; intended to resume operation after Sleep().
    /// </summary>
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      heartbeatManager.StopHeartBeat();
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the serialized job; its JobId identifies the job to pause</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      Job job;
      if (!jobs.TryGetValue(data.JobId, out job)) {
        ClientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        job.JobState = JobState.WaitingForChildJobs;
        wcfService.UpdateJob(job, data);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Collects the result of a finished job, submits it to the server and kills the
    /// job's AppDomain (also when collecting or submitting fails).
    /// NOTE(review): the log text says a failed job is stored on disk, but no storing
    /// happens in this class — confirm whether WcfService does it.
    /// </summary>
    /// <param name="jobId">the job id, boxed as object (a <see cref="Guid"/>)</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        ClientCom.LogMessage("Getting the finished job with id: " + jId);

        Executor engine;
        if (!engines.TryGetValue(jId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        try {
          JobData sJob = engine.GetFinishedJob();
          cJob.Exception = engine.CurrentException;
          cJob.ExecutionTime = engine.ExecutionTime;

          try {
            ClientCom.LogMessage("Sending the finished job with id: " + jId);
            wcfService.UpdateJob(cJob, sJob);
            SlaveStatusInfo.JobsProcessed++;
          }
          catch (Exception e) {
            ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
          }
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// Jobs whose id is already running are ignored.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the serialized job data handed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      lock (engines) {
        // a second sandbox for the same id would leak and would overwrite the running job's plugins
        if (jobs.ContainsKey(myJob.Id)) {
          ClientCom.LogMessage("Job with id " + myJob.Id + " is already running, ignoring it");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;

      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
      }

      if (!pluginsPrepared) return;

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        lock (engines) {
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          } else {
            // defensive: the id got registered in the meantime; don't leak the new sandbox
            ClientCom.LogMessage("Job with id " + myJob.Id + " is already running, unloading the new AppDomain");
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(appDomain);
          }
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>
    /// Logs the exception and raises <see cref="ExceptionOccured"/>.
    /// </summary>
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception inside a job's AppDomain by killing that AppDomain.
    /// </summary>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // The exception text is not a job id (parsing it as a Guid always failed);
      // find the job by the AppDomain that raised the event instead.
      Guid jobId;
      if (TryFindJobOfAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Could not determine the job of the faulting AppDomain");
      }
    }

    /// <summary>
    /// Finds the id of the job running in <paramref name="domain"/>, either by reference
    /// or by the domain's friendly name. StartJobInAppDomain passes the job id as the first
    /// argument of CreateAndInitSandbox — presumably that becomes the friendly name; verify.
    /// </summary>
    /// <returns>true if a registered job was found</returns>
    private bool TryFindJobOfAppDomain(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) return false;

      string friendlyName = null;
      try {
        friendlyName = domain.FriendlyName;
      }
      catch (AppDomainUnloadedException) {
        // domain already gone; only the reference comparison below can match
      }

      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          if (ReferenceEquals(kvp.Value, domain) || string.Equals(kvp.Key.ToString(), friendlyName, StringComparison.Ordinal)) {
            jobId = kvp.Key;
            return true;
          }
        }
      }
      return false;
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">the callback to run on the core thread</param>
    /// <param name="parameter">the argument passed to <paramref name="action"/></param>
    /// <returns>true if the calling method can continue execution (it runs on the core thread), else false</returns>
    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
        container.Callback = action;
        container.CallbackParameter = parameter;
        MessageQueue.GetInstance().AddMessage(container);
        return false;
      } else {
        return true;
      }
    }

    /// <summary>
    /// Kill a appdomain with a specific id. When called from another thread than the
    /// core thread, the call is enqueued and executed later by the core thread.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) return;

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            UnloadWithRetries(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          // Also catches the CannotUnloadAppDomainException rethrown by UnloadWithRetries,
          // so a domain that cannot be unloaded is logged, not fatal; its entries stay registered.
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }

    /// <summary>
    /// Unloads <paramref name="appDomain"/>, making up to 5 attempts one second apart.
    /// </summary>
    /// <exception cref="CannotUnloadAppDomainException">the last attempt failed as well</exception>
    private void UnloadWithRetries(AppDomain appDomain) {
      const int maxAttempts = 5;
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= maxAttempts) throw;
          ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(1000);
        }
      }
    }
  }
}