#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive;
using HeuristicLab.PluginInfrastructure;

namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// Hosts a single <see cref="IJob"/>: deserializes and starts (or resumes) it, forwards the
  /// job's events as messages on <see cref="executorQueue"/>, and hands the serialized job back
  /// once it has been paused or stopped.
  /// </summary>
  public class Executor : MarshalByRefObject, IDisposable {
    public Guid JobId { get; set; }
    public IJob Job { get; set; }
    public int CoresNeeded { get; set; }
    public int MemoryNeeded { get; set; }
    public Core Core { get; set; }

    // Set before the job is asked to stop on request (Stop / GetFinishedJob) so that
    // Job_JobStopped can tell a requested stop from a regular finish.
    private bool wasJobAborted = false;

    // Released by Job_JobPaused and, for requested stops, by Job_JobStopped;
    // Pause, Stop and GetFinishedJob block on it.
    private readonly Semaphore pauseStopSem = new Semaphore(0, 1);

    // Makes pause or stop wait until start is finished; released by Job_JobStarted.
    private readonly Semaphore jobStartedSem = new Semaphore(0, 1);

    public ExecutorQueue executorQueue;

    public bool SendHeartbeatForExecutor { get; set; }

    public bool Aborted { get; set; }

    public DateTime CreationTime { get; set; }

    private Exception currentException;

    /// <summary>
    /// String form of the last recorded exception, or an empty string if none was recorded.
    /// </summary>
    public String CurrentException {
      get {
        if (currentException != null) {
          return currentException.ToString();
        } else {
          return string.Empty;
        }
      }
    }

    /// <summary>
    /// Execution state of the hosted job; <c>Stopped</c> while no job is set.
    /// </summary>
    public ExecutionState ExecutionState {
      get { return Job != null ? Job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; }
    }

    /// <summary>
    /// Execution time of the hosted job; zero while no job is set.
    /// </summary>
    public TimeSpan ExecutionTime {
      get { return Job != null ? Job.ExecutionTime : new TimeSpan(0, 0, 0); }
    }

    public Executor() {
      SendHeartbeatForExecutor = true;
      executorQueue = new ExecutorQueue();
    }

    /// <summary>
    /// Deserializes the job and runs it. If the job's <c>CollectChildJobs</c> flag is set, all
    /// child-jobs are downloaded and the job is resumed with them; otherwise the job is started
    /// and this method waits up to 15 seconds for its JobStarted event.
    /// Any exception is recorded and reported through the job-failed path instead of being thrown.
    /// </summary>
    /// <param name="serializedJob">The serialized job to execute.</param>
    public void Start(byte[] serializedJob) {
      try {
        CreationTime = DateTime.Now;
        Aborted = false;
        Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

        RegisterJobEvents();

        if (Job.CollectChildJobs) {
          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
        } else {
          Job.Start();
          if (!jobStartedSem.WaitOne(TimeSpan.FromSeconds(15))) {
            throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
          }
          // Hand the "started" signal back so that a later Pause/Stop does not have to wait for it.
          jobStartedSem.Release();
        }
      }
      catch (Exception e) {
        this.currentException = e;
        Job_JobFailed(this, new EventArgs<Exception>(e));
      }
    }

    /// <summary>
    /// Pauses a started job and blocks until its JobPaused event has been received.
    /// Errors are recorded in <see cref="CurrentException"/> rather than thrown.
    /// </summary>
    public void Pause() {
      SendHeartbeatForExecutor = false;
      // wait until job is started. if this does not happen, the Job is null and we give up
      jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
      if (Job == null) {
        currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
        return;
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          Job.Pause();
          // we need to block the pause until Job_JobPaused signals completion
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    /// <summary>
    /// Stops a started or paused job and blocks until its JobStopped event has been received.
    /// The stop is flagged as an abort. Errors are recorded in <see cref="CurrentException"/>
    /// rather than thrown.
    /// </summary>
    public void Stop() {
      SendHeartbeatForExecutor = false;
      // wait until job is started. if this does not happen, the Job is null and we give up
      jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
      if (Job == null) {
        currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
      }
      wasJobAborted = true;

      // The ExecutionState property is null-safe (reports Stopped without a job),
      // so a missing job simply skips the stop request.
      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
        try {
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
        }
      }
    }

    private void RegisterJobEvents() {
      Job.JobStopped += Job_JobStopped;
      Job.JobFailed += Job_JobFailed;
      Job.NewChildJob += Job_NewChildJob;
      Job.WaitForChildJobs += Job_WaitForChildJobs;
      Job.DeleteChildJobs += Job_DeleteChildJobs;
      Job.JobPaused += Job_JobPaused;
      Job.JobStarted += Job_JobStarted;
    }

    private void DeregisterJobEvents() {
      Job.JobStopped -= Job_JobStopped;
      Job.JobFailed -= Job_JobFailed;
      Job.NewChildJob -= Job_NewChildJob;
      Job.WaitForChildJobs -= Job_WaitForChildJobs;
      Job.DeleteChildJobs -= Job_DeleteChildJobs;
      Job.JobPaused -= Job_JobPaused;
      Job.JobStarted -= Job_JobStarted;
    }

    /// <summary>
    /// Not implemented yet: always throws <see cref="NotImplementedException"/>.
    /// </summary>
    private List<Guid> FindPluginsNeeded(IJob obj) {
      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
    }

    /// <summary>
    /// Serializes the new child job and queues a NewChildJob message carrying its data
    /// together with a job description (1 core, 0 memory, required plugin ids).
    /// </summary>
    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
      JobData childJobData = new JobData();
      childJobData.Data = PersistenceUtil.Serialize(e.Value);

      Job childJob = new Job();
      childJob.CoresNeeded = 1;
      childJob.MemoryNeeded = 0;
      childJob.PluginsNeededIds = FindPluginsNeeded(e.Value);

      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.NewChildJob);
      msg.MsgData = childJobData;
      msg.MsgJob = childJob;

      executorQueue.AddMessage(msg);
    }

    private void Job_WaitForChildJobs(object sender, EventArgs e) {
      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
      this.Job.CollectChildJobs = true;

      JobData jdata = new JobData();
      jdata.Data = PersistenceUtil.Serialize(Job);
      jdata.JobId = this.JobId;

      ExecutorMessage msg = new ExecutorMessage(ExecutorMessageType.WaitForChildJobs);
      msg.MsgData = jdata;
      executorQueue.AddMessage(msg);
    }

    private void Job_DeleteChildJobs(object sender, EventArgs e) {
      executorQueue.AddMessage(ExecutorMessageType.DeleteChildJobs);
    }

    /// <summary>
    /// Records the failure (if the event carries an exception), marks the executor as aborted
    /// and queues a JobFailed message.
    /// </summary>
    private void Job_JobFailed(object sender, EventArgs e) {
      // A hard cast would throw on any other EventArgs and skip the failure report below;
      // in that case keep whatever exception has already been recorded.
      HeuristicLab.Common.EventArgs<Exception> ex = e as HeuristicLab.Common.EventArgs<Exception>;
      if (ex != null) {
        currentException = ex.Value;
      }
      Aborted = true;
      executorQueue.AddMessage(ExecutorMessageType.JobFailed);
    }

    private void Job_JobStopped(object sender, EventArgs e) {
      if (wasJobAborted) {
        // Publish the aborted state before waking the thread that waits for the stop.
        Aborted = true;
        pauseStopSem.Release();
      } else {
        //it's a clean and finished job, so send it
        executorQueue.AddMessage(ExecutorMessageType.JobStopped);
      }
    }

    /// <summary>
    /// Returns the serialized job, stopping it first if it is still started.
    /// If there is no job, an exception is recorded and an empty job is returned.
    /// </summary>
    public JobData GetFinishedJob() {
      if (Job == null) {
        if (currentException == null) {
          currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
        }
        return GetJob();
      }

      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        try {
          // Flag the abort before stopping: Job_JobStopped only releases pauseStopSem
          // when the flag is already set, otherwise the wait below would never return.
          wasJobAborted = true;
          Job.Stop();
          pauseStopSem.WaitOne();
        }
        catch (Exception ex) {
          currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
        }
      }

      return GetJob();
    }

    /// <summary>
    /// Returns the serialized job; throws if the job is not in the Paused state
    /// (which includes the case that no job is set).
    /// </summary>
    public JobData GetPausedJob() {
      if (ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
        throw new Exception("Executor: Job has to be paused before fetching results.");
      }
      return GetJob();
    }

    private void Job_JobPaused(object sender, EventArgs e) {
      pauseStopSem.Release();
    }

    void Job_JobStarted(object sender, EventArgs e) {
      jobStartedSem.Release();
    }

    /// <summary>
    /// Packs the current job into a <see cref="JobData"/> tagged with <see cref="JobId"/>.
    /// Without a job, an empty payload is sent and an exception is recorded.
    /// Throws InvalidStateException while the job is still started.
    /// </summary>
    private JobData GetJob() {
      IJob job = Job;
      JobData jdata = new JobData();
      jdata.JobId = JobId;

      if (job == null) {
        //send empty job and save exception
        jdata.Data = PersistenceUtil.Serialize(new JobData());
        if (currentException == null) {
          currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
        }
        return jdata;
      }

      if (job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
        throw new InvalidStateException("Job is still running");
      }

      jdata.Data = PersistenceUtil.Serialize(job);
      return jdata;
    }

    /// <summary>
    /// Detaches the job's event handlers (if a job is set) and drops the job reference.
    /// </summary>
    public void Dispose() {
      if (Job != null)
        DeregisterJobEvents();
      Job = null;
    }
  }
}