#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive;
using HeuristicLab.PluginInfrastructure;

namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// Hosts a single <see cref="IJob"/>: deserializes it, starts/pauses/stops it, listens to its
  /// lifecycle events and forwards the outcome to <see cref="Core"/> via executor messages.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the generic type arguments in this file (Deserialize&lt;IJob&gt;, EventArgs&lt;Exception&gt;,
  /// EventArgs&lt;IJob&gt;, List&lt;Guid&gt;) were reconstructed from how the values are used - confirm
  /// against the IJob / PersistenceUtil / Job declarations.
  /// </remarks>
  public class Executor : MarshalByRefObject, IDisposable {
    public Guid JobId { get; set; }
    public IJob Job { get; set; }
    public int CoresNeeded { get; set; }
    public int MemoryNeeded { get; set; }

    // Set before an explicit stop is requested so Job_JobStopped can tell an abort from a regular finish.
    private bool wasJobAborted = false;
    public Core Core { get; set; }

    // Released by Job_JobPaused / Job_JobStopped; Pause(), Stop() and GetFinishedJob() block on it.
    private Semaphore pauseStopSem = new Semaphore(0, 1);
    // Released by Job_JobStarted; Start() blocks on it after calling Job.Start().
    private Semaphore startJobSem = new Semaphore(0, 1);

    public bool SendHeartbeatForExecutor { get; set; }

    public bool Aborted { get; set; }

    public DateTime CreationTime { get; set; }

    private Exception currentException;

    /// <summary>
    /// String representation of the last recorded exception, or an empty string if there is none.
    /// </summary>
    public String CurrentException {
      get {
        if (currentException != null) {
          return currentException.ToString();
        } else {
          return string.Empty;
        }
      }
    }

    /// <summary>
    /// Execution state of the hosted job; Stopped while no job is loaded.
    /// </summary>
    public ExecutionState ExecutionState {
      get {
        return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped;
      }
    }

    /// <summary>
    /// Execution time of the hosted job; zero while no job is loaded.
    /// </summary>
    public TimeSpan ExecutionTime {
      get {
        return Job != null ? Job.ExecutionTime : new TimeSpan(0, 0, 0);
      }
    }

    public Executor() {
      SendHeartbeatForExecutor = true;
    }

    /// <summary>
    /// Deserializes the job, registers its events and starts it. If the job has CollectChildJobs set,
    /// the child jobs are fetched from the server and the job is resumed with them instead.
    /// Any exception is recorded and reported through the job-failed path.
    /// </summary>
    /// <param name="serializedJob">The serialized job to execute.</param>
    public void Start(byte[] serializedJob) {
      try {
        CreationTime = DateTime.Now;
        Aborted = false;
        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        if (Job.CollectChildJobs) {
          var childjobs = WcfService.Instance.GetChildJobs(JobId);
          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
        } else {
          Job.Start();
          // block until Job_JobStarted signals that the job is actually running
          startJobSem.WaitOne();
        }
      }
      catch (Exception e) {
        this.currentException = e;
        Job_JobFailed(this, new HeuristicLab.Common.EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Pauses a started job and blocks until the job reports that it is paused.
    /// If no job is loaded, the error is recorded and a KillAppDomain message is enqueued.
    /// </summary>
    public void Pause() {
      SendHeartbeatForExecutor = false;
      if (Job == null) {
        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        // nothing to pause; falling through would dereference the null job
        return;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          Job.Pause();
          //we need to block the pause...
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Stops a started or paused job and blocks until the job reports that it has stopped.
    /// If no job is loaded, the error is recorded and a KillAppDomain message is enqueued.
    /// </summary>
    public void Stop() {
      SendHeartbeatForExecutor = false;
      if (Job == null) {
        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
      }
      // must be set before Job.Stop() so that Job_JobStopped releases pauseStopSem
      wasJobAborted = true;

      // ExecutionState reports Stopped for a null job, so this branch is skipped in that case
      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
        try {
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    private void RegisterJobEvents() {
      Job.JobStopped += Job_JobStopped;
      Job.JobFailed += Job_JobFailed;
      Job.NewChildJob += Job_NewChildJob;
      Job.WaitForChildJobs += Job_WaitForChildJobs;
      Job.DeleteChildJobs += Job_DeleteChildJobs;
      Job.JobPaused += Job_JobPaused;
      Job.JobStarted += Job_JobStarted;
    }

    private void DeregisterJobEvents() {
      Job.JobStopped -= Job_JobStopped;
      Job.JobFailed -= Job_JobFailed;
      Job.NewChildJob -= Job_NewChildJob;
      Job.WaitForChildJobs -= Job_WaitForChildJobs;
      Job.DeleteChildJobs -= Job_DeleteChildJobs;
      Job.JobPaused -= Job_JobPaused;
      Job.JobStarted -= Job_JobStarted;
    }

    /// <summary>
    /// Placeholder: always throws <see cref="NotImplementedException"/> after enumerating the
    /// declaring plugins of <paramref name="obj"/>.
    /// </summary>
    private List<Guid> FindPluginsNeeded(IJob obj) {
      // TODO: map each declaring plugin to its id and return the collected ids
      foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {
      }
      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
    }

    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
      JobData childJobData = new JobData();
      childJobData.Data = PersistenceUtil.Serialize(e.Value);

      Job childJob = new Job();
      childJob.CoresNeeded = 1;
      childJob.MemoryNeeded = 0;
      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);

      //TODO: is return value needed?
      WcfService.Instance.AddChildJob(this.JobId, childJob, childJobData);
    }

    private void Job_WaitForChildJobs(object sender, EventArgs e) {
      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
      this.Job.CollectChildJobs = true;

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = this.JobId;

      Core.PauseWaitJob(jdata);
    }

    private void Job_DeleteChildJobs(object sender, EventArgs e) {
      WcfService.Instance.DeleteChildJobs(JobId);
    }

    private void Job_JobFailed(object sender, EventArgs e) {
      HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e;
      currentException = ex.Value;
      // publish the final state before the message is enqueued for processing
      Aborted = true;
      Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
    }

    private void Job_JobStopped(object sender, EventArgs e) {
      if (wasJobAborted) {
        // set the flag first so the thread woken by Release() already observes it
        Aborted = true;
        pauseStopSem.Release();
      } else {
        //it's a clean and finished job, so send it
        Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
      }
    }

    /// <summary>
    /// Returns the serialized job, stopping it first (and waiting for the stop) if it is still started.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No job is loaded; a KillAppDomain message has been enqueued.
    /// </exception>
    public JobData GetFinishedJob() {
      if (Job == null) {
        if (currentException == null) {
          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
        }
        Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
        // there is nothing to serialize; fail explicitly instead of dereferencing the null job
        throw new InvalidOperationException("Getting finished job " + this.JobId + ": Job is null");
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          // set before Job.Stop() (as in Stop()); otherwise a stop event that fires first
          // would not release pauseStopSem and WaitOne() would block forever
          wasJobAborted = true;
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
        }
      }

      return GetJob();
    }

    /// <summary>
    /// Returns the serialized job; the job has to be in the Paused state.
    /// </summary>
    public JobData GetPausedJob() {
      // the ExecutionState property tolerates a null job (reports Stopped)
      if (ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
        throw new Exception("Executor: Job has to be paused before fetching results.");
      }
      return GetJob();
    }

    private void Job_JobPaused(object sender, EventArgs e) {
      pauseStopSem.Release();
    }

    private void Job_JobStarted(object sender, EventArgs e) {
      startJobSem.Release();
    }

    /// <summary>
    /// Serializes the current job into a <see cref="JobData"/>; throws if the job is still started.
    /// </summary>
    private JobData GetJob() {
      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      } else {
        JobData jdata = new JobData();
        jdata.Data = PersistenceUtil.Serialize(Job);
        jdata.JobId = JobId;
        return jdata;
      }
    }

    /// <summary>
    /// Detaches all job event handlers and drops the reference to the job.
    /// </summary>
    public void Dispose() {
      if (Job != null) {
        DeregisterJobEvents();
      }
      Job = null;
    }
  }
}