Changeset 6725 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Executor.cs
- Timestamp:
- 09/08/11 16:38:28 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Executor.cs
r6721 r6725 29 29 namespace HeuristicLab.Clients.Hive.SlaveCore { 30 30 /// <summary> 31 /// The executor runs in the appdomain and handles the execution of an Hive job.31 /// The executor runs in the appdomain and handles the execution of an Hive task. 32 32 /// </summary> 33 33 public class Executor : MarshalByRefObject, IDisposable { 34 private bool was JobAborted = false;34 private bool wasTaskAborted = false; 35 35 private AutoResetEvent pauseStopSem = new AutoResetEvent(false); 36 private AutoResetEvent start JobSem = new AutoResetEvent(false); // block start method call37 private AutoResetEvent jobStartedSem = new AutoResetEvent(false); // make pause or stop wait until start is finished36 private AutoResetEvent startTaskSem = new AutoResetEvent(false); // block start method call 37 private AutoResetEvent taskStartedSem = new AutoResetEvent(false); // make pause or stop wait until start is finished 38 38 private ExecutorQueue executorQueue; 39 private bool jobDataInvalid = false; // if true, the jobdata is not sent when the jobis failed40 private I Job job;39 private bool taskDataInvalid = false; // if true, the jobdata is not sent when the task is failed 40 private ITask task; 41 41 private DateTime creationTime; 42 42 43 public Guid JobId { get; set; }43 public Guid TaskId { get; set; } 44 44 public int CoresNeeded { get; set; } 45 45 public int MemoryNeeded { get; set; } … … 63 63 64 64 private ExecutionState ExecutionState { 65 get { return job != null ? job.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; }65 get { return task != null ? task.ExecutionState : HeuristicLab.Core.ExecutionState.Stopped; } 66 66 } 67 67 68 68 public TimeSpan ExecutionTime { 69 get { return job != null ? job.ExecutionTime : new TimeSpan(0, 0, 0); }69 get { return task != null ? task.ExecutionTime : new TimeSpan(0, 0, 0); } 70 70 } 71 71 … … 79 79 try { 80 80 creationTime = DateTime.Now; 81 job = PersistenceUtil.Deserialize<IJob>(serializedJob);81 task = PersistenceUtil.Deserialize<ITask>(serializedJob); 82 82 83 83 RegisterJobEvents(); 84 84 85 job.Start();86 if (!start JobSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) {87 jobDataInvalid = true;88 throw new TimeoutException("Timeout when starting the job. JobStarted event was not fired.");85 task.Start(); 86 if (!startTaskSem.WaitOne(Settings.Default.ExecutorSemTimeouts)) { 87 taskDataInvalid = true; 88 throw new TimeoutException("Timeout when starting the task. TaskStarted event was not fired."); 89 89 } 90 90 } 91 91 catch (Exception e) { 92 92 this.CurrentException = e; 93 Job_JobFailed(this, new EventArgs<Exception>(e));93 Task_TaskFailed(this, new EventArgs<Exception>(e)); 94 94 } 95 95 finally { 96 jobStartedSem.Set();96 taskStartedSem.Set(); 97 97 } 98 98 } … … 100 100 public void Pause() { 101 101 IsPausing = true; 102 // wait until jobis started. if this does not happen, the Task is null an we give up103 jobStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);104 if ( job== null) {105 CurrentException = new Exception("Pausing job " + this.JobId + ": Task is null");102 // wait until task is started. if this does not happen, the Task is null an we give up 103 taskStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts); 104 if (task == null) { 105 CurrentException = new Exception("Pausing task " + this.TaskId + ": Task is null"); 106 106 executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured); 107 107 return; 108 108 } 109 109 110 if ( job.ExecutionState == ExecutionState.Started) {110 if (task.ExecutionState == ExecutionState.Started) { 111 111 try { 112 job.Pause();112 task.Pause(); 113 113 //we need to block the pause... 114 114 pauseStopSem.WaitOne(); 115 115 } 116 116 catch (Exception ex) { 117 CurrentException = new Exception("Error pausing job " + this.JobId + ": " + ex.ToString());117 CurrentException = new Exception("Error pausing task " + this.TaskId + ": " + ex.ToString()); 118 118 executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured); 119 119 } … … 123 123 public void Stop() { 124 124 IsStopping = true; 125 // wait until jobis started. if this does not happen, the Task is null an we give up126 jobStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts);127 if ( job== null) {128 CurrentException = new Exception("Stopping job " + this.JobId + ": Task is null");125 // wait until task is started. if this does not happen, the Task is null an we give up 126 taskStartedSem.WaitOne(Settings.Default.ExecutorSemTimeouts); 127 if (task == null) { 128 CurrentException = new Exception("Stopping task " + this.TaskId + ": Task is null"); 129 129 executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured); 130 130 } 131 was JobAborted = true;131 wasTaskAborted = true; 132 132 133 133 if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) { 134 134 try { 135 job.Stop();135 task.Stop(); 136 136 pauseStopSem.WaitOne(); 137 137 } 138 138 catch (Exception ex) { 139 CurrentException = new Exception("Error stopping job " + this.JobId + ": " + ex.ToString());139 CurrentException = new Exception("Error stopping task " + this.TaskId + ": " + ex.ToString()); 140 140 executorQueue.AddMessage(ExecutorMessageType.ExceptionOccured); 141 141 } … … 144 144 145 145 private void RegisterJobEvents() { 146 job.JobStopped += new EventHandler(Job_JobStopped);147 job.JobFailed += new EventHandler(Job_JobFailed);148 job.JobPaused += new EventHandler(Job_JobPaused);149 job.JobStarted += new EventHandler(Job_JobStarted);146 task.TaskStopped += new EventHandler(Task_TaskStopped); 147 task.TaskFailed += new EventHandler(Task_TaskFailed); 148 task.TaskPaused += new EventHandler(Task_TaskPaused); 149 task.TaskStarted += new EventHandler(Task_TaskStarted); 150 150 } 151 151 152 152 private void DeregisterJobEvents() { 153 job.JobStopped -= new EventHandler(Job_JobStopped);154 job.JobFailed -= new EventHandler(Job_JobFailed);155 job.JobPaused -= new EventHandler(Job_JobPaused);156 job.JobStarted -= new EventHandler(Job_JobStarted);153 task.TaskStopped -= new EventHandler(Task_TaskStopped); 154 task.TaskFailed -= new EventHandler(Task_TaskFailed); 155 task.TaskPaused -= new EventHandler(Task_TaskPaused); 156 task.TaskStarted -= new EventHandler(Task_TaskStarted); 157 157 } 158 158 159 159 #region Task Events 160 private void Job_JobFailed(object sender, EventArgs e) {160 private void Task_TaskFailed(object sender, EventArgs e) { 161 161 IsStopping = true; 162 162 EventArgs<Exception> ex = (EventArgs<Exception>)e; 163 163 CurrentException = ex.Value; 164 executorQueue.AddMessage(ExecutorMessageType. JobFailed);165 } 166 167 private void Job_JobStopped(object sender, EventArgs e) {164 executorQueue.AddMessage(ExecutorMessageType.TaskFailed); 165 } 166 167 private void Task_TaskStopped(object sender, EventArgs e) { 168 168 IsStopping = true; 169 if (was JobAborted) {169 if (wasTaskAborted) { 170 170 pauseStopSem.Set(); 171 171 } 172 executorQueue.AddMessage(ExecutorMessageType. JobStopped);173 } 174 175 private void Job_JobPaused(object sender, EventArgs e) {172 executorQueue.AddMessage(ExecutorMessageType.TaskStopped); 173 } 174 175 private void Task_TaskPaused(object sender, EventArgs e) { 176 176 IsPausing = true; 177 177 pauseStopSem.Set(); 178 executorQueue.AddMessage(ExecutorMessageType. JobPaused);179 } 180 181 private void Job_JobStarted(object sender, EventArgs e) {182 start JobSem.Set();183 executorQueue.AddMessage(ExecutorMessageType. JobStarted);178 executorQueue.AddMessage(ExecutorMessageType.TaskPaused); 179 } 180 181 private void Task_TaskStarted(object sender, EventArgs e) { 182 startTaskSem.Set(); 183 executorQueue.AddMessage(ExecutorMessageType.TaskStarted); 184 184 } 185 185 #endregion 186 186 187 public TaskData Get JobData() {188 if ( jobDataInvalid) return null;189 190 if ( job.ExecutionState == ExecutionState.Started) {187 public TaskData GetTaskData() { 188 if (taskDataInvalid) return null; 189 190 if (task.ExecutionState == ExecutionState.Started) { 191 191 throw new InvalidStateException("Task is still running"); 192 192 } else { 193 TaskData jobData = new TaskData();194 if ( job== null) {195 //send empty joband save exception196 jobData.Data = PersistenceUtil.Serialize(new TaskData());193 TaskData taskData = new TaskData(); 194 if (task == null) { 195 //send empty task and save exception 196 taskData.Data = PersistenceUtil.Serialize(new TaskData()); 197 197 if (CurrentException == null) { 198 CurrentException = new Exception("Task with id " + this. JobId + " is null, sending empty job");198 CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task"); 199 199 } 200 200 } else { 201 jobData.Data = PersistenceUtil.Serialize(job);202 } 203 jobData.TaskId = JobId;204 return jobData;201 taskData.Data = PersistenceUtil.Serialize(task); 202 } 203 taskData.TaskId = TaskId; 204 return taskData; 205 205 } 206 206 } 207 207 208 208 public void Dispose() { 209 if ( job!= null)209 if (task != null) 210 210 DeregisterJobEvents(); 211 job= null;211 task = null; 212 212 } 213 213 }
Note: See TracChangeset
for help on using the changeset viewer.