#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;

namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Client.
  /// Runs the message dispatch loop on the thread that calls <see cref="Start"/> (the "core thread")
  /// and hosts every job in its own AppDomain.
  /// </summary>
  public class Core : MarshalByRefObject {
    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile save in TheCore
    public static Core TheCore;

    public EventLog ServiceEventLog { get; set; }

    // Checked by the dispatch loop; set to true by ShutdownCore().
    public static bool abortRequested { get; set; }

    // Released when Start() returns; Shutdown() blocks on it until the dispatch loop has ended.
    private Semaphore waitShutdownSem = new Semaphore(0, 1);

    public static ILog Log { get; set; }

    // Within this class, engines, appDomains and jobs are accessed under lock (engines):
    // they are written by the job-start task (see DetermineAction / StartJobInAppDomain)
    // and read and cleaned up by the core thread.
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main Method for the client: opens the slave/GUI pipe, registers the WCF events, starts the
    /// heartbeat and dispatches queued messages until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;
        clientCom.LogMessage("Hive Slave started");

        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      } catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          } catch (Exception) {
            // writing to the event log is best effort only
          }
        } else {
          throw; // "throw;" keeps the original stack trace ("throw ex;" would reset it)
        }
      } finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager (10 second interval) unless one is already running.
    /// </summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Wakes the heartbeat thread if a heartbeat manager exists; Sleep() sets it to null.
    /// </summary>
    private void AwakeHeartbeatThread() {
      // copy to a local: the field may be cleared by the core thread while another thread calls this
      HeartbeatManager manager = heartbeatManager;
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them on the core thread until abortRequested is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      if (container is ExecutorMessageContainer<Guid>) {
        // callbacks enqueued through EnqueueExecutorMessage run here, i.e. on the core thread
        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
        c.execute();
      } else if (container is MessageContainer) {
        switch (container.Message) {
          case MessageContainer.MessageType.CalculateJob:
            // fetching the job and starting it involves server round trips; do it off the core thread
            Task.Factory.StartNew((jobIdObj) => {
              Guid jobId = (Guid)jobIdObj;
              Job job = wcfService.GetJob(jobId);
              if (job == null) throw new JobNotFoundException(jobId);
              lock (engines) {
                if (!jobs.ContainsKey(job.Id)) {
                  jobs.Add(job.Id, job);
                }
              }
              JobData jobData = wcfService.GetJobData(job.Id);
              if (jobData == null) throw new JobDataNotFoundException(jobId);
              job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
              StartJobInAppDomain(job, jobData);
            }, container.JobId)
            .ContinueWith((t) => {
              // handle exception of task
              clientCom.LogMessage(t.Exception.ToString());
            }, TaskContinuationOptions.OnlyOnFaulted);
            break;
          case MessageContainer.MessageType.ShutdownSlave:
            ShutdownCore();
            break;
          case MessageContainer.MessageType.StopAll:
            DoStopAll();
            break;
          case MessageContainer.MessageType.PauseAll:
            DoPauseAll();
            break;
          case MessageContainer.MessageType.AbortAll:
            DoAbortAll();
            break;
          case MessageContainer.MessageType.AbortJob:
            KillAppDomain(container.JobId);
            break;
          case MessageContainer.MessageType.StopJob:
            DoStopJob(container.JobId);
            break;
          case MessageContainer.MessageType.PauseJob:
            DoPauseJob(container.JobId);
            break;
          case MessageContainer.MessageType.Restart:
            DoStartSlave();
            break;
          case MessageContainer.MessageType.Sleep:
            Sleep();
            break;
          case MessageContainer.MessageType.SayHello:
            wcfService.Connect(ConfigManager.Instance.GetClientInfo());
            break;
        }
      } else {
        clientCom.LogMessage("Unknown MessageContainer: " + container);
      }
    }

    /// <summary>
    /// Looks up a job and its running engine under the dictionary lock.
    /// </summary>
    /// <param name="jobId">id of the job</param>
    /// <param name="caller">name of the calling method, used as prefix for log messages</param>
    /// <param name="job">the registered job, if any</param>
    /// <param name="engine">the engine executing the job, if any</param>
    /// <returns>true if both a (non-null) job and its engine exist</returns>
    private bool TryGetJobAndEngine(Guid jobId, string caller, out Job job, out Executor engine) {
      engine = null;
      lock (engines) {
        if (!jobs.TryGetValue(jobId, out job)) {
          clientCom.LogMessage(caller + ": Can't find job with id " + jobId);
          return false;
        }
        if (job == null) {
          return false;
        }
        // the job is registered before its engine is created (see DetermineAction), so the engine may be missing
        if (!engines.TryGetValue(job.Id, out engine)) {
          clientCom.LogMessage(caller + ": No engine running for job with id " + jobId);
          return false;
        }
        return true;
      }
    }

    /// <summary>
    /// Pauses a single job, sends its paused state to the server and kills its AppDomain.
    /// </summary>
    /// <param name="jobId">id of the job to pause</param>
    private void DoPauseJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, "DoPauseJob", out job, out engine)) {
        return;
      }

      try {
        engine.Pause();
        JobData sJob = engine.GetPausedJob();
        job.ExecutionTime = engine.ExecutionTime;

        try {
          clientCom.LogMessage("Sending the paused job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          SlaveStatusInfo.JobsProcessed++; //TODO: count or not count, thats the question
        } catch (Exception e) {
          // NOTE(review): the message mentions storing to disk, but nothing here persists the job
          clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      } catch (Exception e) {
        // a failing remote call into the engine must not terminate the core's dispatch loop
        clientCom.LogMessage("DoPauseJob: Pausing the job with id " + job.Id + " failed (" + e.ToString() + ")");
      } finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Stops a single job, sends its result to the server as finished and kills its AppDomain.
    /// </summary>
    /// <param name="jobId">id of the job to stop</param>
    private void DoStopJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, "DoStopJob", out job, out engine)) {
        return;
      }

      try {
        engine.Stop();
        JobData sJob = engine.GetFinishedJob();
        job.ExecutionTime = engine.ExecutionTime;

        try {
          clientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          SlaveStatusInfo.JobsProcessed++; //TODO: count or not count, thats the question
        } catch (Exception e) {
          // NOTE(review): the message mentions storing to disk, but nothing here persists the job
          clientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      } catch (Exception e) {
        // a failing remote call into the engine must not terminate the core's dispatch loop
        clientCom.LogMessage("DoStopJob: Stopping the job with id " + job.Id + " failed (" + e.ToString() + ")");
      } finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Returns a copy of the ids of all registered jobs, taken under the dictionary lock,
    /// so callers can iterate while jobs are being removed.
    /// </summary>
    private List<Guid> GetJobIdsSnapshot() {
      lock (engines) {
        return new List<Guid>(jobs.Keys);
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid jobId in GetJobIdsSnapshot()) {
        KillAppDomain(jobId);
      }
      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every registered job (see <see cref="DoPauseJob"/>): the paused state is sent to the
    /// server and the job's AppDomain is killed.
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");
      // iterate over a copy because DoPauseJob removes items from 'jobs'
      foreach (Guid jobId in GetJobIdsSnapshot()) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// Stops every registered job (see <see cref="DoStopJob"/>): the result is sent to the server as
    /// finished and the job's AppDomain is killed.
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");
      // iterate over a copy because DoStopJob removes items from 'jobs'
      foreach (Guid jobId in GetJobIdsSnapshot()) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// completly shudown slave: enqueues a shutdown message and blocks until Start() has returned
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the the application is exited
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      // Sleep() leaves heartbeatManager null
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      lock (engines) {
        clientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          try {
            AppDomain.Unload(kvp.Value);
          } catch (CannotUnloadAppDomainException ex) {
            // keep shutting down the remaining domains and the communication channels
            clientCom.LogMessage("Could not unload Appdomain for " + kvp.Key + ": " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// Restarts the heartbeat (if none is running); can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// Stops the heartbeat, stops all jobs and disconnects from the server; the communication
    /// host for the slave GUI stays open. Primarily used by the GUI if the core runs as a windows service.
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
        heartbeatManager = null;
      }
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the serialized job data to send; its JobId identifies the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      Job job;
      bool found;
      lock (engines) {
        found = jobs.TryGetValue(data.JobId, out job);
      }

      if (!found) {
        clientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Gets the finished job from its engine and submits it to the server as finished, then kills the
    /// job's AppDomain and wakes the heartbeat. If transmission fails the error is only logged; this
    /// method does not persist the job itself.
    /// </summary>
    /// <param name="jobId">id of the finished job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        clientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        Job cJob;
        lock (engines) {
          if (!engines.TryGetValue(jobId, out engine)) {
            clientCom.LogMessage("Engine doesn't exist");
            return;
          }
          if (!jobs.TryGetValue(jobId, out cJob)) {
            clientCom.LogMessage("Job doesn't exist");
            return;
          }
        }

        try {
          JobData sJob = engine.GetFinishedJob();
          // cJob.Exception = engines[jId].CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
          cJob.ExecutionTime = engine.ExecutionTime;

          try {
            clientCom.LogMessage("Sending the finished job with id: " + jobId);
            wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
            SlaveStatusInfo.JobsProcessed++;
          } catch (Exception e) {
            clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
          }
        } finally {
          // also runs when fetching the result from the engine fails, so the AppDomain is not leaked
          KillAppDomain(jobId); // kill app-domain in every case
          AwakeHeartbeatThread();
        }
      } catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the job's data, passed to the engine's Start</param>
    /// <exception cref="InvalidOperationException">if an engine for the job already exists</exception>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      clientCom.LogMessage("Received new job with id " + myJob.Id);
      lock (engines) {
        if (engines.ContainsKey(myJob.Id))
          throw new InvalidOperationException("Job with key " + myJob.Id + " already exists");
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      } catch (Exception exception) {
        clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
      }

      if (pluginsPrepared) {
        try {
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          lock (engines) {
            appDomains.Add(myJob.Id, appDomain);
            clientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            clientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            clientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            clientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        } catch (Exception exception) {
          clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
          clientCom.LogMessage("Error thrown is: " + exception.ToString());
          KillAppDomain(myJob.Id);
        }
      } else {
        // no AppDomain was created: drop the job's registration and any partially copied plugins
        KillAppDomain(myJob.Id);
      }
      AwakeHeartbeatThread();
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      clientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles unhandled exceptions of a job's AppDomain by killing the AppDomain of that job.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // the exception text is not a job id; resolve the job from the AppDomain that raised the event
      Guid jobId;
      if (TryGetJobIdOfAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        clientCom.LogMessage("Could not determine the job of the AppDomain that threw the exception");
      }
    }

    /// <summary>
    /// Finds the id of the job hosted in the given AppDomain.
    /// </summary>
    /// <param name="domain">the AppDomain (may be null)</param>
    /// <param name="jobId">the job id if found, otherwise Guid.Empty</param>
    /// <returns>true if the job id could be determined</returns>
    private bool TryGetJobIdOfAppDomain(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) return false;

      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          if (ReferenceEquals(kvp.Value, domain)) {
            jobId = kvp.Key;
            return true;
          }
        }
      }

      // fallback: the sandbox is created with the job id string as its name (see StartJobInAppDomain);
      // presumably that becomes the FriendlyName - TODO confirm against SandboxManager
      try {
        return Guid.TryParse(domain.FriendlyName, out jobId);
      } catch (AppDomainUnloadedException) {
        return false;
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">callback that is executed on the core thread</param>
    /// <param name="parameter">argument passed to the callback</param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id: disposes the engine, unloads the AppDomain (up to 5 attempts,
    /// 1 second apart), removes the job and deletes its plugins. When called from another thread the
    /// request is enqueued and executed later on the core thread.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engines.Remove(id);
            try {
              engine.Dispose();
            } catch (Exception ex) {
              // a failing Dispose must not prevent the AppDomain from being unloaded
              clientCom.LogMessage("Exception when disposing the engine of job " + id + ": " + ex.ToString());
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            for (int attempt = 1; ; attempt++) {
              try {
                AppDomain.Unload(appDomain);
                break;
              } catch (CannotUnloadAppDomainException) {
                // give up after 5 attempts; the rethrown exception is logged by the catch below
                // and the job's entries are kept
                if (attempt >= 5) throw;
                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        } catch (Exception ex) {
          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      // presumably forces release of memory held by the unloaded domain - kept from the original
      GC.Collect();
    }
  }
}