Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.ExecutionEngine/3.3/Executor.cs @ 4423

Last change on this file since 4423 was 4423, checked in by cneumuel, 14 years ago
  • Refactored HL.Hive.Experiment. JobItems are not called HiveJobs and OptimizerJobs do not contain a hierarchy anymore.
  • Dynamic generation of jobs on a slave are not reflected on the client user interface.
  • Optimizer-Trees are now strictly synchronized with the HiveJob-Trees (also the ComputeInParallel property is taken into account when the Child HiveJobs are created)
  • Improved the way a class can report progress and lock the UI (IProgressReporter, IProgress, Progress, ProgressView)
  • Changes were made to the config-files, so that server and clients work with blade12.hpc.fh-hagenberg.at
  • Lots of small changes and bugfixes
File size: 8.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using HeuristicLab.Hive.Slave.Common;
24using HeuristicLab.Hive.Contracts;
25using HeuristicLab.Hive.JobBase;
26using HeuristicLab.Persistence.Core;
27using HeuristicLab.Common;
28using System.Xml.Serialization;
29using HeuristicLab.Persistence.Default.Xml;
30using System.IO;
31using HeuristicLab.Core;
32using HeuristicLab.Hive.Contracts.BusinessObjects;
33using System.Collections.Generic;
34using System.Linq;
35
36namespace HeuristicLab.Hive.Slave.ExecutionEngine {
37  public class Executor : MarshalByRefObject, IDisposable {
38    public Guid JobId { get; set; }
39    public IJob Job { get; set; }
40    public MessageContainer.MessageType CurrentMessage { get; set; }
41
42    private Exception currentException;
43    public String CurrentException {
44      get {
45        if (currentException != null) {
46          return currentException.ToString();
47        } else {
48          return string.Empty;
49        }
50      }
51    }
52    public MessageQueue Queue { get; set; }
53
54    public bool JobIsFinished { get; set; }
55
56    public ExecutionState ExecutionState {
57      get {
58        return Job.ExecutionState;
59      }
60    }
61
62    public TimeSpan ExecutionTime {
63      get {
64        return Job.ExecutionTime;
65      }
66    }
67
68    public DateTime CreationTime { get; set; }
69
70    /// <summary>
71    ///
72    /// </summary>
73    /// <param name="serializedJob"></param>
74    /// <param name="collectChildJobs">if true, all child-jobs are downloaded and the job will be resumed.</param>
75    public void Start(byte[] serializedJob) {
76      try {
77        CreationTime = DateTime.Now;
78        Job = SerializedJob.Deserialize<IJob>(serializedJob);
79
80        RegisterJobEvents();
81
82        if (Job.CollectChildJobs) {
83          Queue.AddMessage(new MessageContainerWithCallback<SerializedJobList>(MessageContainer.MessageType.GetChildJobs, JobId,
84            (childjobs) => {
85              Job.Resume(childjobs.Select(j => SerializedJob.Deserialize<IJob>(j.SerializedJobData)));
86            }
87          ));
88        } else {
89          Job.Prepare();
90          Job.Start();
91        }
92      }
93      catch (OutOfMemoryException e) {
94        JobIsFinished = true;
95        this.currentException = e;
96        Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.JobFailed, JobId));
97      }
98    }
99
100    public void StartOnlyJob() {
101      try {
102        Job.Start();
103      }
104      catch (OutOfMemoryException e) {
105        JobIsFinished = true;
106        this.currentException = e;
107        Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.JobFailed, JobId));
108      }
109    }
110
111    public void Abort() {
112      CurrentMessage = MessageContainer.MessageType.AbortJob;
113      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
114        Job.Stop();
115      }
116    }
117
118    private void RegisterJobEvents() {
119      Job.JobStopped += new EventHandler(Job_JobStopped);
120      Job.JobFailed += new EventHandler(Job_JobFailed);
121      Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
122      Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs);
123      Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs);
124    }
125
126    private void DeregisterJobEvents() {
127      Job.JobStopped -= new EventHandler(Job_JobStopped);
128      Job.JobFailed -= new EventHandler(Job_JobFailed);
129      Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob);
130      Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs);
131      Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs);
132    }
133
134    private void Job_NewChildJob(object sender, EventArgs<IJob> e) {
135      byte[] jobByteArray = SerializedJob.Serialize(e.Value);
136
137      SerializedJob serializedJob = new SerializedJob() {
138        JobInfo = new JobDto() {
139          State = JobState.Offline,
140          CoresNeeded = 1,
141          MemoryNeeded = 0,
142          PluginsNeeded = HivePluginInfoDto.FindPluginsNeeded(e.Value.GetType()),
143        },
144        SerializedJobData = jobByteArray
145      };
146
147      Queue.AddMessage(new MessageContainerWithJob(MessageContainer.MessageType.AddChildJob, this.JobId, serializedJob));
148    }
149
150    private void Job_WaitForChildJobs(object sender, EventArgs e) {
151      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
152      this.Job.CollectChildJobs = true;
153      byte[] jobByteArray = SerializedJob.Serialize(Job);
154
155      SerializedJob serializedJob = new SerializedJob() {
156        JobInfo = new JobDto() {
157          Id = JobId,
158          State = JobState.WaitForChildJobs
159        },
160        SerializedJobData = jobByteArray
161      };
162
163      JobIsFinished = true;
164      Queue.AddMessage(new MessageContainerWithJob(MessageContainer.MessageType.PauseJob, this.JobId, serializedJob));
165    }
166
167    private void Job_DeleteChildJobs(object sender, EventArgs e) {
168      Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.DeleteChildJobs, this.JobId));
169    }
170
171    private void Job_JobFailed(object sender, EventArgs e) {
172      HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e;
173      currentException = ex.Value;
174      Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.FinishedJob, JobId));
175    }
176
177    private void Job_JobStopped(object sender, EventArgs e) {
178      if (CurrentMessage == MessageContainer.MessageType.NoMessage) {
179        Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.FinishedJob, JobId));
180        JobIsFinished = true;
181      } else if (CurrentMessage == MessageContainer.MessageType.RequestSnapshot) {
182        Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.SnapshotReady, JobId));
183      } else if (CurrentMessage == MessageContainer.MessageType.AbortJob) {
184        Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.JobAborted, JobId));
185      }
186    }
187
188    public byte[] GetSnapshot() {
189      //if the job is still running, something went VERY bad.
190      if (Job.ExecutionState == Core.ExecutionState.Started) {
191        return null;
192      } else {
193        // Clear the Status message
194        CurrentMessage = MessageContainer.MessageType.NoMessage;
195        // Pack the whole job inside an xml document
196        byte[] job = SerializedJob.Serialize(Job);
197        // Restart the job
198        // Return the Snapshot
199        return job;
200      }
201    }
202
203    public byte[] GetFinishedJob() {
204      //Job isn't finished!
205      if (Job.ExecutionState == Core.ExecutionState.Started) {
206        throw new InvalidStateException("Job is still running");
207      } else {
208        byte[] jobArr = SerializedJob.Serialize(Job);
209        return jobArr;
210      }
211    }
212
213    public void RequestSnapshot() {
214      CurrentMessage = MessageContainer.MessageType.RequestSnapshot;
215      Job.Stop();
216    }
217
218    private void RestoreJobObject(byte[] sjob) {
219      Job = SerializedJob.Deserialize<IJob>(sjob);
220    }
221
222    public Executor() {
223      CurrentMessage = MessageContainer.MessageType.NoMessage;
224      JobIsFinished = false;
225    }
226
227    #region IDisposable Members
228
229    public void Dispose() {
230      DeregisterJobEvents();
231      Queue = null;
232      Job = null;
233    }
234
235    #endregion
236
237  }
238}
Note: See TracBrowser for help on using the repository browser.