Changeset 4423 for branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.ExecutionEngine
- Timestamp:
- 09/17/10 10:26:55 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.ExecutionEngine/3.3/Executor.cs
r4368 r4423 39 39 public IJob Job { get; set; } 40 40 public MessageContainer.MessageType CurrentMessage { get; set; } 41 public Exception CurrentException { get; set; } 41 42 private Exception currentException; 43 public String CurrentException { 44 get { 45 if (currentException != null) { 46 return currentException.ToString(); 47 } else { 48 return string.Empty; 49 } 50 } 51 } 42 52 public MessageQueue Queue { get; set; } 43 53 … … 50 60 } 51 61 52 public double Progress{62 public TimeSpan ExecutionTime { 53 63 get { 54 return Job. Progress;64 return Job.ExecutionTime; 55 65 } 56 66 } … … 58 68 public DateTime CreationTime { get; set; } 59 69 70 /// <summary> 71 /// 72 /// </summary> 73 /// <param name="serializedJob"></param> 74 /// <param name="collectChildJobs">if true, all child-jobs are downloaded and the job will be resumed.</param> 60 75 public void Start(byte[] serializedJob) { 61 CreationTime = DateTime.Now; 62 Job = DeserializeJobObject(serializedJob); 63 76 try { 77 CreationTime = DateTime.Now; 78 Job = SerializedJob.Deserialize<IJob>(serializedJob); 79 80 RegisterJobEvents(); 81 82 if (Job.CollectChildJobs) { 83 Queue.AddMessage(new MessageContainerWithCallback<SerializedJobList>(MessageContainer.MessageType.GetChildJobs, JobId, 84 (childjobs) => { 85 Job.Resume(childjobs.Select(j => SerializedJob.Deserialize<IJob>(j.SerializedJobData))); 86 } 87 )); 88 } else { 89 Job.Prepare(); 90 Job.Start(); 91 } 92 } 93 catch (OutOfMemoryException e) { 94 JobIsFinished = true; 95 this.currentException = e; 96 Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.JobFailed, JobId)); 97 } 98 } 99 100 public void StartOnlyJob() { 101 try { 102 Job.Start(); 103 } 104 catch (OutOfMemoryException e) { 105 JobIsFinished = true; 106 this.currentException = e; 107 Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.JobFailed, JobId)); 108 } 109 } 110 111 public void Abort() { 112 CurrentMessage = MessageContainer.MessageType.AbortJob; 113 if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) { 114 Job.Stop(); 115 } 116 } 117 118 private void RegisterJobEvents() { 64 119 Job.JobStopped += new EventHandler(Job_JobStopped); 65 120 Job.JobFailed += new EventHandler(Job_JobFailed); 66 121 Job.NewChildJob += new EventHandler<EventArgs<IJob>>(Job_NewChildJob); 67 122 Job.WaitForChildJobs += new EventHandler(Job_WaitForChildJobs); 68 69 if (Job.ExecutionState == Core.ExecutionState.Paused) { 70 Queue.AddMessage(new MessageContainerWithCallback<SerializedJobList>(MessageContainer.MessageType.GetChildJobs, JobId, 71 (childjobs) => { 72 Job.Resume(childjobs.Select(j => DeserializeJobObject(j.SerializedJobData))); 73 } 74 )); 75 } else { 76 Job.Prepare(); 77 Job.Start(); 78 } 79 } 80 81 void Job_NewChildJob(object sender, EventArgs<IJob> e) { 82 byte[] jobByteArray = SerializeJobObject(e.Value); 123 Job.DeleteChildJobs += new EventHandler(Job_DeleteChildJobs); 124 } 125 126 private void DeregisterJobEvents() { 127 Job.JobStopped -= new EventHandler(Job_JobStopped); 128 Job.JobFailed -= new EventHandler(Job_JobFailed); 129 Job.NewChildJob -= new EventHandler<EventArgs<IJob>>(Job_NewChildJob); 130 Job.WaitForChildJobs -= new EventHandler(Job_WaitForChildJobs); 131 Job.DeleteChildJobs -= new EventHandler(Job_DeleteChildJobs); 132 } 133 134 private void Job_NewChildJob(object sender, EventArgs<IJob> e) { 135 byte[] jobByteArray = SerializedJob.Serialize(e.Value); 83 136 84 137 SerializedJob serializedJob = new SerializedJob() { 85 138 JobInfo = new JobDto() { 86 139 State = JobState.Offline, 87 CoresNeeded = e.Value.CoresNeeded,88 MemoryNeeded = e.Value.MemoryNeeded,140 CoresNeeded = 1, 141 MemoryNeeded = 0, 89 142 PluginsNeeded = HivePluginInfoDto.FindPluginsNeeded(e.Value.GetType()), 90 143 }, … … 95 148 } 96 149 97 void Job_WaitForChildJobs(object sender, EventArgs e) {150 private void Job_WaitForChildJobs(object sender, EventArgs e) { 98 151 // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished 99 byte[] jobByteArray = SerializeJobObject(Job); 152 this.Job.CollectChildJobs = true; 153 byte[] jobByteArray = SerializedJob.Serialize(Job); 100 154 101 155 SerializedJob serializedJob = new SerializedJob() { … … 111 165 } 112 166 113 void Job_JobFailed(object sender, EventArgs e) { 167 private void Job_DeleteChildJobs(object sender, EventArgs e) { 168 Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.DeleteChildJobs, this.JobId)); 169 } 170 171 private void Job_JobFailed(object sender, EventArgs e) { 114 172 HeuristicLab.Common.EventArgs<Exception> ex = (HeuristicLab.Common.EventArgs<Exception>)e; 115 CurrentException = ex.Value;173 currentException = ex.Value; 116 174 Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.FinishedJob, JobId)); 117 175 } 118 176 119 public void StartOnlyJob() { 120 Job.Start(); 121 } 122 123 public void Abort() { 124 CurrentMessage = MessageContainer.MessageType.AbortJob; 125 if (Job.ExecutionState != Core.ExecutionState.Stopped) { 126 Job.Stop(); 127 } 128 } 129 130 void Job_JobStopped(object sender, EventArgs e) { 177 private void Job_JobStopped(object sender, EventArgs e) { 131 178 if (CurrentMessage == MessageContainer.MessageType.NoMessage) { 132 179 Queue.AddMessage(new MessageContainer(MessageContainer.MessageType.FinishedJob, JobId)); … … 147 194 CurrentMessage = MessageContainer.MessageType.NoMessage; 148 195 // Pack the whole job inside an xml document 149 byte[] job = Serialize JobObject(Job);196 byte[] job = SerializedJob.Serialize(Job); 150 197 // Restart the job 151 198 // Return the Snapshot … … 159 206 throw new InvalidStateException("Job is still running"); 160 207 } else { 161 byte[] jobArr = Serialize JobObject(Job);208 byte[] jobArr = SerializedJob.Serialize(Job); 162 209 return jobArr; 163 210 } … … 169 216 } 170 217 171 private byte[] SerializeJobObject(IJob job) {172 MemoryStream memStream = new MemoryStream();173 XmlGenerator.Serialize(job, memStream);174 byte[] jobByteArray = memStream.ToArray();175 memStream.Dispose();176 return jobByteArray;177 }178 179 private IJob DeserializeJobObject(byte[] sjob) {180 MemoryStream memStream = new MemoryStream(sjob);181 IJob job = XmlParser.Deserialize<IJob>(memStream);182 memStream.Dispose();183 return job;184 }185 186 218 private void RestoreJobObject(byte[] sjob) { 187 Job = DeserializeJobObject(sjob);219 Job = SerializedJob.Deserialize<IJob>(sjob); 188 220 } 189 221 … … 191 223 CurrentMessage = MessageContainer.MessageType.NoMessage; 192 224 JobIsFinished = false; 193 //Job = new TestJob();194 225 } 195 226 … … 197 228 198 229 public void Dispose() { 199 Job.JobFailed -= new EventHandler(Job_JobFailed); 200 Job.JobFailed -= new EventHandler(Job_JobStopped); 230 DeregisterJobEvents(); 201 231 Queue = null; 202 232 Job = null; … … 204 234 205 235 #endregion 236 206 237 } 207 238 }
Note: See TracChangeset
for help on using the changeset viewer.