[5105] | 1 | #region License Information
|
---|
| 2 | /* HeuristicLab
|
---|
| 3 | * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
|
---|
| 4 | *
|
---|
| 5 | * This file is part of HeuristicLab.
|
---|
| 6 | *
|
---|
| 7 | * HeuristicLab is free software: you can redistribute it and/or modify
|
---|
| 8 | * it under the terms of the GNU General Public License as published by
|
---|
| 9 | * the Free Software Foundation, either version 3 of the License, or
|
---|
| 10 | * (at your option) any later version.
|
---|
| 11 | *
|
---|
| 12 | * HeuristicLab is distributed in the hope that it will be useful,
|
---|
| 13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
---|
| 14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
---|
| 15 | * GNU General Public License for more details.
|
---|
| 16 | *
|
---|
| 17 | * You should have received a copy of the GNU General Public License
|
---|
| 18 | * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
|
---|
| 19 | */
|
---|
| 20 | #endregion
|
---|
| 21 |
|
---|
| 22 | using System;
|
---|
[5137] | 23 | using System.Collections.Generic;
|
---|
[5105] | 24 | using System.Linq;
|
---|
[5782] | 25 | using System.Threading;
|
---|
[5105] | 26 | using HeuristicLab.Common;
|
---|
| 27 | using HeuristicLab.Core;
|
---|
| 28 | using HeuristicLab.Hive;
|
---|
[5137] | 29 | using HeuristicLab.PluginInfrastructure;
|
---|
[5105] | 30 |
|
---|
[5137] | 31 |
|
---|
[5599] | 32 | namespace HeuristicLab.Clients.Hive.SlaveCore {
|
---|
[5105] | 33 | public class Executor : MarshalByRefObject, IDisposable {
|
---|
/// <summary>Id of the job handled by this executor; used when talking to the service and in error messages.</summary>
public Guid JobId { get; set; }
/// <summary>The job being executed. Assigned in Start after deserialization and reset to null in Dispose.</summary>
public IJob Job { get; set; }
public int CoresNeeded { get; set; }
public int MemoryNeeded { get; set; }
// Set when a stop has been requested, so that Job_JobStopped can tell a requested stop from a regular finish.
private bool wasJobAborted = false;
public Core Core { get; set; }
// Released by Job_JobPaused and, for requested stops, by Job_JobStopped;
// Pause(), Stop() and GetFinishedJob() block on it until the job has reacted.
private readonly Semaphore pauseStopSem = new Semaphore(0, 1);
// Released by Job_JobStarted; makes pause or stop wait until start is finished.
private readonly Semaphore jobStartedSem = new Semaphore(0, 1);

/// <summary>Queue of messages (job stopped, failed, child-job requests, ...) produced by this executor.</summary>
public ExecutorQueue executorQueue;

/// <summary>True after construction; switched off by Pause() and Stop().</summary>
public bool SendHeartbeatForExecutor { get; set; }

/// <summary>Reset to false by Start(); set to true when the job failed or a requested stop completed.</summary>
public bool Aborted { get; set; }

/// <summary>Local time at which Start() was called.</summary>
public DateTime CreationTime { get; set; }
| 52 |
|
---|
// Most recent error recorded by this executor; null as long as nothing went wrong.
private Exception currentException;

/// <summary>
/// Gets the textual form (<c>ToString()</c>) of the most recently recorded exception,
/// or an empty string when no exception has been recorded.
/// </summary>
public String CurrentException {
  get {
    return currentException == null ? string.Empty : currentException.ToString();
  }
}
[5137] | 63 |
|
---|
/// <summary>
/// Gets the execution state of the wrapped job. While no job is assigned
/// the executor reports <c>Stopped</c>.
/// </summary>
public ExecutionState ExecutionState {
  get {
    if (Job == null) {
      return HeuristicLab.Core.ExecutionState.Stopped;
    }
    return Job.ExecutionState;
  }
}
| 69 |
|
---|
/// <summary>
/// Gets the execution time reported by the wrapped job, or a zero time span
/// while no job is assigned.
/// </summary>
public TimeSpan ExecutionTime {
  get {
    if (Job == null) {
      return TimeSpan.Zero;
    }
    return Job.ExecutionTime;
  }
}
| 75 |
|
---|
/// <summary>
/// Creates an executor with an empty message queue and heartbeats enabled.
/// </summary>
public Executor() {
  this.executorQueue = new ExecutorQueue();
  this.SendHeartbeatForExecutor = true;
}
[5105] | 80 |
|
---|
/// <summary>
/// Deserializes the given job, subscribes to its events and then either starts it or,
/// when the job's <c>CollectChildJobs</c> flag is set, resumes it with the child jobs
/// fetched from the service. Exceptions are not rethrown: they are recorded and
/// reported through <c>Job_JobFailed</c>.
/// </summary>
/// <param name="serializedJob">The serialized <c>IJob</c> to execute.</param>
public void Start(byte[] serializedJob) {
  try {
    CreationTime = DateTime.Now;
    Aborted = false;
    Job = PersistenceUtil.Deserialize<IJob>(serializedJob);

    RegisterJobEvents();

    if (!Job.CollectChildJobs) {
      Job.Start();
      // Job_JobStarted releases the semaphore; give the job 15 seconds to report that it is running.
      bool started = jobStartedSem.WaitOne(TimeSpan.FromSeconds(15));
      if (!started) {
        throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");
      }
      // Put the token back so that Pause()/Stop() do not have to wait.
      jobStartedSem.Release();
      return;
    }

    // The job waited for its children: download them and hand them over.
    IEnumerable<JobData> finishedChildren = WcfService.Instance.GetChildJobs(JobId);
    Job.Resume(finishedChildren.Select(child => PersistenceUtil.Deserialize<IJob>(child.Data)));
  }
  catch (Exception e) {
    this.currentException = e;
    Job_JobFailed(this, new EventArgs<Exception>(e));
  }
}
| 107 |
|
---|
/// <summary>
/// Disables heartbeats for this executor and pauses the job if it is currently started.
/// Blocks until the job has signalled that it is paused. Errors are recorded in
/// <c>currentException</c> instead of being thrown.
/// </summary>
public void Pause() {
  SendHeartbeatForExecutor = false;
  // Wait until the job is started. If this does not happen, the Job may still be null and we give up below.
  if (jobStartedSem.WaitOne(TimeSpan.FromSeconds(15))) {
    // Hand the token back. Keeping it would make a following Stop() stall for the full
    // timeout and could starve a Start() that is still waiting for the same signal.
    jobStartedSem.Release();
  }
  if (Job == null) {
    currentException = new Exception("Pausing job " + this.JobId + ": Job is null");
    return;
  }

  if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
    try {
      Job.Pause();
      // we need to block the pause until Job_JobPaused releases the semaphore
      pauseStopSem.WaitOne();
    }
    catch (Exception ex) {
      currentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());
    }
  }
}
| 128 |
|
---|
/// <summary>
/// Disables heartbeats for this executor, marks the job as aborted and stops it if it is
/// started or paused. Blocks until the job has signalled that it stopped. Errors are
/// recorded in <c>currentException</c> instead of being thrown.
/// </summary>
public void Stop() {
  SendHeartbeatForExecutor = false;
  // Wait until the job is started. If this does not happen, the Job may still be null.
  if (jobStartedSem.WaitOne(TimeSpan.FromSeconds(15))) {
    // Hand the token back. Keeping it would make a later Pause()/Stop() stall for the full
    // timeout and could starve a Start() that is still waiting for the same signal.
    jobStartedSem.Release();
  }
  if (Job == null) {
    currentException = new Exception("Stopping job " + this.JobId + ": Job is null");
  }
  // Must be set before Job.Stop() so that Job_JobStopped releases pauseStopSem.
  wasJobAborted = true;

  // The ExecutionState property reports Stopped for a null job, so nothing is dereferenced here.
  if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
    try {
      Job.Stop();
      pauseStopSem.WaitOne();
    }
    catch (Exception ex) {
      currentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());
    }
  }
}
| 148 |
|
---|
/// <summary>
/// Subscribes this executor's handlers to all events of the current job.
/// </summary>
private void RegisterJobEvents() {
  Job.JobStopped += Job_JobStopped;
  Job.JobFailed += Job_JobFailed;
  Job.NewChildJob += Job_NewChildJob;
  Job.WaitForChildJobs += Job_WaitForChildJobs;
  Job.DeleteChildJobs += Job_DeleteChildJobs;
  Job.JobPaused += Job_JobPaused;
  Job.JobStarted += Job_JobStarted;
}
| 158 |
|
---|
/// <summary>
/// Removes the handlers added by <c>RegisterJobEvents</c> from the current job.
/// </summary>
private void DeregisterJobEvents() {
  Job.JobStopped -= Job_JobStopped;
  Job.JobFailed -= Job_JobFailed;
  Job.NewChildJob -= Job_NewChildJob;
  Job.WaitForChildJobs -= Job_WaitForChildJobs;
  Job.DeleteChildJobs -= Job_DeleteChildJobs;
  Job.JobPaused -= Job_JobPaused;
  Job.JobStarted -= Job_JobStarted;
}
| 168 |
|
---|
/// <summary>
/// Intended to return the ids of the plugins required to run the given job.
/// Not implemented yet: the method always throws.
/// </summary>
/// <param name="obj">The job whose required plugins should be determined.</param>
/// <returns>Never returns.</returns>
/// <exception cref="NotImplementedException">Always thrown.</exception>
private List<Guid> FindPluginsNeeded(IJob obj) {
  // TODO: map the plugin descriptions from PluginUtil.GetDeclaringPlugins(obj) to their ids.
  // The previous body enumerated them without using the result and kept an unreachable return.
  throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
}
| 177 |
|
---|
/// <summary>
/// Handles the job's NewChildJob event: serializes the new child job, builds a job
/// description for it (one core, no memory requirement, required plugins) and queues
/// a NewChildJob message carrying both.
/// NOTE: FindPluginsNeeded currently always throws, so this handler cannot complete.
/// </summary>
private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
  JobData childJobData = new JobData { Data = PersistenceUtil.Serialize(e.Value) };

  Job childJob = new Job {
    CoresNeeded = 1,
    MemoryNeeded = 0,
    PluginsNeededIds = FindPluginsNeeded(e.Value)
  };

  ExecutorMessage childMsg = new ExecutorMessage(ExecutorMessageType.NewChildJob) {
    MsgData = childJobData,
    MsgJob = childJob
  };

  executorQueue.AddMessage(childMsg);
}
| 193 |
|
---|
/// <summary>
/// Handles the job's WaitForChildJobs event: flags the job so that its child jobs are
/// collected on the next start, serializes it and queues a WaitForChildJobs message.
/// </summary>
private void Job_WaitForChildJobs(object sender, EventArgs e) {
  // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished.
  // The flag has to be set before serializing so that it is part of the stored job.
  this.Job.CollectChildJobs = true;

  JobData snapshot = new JobData {
    Data = PersistenceUtil.Serialize(Job),
    JobId = this.JobId
  };

  ExecutorMessage waitMsg = new ExecutorMessage(ExecutorMessageType.WaitForChildJobs) {
    MsgData = snapshot
  };
  executorQueue.AddMessage(waitMsg);
}
| 206 |
|
---|
/// <summary>
/// Handles the job's DeleteChildJobs event by queueing a DeleteChildJobs message.
/// </summary>
private void Job_DeleteChildJobs(object sender, EventArgs e) {
  this.executorQueue.AddMessage(ExecutorMessageType.DeleteChildJobs);
}
| 210 |
|
---|
/// <summary>
/// Handles a failed job: records the exception, marks the executor as aborted and
/// queues a JobFailed message.
/// </summary>
/// <param name="sender">The event source (unused).</param>
/// <param name="e">
/// Expected to be an <c>EventArgs&lt;Exception&gt;</c> carrying the failure. Other argument
/// types are tolerated so that the JobFailed message is still queued.
/// </param>
private void Job_JobFailed(object sender, EventArgs e) {
  // The event is declared as plain EventHandler, so the payload type is not guaranteed.
  // A hard cast would throw inside the handler and the failure would never be reported.
  HeuristicLab.Common.EventArgs<Exception> failure = e as HeuristicLab.Common.EventArgs<Exception>;
  if (failure != null) {
    currentException = failure.Value;
  } else if (currentException == null) {
    currentException = new Exception("Job " + this.JobId + " failed without exception details");
  }
  Aborted = true;

  executorQueue.AddMessage(ExecutorMessageType.JobFailed);
}
| 218 |
|
---|
/// <summary>
/// Handles the job's JobStopped event. For a requested stop the executor is marked as
/// aborted and the thread blocked on <c>pauseStopSem</c> is woken up; for a regular
/// finish a JobStopped message is queued.
/// </summary>
private void Job_JobStopped(object sender, EventArgs e) {
  if (wasJobAborted) {
    // Publish the state before signalling, otherwise the woken thread may still read Aborted == false.
    Aborted = true;
    pauseStopSem.Release();
  } else {
    //it's a clean and finished job, so send it
    executorQueue.AddMessage(ExecutorMessageType.JobStopped);
  }
}
| 228 |
|
---|
/// <summary>
/// Returns the job data of the finished job. A job that is still started is stopped first,
/// and the call blocks until the stop has been signalled. Errors while stopping are
/// recorded in <c>currentException</c>.
/// </summary>
/// <returns>The job data produced by <c>GetJob</c>.</returns>
public JobData GetFinishedJob() {
  if (Job == null) {
    if (currentException == null) {
      currentException = new Exception("Getting finished job " + this.JobId + ": Job is null");
    }
    // Always leave here: falling through would dereference the null job below,
    // which happened whenever an exception had already been recorded.
    return GetJob();
  }

  if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
    try {
      // Set the flag before stopping: Job_JobStopped only releases pauseStopSem when it
      // sees the flag, so setting it afterwards could leave WaitOne blocked forever.
      wasJobAborted = true;
      Job.Stop();
      pauseStopSem.WaitOne();
    }
    catch (Exception ex) {
      currentException = new Exception("Error getting finished job " + this.JobId + ": " + ex.ToString());
    }
  }

  return GetJob();
}
| 250 |
|
---|
/// <summary>
/// Returns the job data of a paused job.
/// </summary>
/// <returns>The job data produced by <c>GetJob</c>.</returns>
/// <exception cref="Exception">Thrown when the job is not paused, including when no job is assigned.</exception>
public JobData GetPausedJob() {
  // Use the null-safe ExecutionState property: a missing job reports Stopped and therefore
  // ends up in the regular error below instead of a NullReferenceException.
  if (this.ExecutionState != HeuristicLab.Core.ExecutionState.Paused) {
    throw new Exception("Executor: Job has to be paused before fetching results.");
  }
  return GetJob();
}
| 257 |
|
---|
/// <summary>
/// Handles the job's JobPaused event by releasing <c>pauseStopSem</c>, which wakes up
/// a Pause() call blocked on it.
/// </summary>
private void Job_JobPaused(object sender, EventArgs e) {
  this.pauseStopSem.Release();
}
| 261 |
|
---|
/// <summary>
/// Handles the job's JobStarted event by releasing <c>jobStartedSem</c>, on which
/// Start(), Pause() and Stop() wait.
/// </summary>
private void Job_JobStarted(object sender, EventArgs e) {
  this.jobStartedSem.Release();
}
| 265 |
|
---|
/// <summary>
/// Builds the <c>JobData</c> for the current job. When no job is assigned, a serialized
/// empty <c>JobData</c> is used as payload and an exception is recorded (unless one is
/// already stored).
/// </summary>
/// <returns>Job data carrying the serialized job and this executor's job id.</returns>
/// <exception cref="InvalidStateException">Thrown when the job is still running.</exception>
private JobData GetJob() {
  // Check for null first: the state was previously read before the null check,
  // which made the "empty job" branch below unreachable.
  if (Job != null && Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
    throw new InvalidStateException("Job is still running");
  }

  JobData jdata = new JobData();
  if (Job == null) {
    //send empty job and save exception
    jdata.Data = PersistenceUtil.Serialize(new JobData());
    if (currentException == null) {
      currentException = new Exception("Job with id " + this.JobId + " is null, sending empty job");
    }
  } else {
    jdata.Data = PersistenceUtil.Serialize(Job);
  }
  jdata.JobId = JobId;
  return jdata;
}
| 284 |
|
---|
/// <summary>
/// Detaches this executor from its job: unsubscribes all job event handlers and
/// drops the reference to the job. Does nothing when no job is assigned.
/// </summary>
public void Dispose() {
  if (Job == null) {
    return;
  }
  DeregisterJobEvents();
  Job = null;
}
| 290 | }
|
---|
| 291 | }
|
---|