#region License Information
/* HeuristicLab
* Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive;
using HeuristicLab.PluginInfrastructure;
namespace HeuristicLab.Clients.Hive.SlaveCore {
public class Executor : MarshalByRefObject, IDisposable {
/// <summary>Identifier of the job handled by this executor; passed along in Core messages and WcfService calls.</summary>
public Guid JobId { get; set; }
/// <summary>The job instance; assigned in Start and cleared in Dispose.</summary>
public IJob Job { get; set; }
/// <summary>Number of cores requested for the job (not read inside this class).</summary>
public int CoresNeeded { get; set; }
/// <summary>Amount of memory requested for the job (not read inside this class).</summary>
public int MemoryNeeded { get; set; }
// Set when a stop is requested via Stop/GetFinishedJob; makes Job_JobStopped release
// the waiting caller instead of reporting the job as finished.
private bool wasJobAborted;
/// <summary>Receiver of the executor messages (SendFinishedJob, KillAppDomain) and of PauseWaitJob.</summary>
public Core Core { get; set; }
// Released by the JobPaused / JobStopped handlers; Pause, Stop and GetFinishedJob block on it.
private readonly Semaphore pauseStopSem = new Semaphore(0, 1);
// Released by the JobStarted handler; Start blocks on it after calling Job.Start().
private readonly Semaphore startJobSem = new Semaphore(0, 1);
/// <summary>Initialized to true by the constructor and cleared by Pause and Stop.</summary>
public bool SendHeartbeatForExecutor { get; set; }
/// <summary>Reset in Start; set when the job failed or was stopped on request.</summary>
public bool Aborted { get; set; }
/// <summary>Local time at which Start was invoked.</summary>
public DateTime CreationTime { get; set; }
// Most recently recorded error; exposed as text through CurrentException.
private Exception currentException;
/// <summary>
/// String form (ToString()) of the most recently recorded exception,
/// or an empty string when no exception has been recorded.
/// </summary>
public String CurrentException {
  get {
    return currentException == null ? string.Empty : currentException.ToString();
  }
}
/// <summary>
/// Execution state of the job; reported as Stopped while no job is assigned.
/// </summary>
public ExecutionState ExecutionState {
  get {
    if (Job == null) {
      return HeuristicLab.Core.ExecutionState.Stopped;
    }
    return Job.ExecutionState;
  }
}
/// <summary>
/// Execution time reported by the job; zero while no job is assigned.
/// </summary>
public TimeSpan ExecutionTime {
  get {
    if (Job == null) {
      return TimeSpan.Zero;
    }
    return Job.ExecutionTime;
  }
}
/// <summary>
/// Creates an executor with SendHeartbeatForExecutor switched on.
/// </summary>
public Executor() {
  this.SendHeartbeatForExecutor = true;
}
/// <summary>
/// Deserializes the given job, subscribes to its events and runs it.
/// When the deserialized job has CollectChildJobs set, its child jobs are fetched from
/// the WcfService and the job is resumed with them; otherwise the job is started and
/// this call blocks until the JobStarted handler releases startJobSem.
/// Any exception raised here is recorded and routed through Job_JobFailed.
/// </summary>
/// <param name="serializedJob">Serialized job data handed to PersistenceUtil.Deserialize.</param>
// NOTE(review): generic type arguments (on Deserialize, IEnumerable and
// HeuristicLab.Common.EventArgs) appear to be missing in this method — confirm against
// the original file before changing the code.
public void Start(byte[] serializedJob) {
try {
CreationTime = DateTime.Now;
Aborted = false;
Job = PersistenceUtil.Deserialize(serializedJob);
RegisterJobEvents();
if (Job.CollectChildJobs) {
// Resume path: the job continues with the deserialized data of its child jobs.
IEnumerable childjobs = WcfService.Instance.GetChildJobs(JobId);
Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize(j.Data)));
} else {
Job.Start();
// Block until Job_JobStarted signals that the job is running.
startJobSem.WaitOne();
}
}
catch (Exception e) {
// Job_JobFailed stores the exception again and enqueues the SendFinishedJob message.
this.currentException = e;
Job_JobFailed(this, new HeuristicLab.Common.EventArgs(e));
}
}
/// <summary>
/// Pauses a running job and blocks until the JobPaused handler signals completion.
/// Clears SendHeartbeatForExecutor. When no job is assigned, an error is recorded,
/// a KillAppDomain message is enqueued and the method returns. Exceptions thrown while
/// pausing are recorded as the current exception instead of being propagated.
/// </summary>
public void Pause() {
  SendHeartbeatForExecutor = false;
  if (Job == null) {
    currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
    Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
    // Nothing to pause; returning here avoids dereferencing the null job below.
    return;
  }
  if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
    try {
      Job.Pause();
      // Block the caller until Job_JobPaused releases the semaphore.
      pauseStopSem.WaitOne();
    }
    catch (Exception ex) {
      currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
    }
  }
}
/// <summary>
/// Requests the job to stop and blocks until the JobStopped handler signals completion.
/// Clears SendHeartbeatForExecutor and marks the job as aborted-on-request. When no job
/// is assigned, an error is recorded and a KillAppDomain message is enqueued. Exceptions
/// thrown while stopping are recorded as the current exception instead of being propagated.
/// </summary>
public void Stop() {
  SendHeartbeatForExecutor = false;

  bool jobMissing = Job == null;
  if (jobMissing) {
    currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
    Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
  }

  // Must be set before Job.Stop() so that Job_JobStopped releases the semaphore.
  wasJobAborted = true;

  // Only a started or paused job can be stopped; anything else is left alone.
  if (ExecutionState != ExecutionState.Started && ExecutionState != ExecutionState.Paused) {
    return;
  }

  try {
    Job.Stop();
    pauseStopSem.WaitOne();
  }
  catch (Exception ex) {
    currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
  }
}
/// <summary>
/// Subscribes this executor's handlers to the events of the current job.
/// Method-group conversion is used so that the compiler picks each event's delegate type.
/// </summary>
private void RegisterJobEvents() {
  Job.JobStopped += Job_JobStopped;
  Job.JobFailed += Job_JobFailed;
  Job.NewChildJob += Job_NewChildJob;
  Job.WaitForChildJobs += Job_WaitForChildJobs;
  Job.DeleteChildJobs += Job_DeleteChildJobs;
  Job.JobPaused += Job_JobPaused;
  Job.JobStarted += Job_JobStarted;
}
/// <summary>
/// Removes this executor's handlers from the events of the current job.
/// Method-group conversion is used so that the compiler picks each event's delegate type.
/// </summary>
private void DeregisterJobEvents() {
  Job.JobStopped -= Job_JobStopped;
  Job.JobFailed -= Job_JobFailed;
  Job.NewChildJob -= Job_NewChildJob;
  Job.WaitForChildJobs -= Job_WaitForChildJobs;
  Job.DeleteChildJobs -= Job_DeleteChildJobs;
  Job.JobPaused -= Job_JobPaused;
  Job.JobStarted -= Job_JobStarted;
}
// Unimplemented stub: enumerates the plugins declaring the given job without using
// them and then always throws NotImplementedException, so the trailing return is
// never reached. Called from Job_NewChildJob, which therefore cannot complete.
// NOTE(review): the element type of the List appears to be missing here — confirm
// against the original file.
private List FindPluginsNeeded(IJob obj) {
List guids = new List();
// TODO: collect an id for each declaring plugin into guids.
foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {
}
throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
return guids;
}
/// <summary>
/// Handles the job's NewChildJob event: serializes the child job carried by the event
/// arguments and registers it with the WcfService under this executor's job id.
/// The child is described with one core, zero memory and the plugin ids returned by
/// FindPluginsNeeded.
/// </summary>
private void Job_NewChildJob(object sender, EventArgs e) {
  JobData payload = new JobData {
    Data = PersistenceUtil.Serialize(e.Value)
  };

  Job description = new Job {
    CoresNeeded = 1,
    MemoryNeeded = 0,
    PluginsNeededIds = FindPluginsNeeded(e.Value)
  };

  //TODO: is return value needed?
  WcfService.Instance.AddChildJob(this.JobId, description, payload);
}
/// <summary>
/// Handles the job's WaitForChildJobs event: flags the job to collect its child jobs,
/// serializes it and hands the data to Core.PauseWaitJob.
/// </summary>
private void Job_WaitForChildJobs(object sender, EventArgs e) {
  // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
  this.Job.CollectChildJobs = true;

  JobData snapshot = new JobData {
    Data = PersistenceUtil.Serialize(Job),
    JobId = this.JobId
  };
  Core.PauseWaitJob(snapshot);
}
/// <summary>
/// Handles the job's DeleteChildJobs event by asking the WcfService to delete
/// the child jobs of this executor's job.
/// </summary>
private void Job_DeleteChildJobs(object sender, EventArgs e) {
  WcfService.Instance.DeleteChildJobs(this.JobId);
}
/// <summary>
/// Handles a failed job: records the exception carried by the event arguments,
/// enqueues a SendFinishedJob message for this job and marks the executor as aborted.
/// </summary>
private void Job_JobFailed(object sender, EventArgs e) {
  // The arguments are expected to be HeuristicLab.Common.EventArgs whose Value is the error.
  currentException = ((HeuristicLab.Common.EventArgs)e).Value;
  Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
  Aborted = true;
}
/// <summary>
/// Handles the job's JobStopped event. A job that stopped on its own is reported via
/// a SendFinishedJob message; a job stopped on request releases the caller waiting on
/// pauseStopSem and is marked as aborted.
/// </summary>
private void Job_JobStopped(object sender, EventArgs e) {
  if (!wasJobAborted) {
    //it's a clean and finished job, so send it
    Core.EnqueueExecutorMessage(Core.SendFinishedJob, JobId);
    return;
  }

  pauseStopSem.Release();
  Aborted = true;
}
/// <summary>
/// Returns the serialized job, stopping it first if it is still running.
/// </summary>
/// <returns>
/// The job data produced by GetJob, or null when no job is assigned (in that case an
/// error is recorded, unless one already exists, and a KillAppDomain message is enqueued).
/// </returns>
/// <remarks>
/// Exceptions thrown while stopping are recorded as the current exception; GetJob
/// still throws if the job remains in the Started state afterwards.
/// </remarks>
public JobData GetFinishedJob() {
  if (Job == null) {
    if (currentException == null) {
      currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
    }
    Core.EnqueueExecutorMessage(Core.KillAppDomain, JobId);
    // There is no job to serialize; returning avoids dereferencing the null job below.
    return null;
  }
  if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
    try {
      // Set the flag before stopping (as Stop() does): Job_JobStopped only releases
      // pauseStopSem when it sees the flag, so setting it afterwards could leave the
      // WaitOne below blocked forever.
      wasJobAborted = true;
      Job.Stop();
      pauseStopSem.WaitOne();
    }
    catch (Exception ex) {
      currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
    }
  }
  return GetJob();
}
/// <summary>
/// Returns the serialized job of a paused executor.
/// </summary>
/// <returns>The job data produced by GetJob.</returns>
/// <exception cref="Exception">Thrown when the job is not in the Paused state.</exception>
public JobData GetPausedJob() {
  bool isPaused = Job.ExecutionState == HeuristicLab.Core.ExecutionState.Paused;
  if (isPaused) {
    return GetJob();
  }
  throw new Exception("Executor: Job has to be paused before fetching results.");
}
/// <summary>
/// Handles the job's JobPaused event by releasing pauseStopSem, which unblocks
/// a caller waiting in Pause.
/// </summary>
private void Job_JobPaused(object sender, EventArgs e) {
  this.pauseStopSem.Release();
}
/// <summary>
/// Handles the job's JobStarted event by releasing startJobSem, which unblocks
/// the caller waiting in Start.
/// </summary>
private void Job_JobStarted(object sender, EventArgs e) {
  startJobSem.Release();
}
/// <summary>
/// Serializes the current job into a JobData tagged with this executor's job id.
/// </summary>
/// <returns>The serialized job together with its id.</returns>
/// <exception cref="InvalidStateException">Thrown when the job is still in the Started state.</exception>
private JobData GetJob() {
  bool stillRunning = Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started;
  if (stillRunning) {
    throw new InvalidStateException("Job is still running");
  }

  return new JobData {
    Data = PersistenceUtil.Serialize(Job),
    JobId = this.JobId
  };
}
/// <summary>
/// Detaches the event handlers from the job, drops the job reference and releases
/// the wait handles owned by this executor. Safe to call more than once.
/// </summary>
public void Dispose() {
  if (Job != null) {
    DeregisterJobEvents();
    Job = null;
  }
  // The semaphores are owned by this instance; Close() releases their OS handles.
  pauseStopSem.Close();
  startJobSem.Close();
}
}
}