#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Diagnostics;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;

namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
  /// </summary>
  /// <remarks>
  /// <see cref="Start"/> blocks in a message-dispatch loop until a shutdown is requested;
  /// <see cref="Shutdown"/> enqueues the shutdown message and waits for <see cref="Start"/> to return.
  /// </remarks>
  public class Core : MarshalByRefObject {
    private static HeartbeatManager heartbeatManager;
    public static HeartbeatManager HeartbeatManager {
      get { return heartbeatManager; }
    }

    /// <summary>
    /// Optional event log. When set, an exception escaping <see cref="Start"/> is written here
    /// instead of being sent to the client console.
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    // released once by Start() when it leaves; Shutdown() waits on it
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    private bool abortRequested;
    private ISlaveCommunication clientCom; // assigned in Start(); null before that
    private ServiceHost slaveComm;         // assigned in Start(); null before that
    private WcfService wcfService;
    private readonly JobManager jobManager;
    private readonly ConfigManager configManager;
    private readonly PluginManager pluginManager;

    /// <summary>
    /// Creates the plugin, job and config managers, wires the log and job manager events
    /// and publishes the config manager through <c>ConfigManager.Instance</c>.
    /// </summary>
    public Core() {
      var log = new ThreadSafeLog(new Log());
      this.pluginManager = new PluginManager(WcfService.Instance, log);
      this.jobManager = new JobManager(pluginManager, log);
      // method-group conversion: the delegate type declared by the event is used as-is
      log.MessageAdded += log_MessageAdded;

      RegisterJobManagerEvents();

      this.configManager = new ConfigManager(jobManager);
      ConfigManager.Instance = this.configManager;
    }

    /// <summary>
    /// Main method for the client. Opens the client communication host, starts the heartbeat
    /// and dispatches queued messages until a shutdown is requested.
    /// </summary>
    /// <remarks>
    /// Any exception is reported to <see cref="ServiceEventLog"/> (if set) or to the client
    /// console, followed by a core shutdown. If neither sink exists yet, the original exception
    /// is rethrown after cleanup. The shutdown semaphore is always released on exit.
    /// </remarks>
    public void Start() {
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        pluginManager.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
          }
          catch (Exception) {
            // deliberate best effort: a failing event log must not prevent the shutdown below
          }
        } else if (clientCom != null) {
          //try to log with clientCom. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
        } else {
          // Startup failed before the client channel existed and no event log is configured:
          // nobody can be told, so clean up and surface the real exception instead of a
          // NullReferenceException from dereferencing clientCom.
          ShutdownCore();
          throw;
        }
        ShutdownCore();
      }
      finally {
        DeregisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the shared heartbeat manager if none exists yet.
    /// </summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager();
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them one by one until
    /// <c>abortRequested</c> is set; pushes the current status to the client console after each message.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        // after a shutdown the client channel has been closed, so skip the status push
        if (!abortRequested) {
          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += WcfService_Connected;
      WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
    }

    private void DeregisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    // NOTE(review): the EventArgs type argument is reconstructed from the use of e.Value.Message — confirm against WcfService.ExceptionOccured
    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
    }

    private void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        // previously this case crashed on container.Message before the "unknown" branch could run
        clientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }

      clientCom.LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.JobId));

      // NOTE(review): if ExecutorMessageContainer is generic, its type argument is not visible in the reviewed text — confirm
      ExecutorMessageContainer executorContainer = container as ExecutorMessageContainer;
      if (executorContainer != null) {
        executorContainer.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          CalculateJobAsync(container.JobId);
          break;
        case MessageContainer.MessageType.AbortJob:
          AbortJobAsync(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          StopJobAsync(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          PauseJobAsync(container.JobId);
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(configManager.GetClientInfo());
          break;
      }
    }

    private void CalculateJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandleCalculateJob, jobId);
    }

    private void StopJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandleStopJob, jobId);
    }

    private void PauseJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandlePauseJob, jobId);
    }

    private void AbortJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandleAbortJob, jobId);
    }

    /// <summary>
    /// Runs <paramref name="handler"/> for <paramref name="jobId"/> as a task. If the task faults,
    /// the exception counter is incremented and the exception is logged to the client console.
    /// </summary>
    private void RunJobHandlerAsync(Action<object> handler, Guid jobId) {
      Task.Factory.StartNew(handler, jobId)
        .ContinueWith((t) => {
          SlaveStatusInfo.IncrementExceptionOccured();
          clientCom.LogMessage(t.Exception.ToString());
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Fetches the job and its data, reserves the cores it needs, marks it as calculating
    /// on the server and hands it to the job manager.
    /// </summary>
    /// <param name="jobIdObj">The boxed <see cref="Guid"/> of the job.</param>
    /// <remarks>
    /// On any failure the reserved cores are released and the exception is rethrown.
    /// Except for "job not found", "job data not found" and "job already running", the job
    /// is also set back to <c>JobState.Waiting</c> on the server with a reason.
    /// </remarks>
    private void HandleCalculateJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      Job job = null;
      int usedCores = 0; // stays 0 until cores were actually reserved
      try {
        job = wcfService.GetJob(jobId);
        if (job == null) throw new JobNotFoundException(jobId);
        if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) throw new OutOfCoresException();
        if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) throw new OutOfMemoryException();
        SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded);
        usedCores = job.CoresNeeded;
        JobData jobData = wcfService.GetJobData(jobId);
        if (jobData == null) throw new JobDataNotFoundException(jobId);
        job = wcfService.UpdateJobState(jobId, JobState.Calculating, null);
        if (job == null) throw new JobNotFoundException(jobId);
        jobManager.StartJobAsync(job, jobData);
      }
      catch (Exception e) {
        // Release on every failure path. This also covers an OutOfMemoryException raised by
        // the runtime after the cores were reserved, which previously leaked them.
        SlaveStatusInfo.DecrementUsedCores(usedCores);

        if (e is OutOfCoresException) {
          wcfService.UpdateJobState(jobId, JobState.Waiting, "No more cores available");
        } else if (e is OutOfMemoryException) {
          wcfService.UpdateJobState(jobId, JobState.Waiting, "No more memory available");
        } else if (!(e is JobNotFoundException || e is JobDataNotFoundException || e is JobAlreadyRunningException)) {
          wcfService.UpdateJobState(jobId, JobState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
        }
        throw;
      }
    }

    /// <summary>
    /// Verifies that the server knows the job and asks the job manager to stop it.
    /// Exceptions propagate to the calling task.
    /// </summary>
    private void HandleStopJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      Job job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      jobManager.StopJobAsync(jobId);
    }

    /// <summary>
    /// Verifies that the server knows the job and asks the job manager to pause it.
    /// Exceptions propagate to the calling task.
    /// </summary>
    private void HandlePauseJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      Job job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      jobManager.PauseJobAsync(jobId);
    }

    /// <summary>
    /// Asks the job manager to abort the job. Exceptions propagate to the calling task.
    /// </summary>
    private void HandleAbortJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      jobManager.AbortJob(jobId);
    }

    #region JobManager Events
    // NOTE(review): the EventArgs type arguments of the handlers in this region are reconstructed
    // from how e.Value / e.Value2 / Item1..Item3 are used — confirm against the JobManager event declarations.

    private void RegisterJobManagerEvents() {
      this.jobManager.JobStarted += jobManager_JobStarted;
      this.jobManager.JobPaused += jobManager_JobPaused;
      this.jobManager.JobStopped += jobManager_JobStopped;
      this.jobManager.JobFailed += jobManager_JobFailed;
      this.jobManager.ExceptionOccured += jobManager_ExceptionOccured;
      this.jobManager.JobAborted += jobManager_JobAborted;
    }

    private void jobManager_JobStarted(object sender, EventArgs<SlaveJob> e) {
      // successfully started, everything is good
    }

    private void jobManager_JobPaused(object sender, EventArgs<SlaveJob> e) {
      UploadJobResult(e.Value, JobState.Paused);
    }

    private void jobManager_JobStopped(object sender, EventArgs<SlaveJob> e) {
      UploadJobResult(e.Value, JobState.Finished);
    }

    /// <summary>
    /// Releases the cores of <paramref name="slaveJob"/>, wakes the heartbeat thread and uploads
    /// the job data with the given <paramref name="state"/>. Failures are logged to the client console only.
    /// </summary>
    private void UploadJobResult(SlaveJob slaveJob, JobState state) {
      try {
        SlaveStatusInfo.DecrementUsedCores(slaveJob.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        Job job = wcfService.GetJob(slaveJob.JobId);
        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
        job.ExecutionTime = slaveJob.ExecutionTime;
        JobData jobData = slaveJob.GetJobData();
        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, state);
      }
      catch (Exception ex) {
        clientCom.LogMessage(ex.ToString());
      }
    }

    private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, JobData, Exception>> e) {
      try {
        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        SlaveJob slaveJob = e.Value.Item1;
        JobData jobData = e.Value.Item2;
        Exception exception = e.Value.Item3;

        Job job = wcfService.GetJob(slaveJob.JobId);
        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
        job.ExecutionTime = slaveJob.ExecutionTime;
        if (jobData != null) {
          wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Failed, exception.ToString());
        } else {
          // no data to upload, only report the failed state
          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
        }
        clientCom.LogMessage(exception.Message);
      }
      catch (Exception ex) {
        SlaveStatusInfo.IncrementExceptionOccured();
        clientCom.LogMessage(ex.ToString());
      }
    }

    private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
      SlaveStatusInfo.IncrementExceptionOccured();
      heartbeatManager.AwakeHeartBeatThread();
      clientCom.LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, e.Value2.ToString()));
      wcfService.UpdateJobState(e.Value.JobId, JobState.Waiting, e.Value2.ToString());
    }

    private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    }
    #endregion

    #region Log Events
    /// <summary>
    /// Forwards a log entry to the client console and clears the log.
    /// </summary>
    private void log_MessageAdded(object sender, EventArgs<string> e) {
      // The second tab-separated field is forwarded; an entry without a tab is forwarded
      // unchanged instead of raising IndexOutOfRangeException.
      string[] fields = e.Value.Split('\t');
      // This handler is subscribed in the constructor, i.e. possibly before Start() has
      // created the client channel, hence the null-tolerant logging.
      LogToClient(fields.Length > 1 ? fields[1] : e.Value);
      ((ILog)sender).Clear(); // don't let the log take up memory
    }
    #endregion

    /// <summary>
    /// Sends <paramref name="message"/> to the client console if the client channel exists;
    /// does nothing otherwise. Only for code paths that can run before or without a successful <see cref="Start"/>.
    /// </summary>
    private void LogToClient(string message) {
      ISlaveCommunication com = clientCom;
      if (com != null) com.LogMessage(message);
    }

    /// <summary>
    /// Requests an abort for every job id the job manager currently reports.
    /// </summary>
    private void DoAbortAll() {
      LogToClient("Aborting all jobs."); // also reached from ShutdownCore() after a failed startup
      foreach (Guid jobId in jobManager.JobIds) {
        AbortJobAsync(jobId);
      }
    }

    /// <summary>
    /// Requests a pause for every job id the job manager currently reports.
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pausing all jobs.");
      foreach (Guid jobId in jobManager.JobIds) {
        PauseJobAsync(jobId);
      }
    }

    /// <summary>
    /// Requests a stop for every job id the job manager currently reports.
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stopping all jobs.");
      foreach (Guid jobId in jobManager.JobIds) {
        StopJobAsync(jobId);
      }
    }

    #region Slave Lifecycle Methods
    /// <summary>
    /// Completely shuts down the slave: enqueues the shutdown message and blocks until
    /// <see cref="Start"/> has released the shutdown semaphore.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, should be called before the application is exited.
    /// Tolerates a partially started core (no heartbeat, no client channel, no service host).
    /// </summary>
    private void ShutdownCore() {
      LogToClient("Shutdown signal received");
      LogToClient("Stopping heartbeat");
      if (heartbeatManager != null) heartbeatManager.StopHeartBeat();
      abortRequested = true;

      DoAbortAll();

      LogToClient("Logging out");
      WcfService.Instance.Disconnect();
      if (clientCom != null) clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm != null) {
        if (slaveComm.State == CommunicationState.Faulted) {
          slaveComm.Abort(); // Close() throws on a faulted communication object
        } else if (slaveComm.State != CommunicationState.Closed) {
          slaveComm.Close();
        }
      }
    }

    /// <summary>
    /// Continues operation after <see cref="Sleep"/> by clearing the asleep flag.
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      configManager.Asleep = false;
    }

    /// <summary>
    /// Stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service.
    /// Sets the asleep flag and pauses all jobs.
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received - not accepting any new jobs");
      configManager.Asleep = true;
      DoPauseAll();
    }
    #endregion
  }
}