- Timestamp:
- 06/19/10 09:17:24 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.2/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs
r3578 r3931 1 1 #region License Information 2 2 3 /* HeuristicLab 3 4 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) … … 18 19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>. 19 20 */ 21 20 22 #endregion 21 23 … … 38 40 39 41 namespace HeuristicLab.Hive.Server.Core { 40 class JobManager: IJobManager, IInternalJobManager { 41 42 internal class JobManager : IJobManager, IInternalJobManager { 42 43 //ISessionFactory factory; 43 ILifecycleManager lifecycleManager;44 private ILifecycleManager lifecycleManager; 44 45 45 46 #region IJobManager Members … … 53 54 } 54 55 55 private JobDto GetLastJobResult(Guid jobId) { 56 private JobDto GetLastJobResult(Guid jobId) { 56 57 return DaoLocator.JobDao.FindById(jobId); 57 58 } 58 59 59 public void ResetJobsDependingOnResults(JobDto job) { 60 Logger.Info("Setting job " + job.Id + " offline"); 61 if (job != null) { 62 DaoLocator.JobDao.SetJobOffline(job); 63 } 64 } 65 66 67 void checkForDeadJobs() { 60 61 private void checkForDeadJobs() { 68 62 Logger.Info("Searching for dead Jobs"); 69 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 63 using ( 64 TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, 65 new TransactionOptions 66 {IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE})) { 70 67 List<JobDto> allJobs = new List<JobDto>(DaoLocator.JobDao.FindAll()); 71 68 foreach (JobDto curJob in allJobs) { 72 if (curJob.State != State.calculating ) {73 ResetJobsDependingOnResults(curJob);69 if (curJob.State != State.calculating && curJob.State != State.finished) { 70 DaoLocator.JobDao.SetJobOffline(curJob); 74 71 } 75 72 } … … 79 76 } 80 77 81 void lifecycleManager_OnStartup(object sender, EventArgs e) { 78 private void lifecycleManager_OnStartup(object sender, EventArgs e) { 79 Logger.Info("Startup Event Fired, Checking DB for consistency"); 82 80 checkForDeadJobs(); 83 } 84 85 void lifecycleManager_OnShutdown(object sender, EventArgs e) { 81 Logger.Info("Startup Event Done"); 82 } 83 84 private void lifecycleManager_OnShutdown(object sender, EventArgs e) { 85 Logger.Info("Startup Event Fired, Checking DB for consistency"); 86 86 checkForDeadJobs(); 87 Logger.Info("Startup Event Done"); 87 88 } 88 89 … … 92 93 /// <returns></returns> 93 94 public ResponseList<JobDto> GetAllJobs() { 94 95 96 97 98 99 100 95 ResponseList<JobDto> response = new ResponseList<JobDto>(); 96 97 response.List = new List<JobDto>(DaoLocator.JobDao.FindAll()); 98 response.Success = true; 99 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS; 100 101 return response; 101 102 } 102 103 … … 105 106 response.List = new List<JobDto>(DaoLocator.JobDao.FindWithLimitations(jobState, offset, count)); 106 107 return response; 107 } 108 } 108 109 109 110 /// <summary> … … 112 113 /// <param name="jobId"></param> 113 114 /// <returns></returns> 114 public Stream GetJobStreamById(Guid jobId) { 115 public Stream GetJobStreamById(Guid jobId) { 115 116 return DaoLocator.JobDao.GetSerializedJobStream(jobId); 116 117 117 } 118 118 … … 122 122 /// <returns></returns> 123 123 public ResponseObject<JobDto> GetJobById(Guid jobId) { 124 124 ResponseObject<JobDto> response = new ResponseObject<JobDto>(); 125 125 126 126 response.Obj = DaoLocator.JobDao.FindById(jobId); 127 if (response.Obj != null) { 128 response.Success = true; 129 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID; 130 } else { 131 response.Success = false; 132 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST; 133 } 134 135 return response; 136 } 127 if (response.Obj != null) { 128 response.Success = true; 129 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID; 130 } 131 else { 132 response.Success = false; 133 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST; 134 } 135 136 return response; 137 } 137 138 138 139 public ResponseObject<JobDto> GetJobByIdWithDetails(Guid jobId) { … … 144 145 145 146 job.Obj.Client = DaoLocator.ClientDao.GetClientForJob(jobId); 146 } else { 147 } 148 else { 147 149 job.Success = false; 148 150 job.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST; 149 } 151 } 150 152 return job; 151 153 } … … 154 156 IClientGroupDao cgd = DaoLocator.ClientGroupDao; 155 157 foreach (string res in resources) { 156 foreach (ClientGroupDto cg in cgd.FindByName(res)) {158 foreach (ClientGroupDto cg in cgd.FindByName(res)) { 157 159 job.JobInfo.AssignedResourceIds.Add(cg.Id); 158 160 } … … 167 169 /// <returns></returns> 168 170 public ResponseObject<JobDto> AddNewJob(SerializedJob job) { 169 ResponseObject<JobDto> response = new ResponseObject<JobDto>(); 170 171 if (job != null && job.JobInfo != null) { 172 if (job.JobInfo.State != State.offline) { 173 response.Success = false; 174 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE; 175 return response; 176 } 177 if (job.JobInfo.Id != Guid.Empty) { 178 response.Success = false; 179 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET; 180 return response; 181 } 182 if (job.SerializedJobData == null) { 183 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL; 184 response.Success = false; 185 return response; 186 } 187 188 job.JobInfo.DateCreated = DateTime.Now; 189 DaoLocator.JobDao.InsertWithAttachedJob(job); 190 DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo); 191 192 response.Success = true; 193 response.Obj = job.JobInfo; 194 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED; 195 } else { 171 ResponseObject<JobDto> response = new ResponseObject<JobDto>(); 172 173 if (job != null && job.JobInfo != null) { 174 if (job.JobInfo.State != State.offline) { 196 175 response.Success = false; 176 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE; 177 return response; 178 } 179 if (job.JobInfo.Id != Guid.Empty) { 180 response.Success = false; 181 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET; 182 return response; 183 } 184 if (job.SerializedJobData == null) { 197 185 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL; 198 } 199 186 response.Success = false; 187 return response; 188 } 189 190 job.JobInfo.DateCreated = DateTime.Now; 191 DaoLocator.JobDao.InsertWithAttachedJob(job); 192 DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo); 193 194 response.Success = true; 195 response.Obj = job.JobInfo; 196 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED; 197 } 198 else { 199 response.Success = false; 200 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL; 201 } 202 203 return response; 204 } 205 206 /// <summary> 207 /// Removes a job from the database 208 /// </summary> 209 /// <param name="jobId"></param> 210 /// <returns></returns> 211 public Response RemoveJob(Guid jobId) { 212 Response response = new Response(); 213 214 JobDto job = DaoLocator.JobDao.FindById(jobId); 215 if (job == null) { 216 response.Success = false; 217 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST; 200 218 return response; 201 219 } 202 /* finally { 203 if (session != null) 204 session.EndSession(); 205 } 206 } */ 207 208 /// <summary> 209 /// Removes a job from the database 210 /// </summary> 211 /// <param name="jobId"></param> 212 /// <returns></returns> 213 public Response RemoveJob(Guid jobId) { 214 Response response = new Response(); 215 216 JobDto job = DaoLocator.JobDao.FindById(jobId); 217 if (job == null) { 218 response.Success = false; 219 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST; 220 return response; 221 } 222 DaoLocator.JobDao.Delete(job); 223 response.Success = false; 224 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED; 225 226 return response; 227 } 220 DaoLocator.JobDao.Delete(job); 221 response.Success = false; 222 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED; 223 224 return response; 225 } 228 226 229 227 public ResponseObject<JobDto> GetLastJobResultOf(Guid jobId) { … … 231 229 new ResponseObject<JobDto>(); 232 230 233 234 235 236 237 238 231 result.Obj = 232 GetLastJobResult(jobId); 233 result.Success = 234 result.Obj != null; 235 236 return result; 239 237 } 240 238 241 239 public ResponseObject<SerializedJob> 242 240 GetLastSerializedJobResultOf(Guid jobId, bool requested) { 243 244 ResponseObject<SerializedJob> response = 245 new ResponseObject<SerializedJob>(); 241 ResponseObject<SerializedJob> response = 242 new ResponseObject<SerializedJob>(); 246 243 247 244 JobDto job = DaoLocator.JobDao.FindById(jobId); 248 if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) { 249 response.Success = true; 250 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE; 251 252 //tx.Commit(); 253 254 return response; 255 } 256 257 /*JobResult lastResult = 258 jobResultsAdapter.GetLastResultOf(job.Id);*/ 259 260 //if (lastResult != null) { 261 response.Success = true; 262 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT; 263 response.Obj = new SerializedJob(); 264 response.Obj.JobInfo = job; 265 response.Obj.SerializedJobData = 266 DaoLocator.JobDao.GetBinaryJobFile(jobId); 267 //} else { 268 // response.Success = false; 269 //} 270 271 //tx.Commit(); 245 if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) { 246 response.Success = true; 247 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE; 248 272 249 return response; 273 250 } 274 /*catch (Exception ex) { 275 if (tx != null) 276 tx.Rollback(); 277 throw ex; 278 } 279 finally { 280 if (session != null) 281 session.EndSession(); 282 } 283 } */ 251 252 response.Success = true; 253 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT; 254 response.Obj = new SerializedJob(); 255 response.Obj.JobInfo = job; 256 response.Obj.SerializedJobData = 257 DaoLocator.JobDao.GetBinaryJobFile(jobId); 258 return response; 259 } 284 260 285 261 286 262 public Response RequestSnapshot(Guid jobId) { 287 // ISession session = factory.GetSessionForCurrentThread();288 263 Response response = new Response(); 289 290 /* try { 291 IJobAdapter jobAdapter = session.GetDataAdapter<JobDto, IJobAdapter>();*/ 292 293 JobDto job = DaoLocator.JobDao.FindById(jobId); 294 if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) { 295 response.Success = true; 296 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET; 297 return response; // no commit needed 298 } 299 if (job.State != State.calculating) { 300 response.Success = false; 301 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED; 302 return response; // no commit needed 303 } 304 // job is in correct state 305 job.State = State.requestSnapshot; 306 DaoLocator.JobDao.Update(job); 307 308 response.Success = true; 309 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET; 310 264 265 266 JobDto job = DaoLocator.JobDao.FindById(jobId); 267 if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) { 268 response.Success = true; 269 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET; 311 270 return response; 312 271 } 313 /* finally { 314 if (session != null) 315 session.EndSession(); 316 } 317 } */ 272 if (job.State != State.calculating) { 273 response.Success = false; 274 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED; 275 return response; 276 } 277 // job is in correct state 278 job.State = State.requestSnapshot; 279 DaoLocator.JobDao.Update(job); 280 281 response.Success = true; 282 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET; 283 284 return response; 285 } 318 286 319 287 public Response AbortJob(Guid jobId) { 320 //ISession session = factory.GetSessionForCurrentThread();321 288 Response response = new Response(); 322 289 323 /* try {324 IJobAdapter jobAdapter = session.GetDataAdapter<JobDto, IJobAdapter>();*/325 326 // JobDto job = jobAdapter.GetById(jobId);327 290 JobDto job = DaoLocator.JobDao.FindById(jobId); 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 291 if (job == null) { 292 response.Success = false; 293 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST; 294 return response; // no commit needed 295 } 296 if (job.State == State.abort) { 297 response.Success = true; 298 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET; 299 return response; // no commit needed 300 } 301 if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) { 302 response.Success = false; 303 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED; 304 return response; // no commit needed 305 } 306 // job is in correct state 307 job.State = State.abort; 345 308 DaoLocator.JobDao.Update(job); 346 309 347 response.Success = true; 348 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET; 349 350 return response; 351 } 352 /*finally { 353 if (session != null) 354 session.EndSession(); 355 } 356 } */ 310 response.Success = true; 311 response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET; 312 313 return response; 314 } 357 315 358 316 public ResponseList<JobResult> GetAllJobResults(Guid jobId) { … … 381 339 382 340 public ResponseList<JobDto> GetJobsByProject(Guid projectId) { 383 return null; 384 } 341 return null; 342 } 343 344 public byte[] GetSerializedJobDataById(Guid jobId) { 345 return DaoLocator.JobDao.GetBinaryJobFile(jobId); 346 } 347 348 public void SetSerializedJobDataById(Guid jobId, byte[] data) { 349 DaoLocator.JobDao.SetBinaryJobFile(jobId, data); 350 } 351 385 352 #endregion 386 353 }
Note: See TracChangeset
for help on using the changeset viewer.