Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs @ 2642

Last change on this file since 2642 was 2608, checked in by kgrading, 14 years ago

changed a ton of logging, changed minor job handling and changed the DB IP to localhost (#828)

File size: 18.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.Interfaces;
27using HeuristicLab.Hive.Contracts.BusinessObjects;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Server.DataAccess;
30using HeuristicLab.Hive.Server.Core.InternalInterfaces;
31using HeuristicLab.DataAccess.Interfaces;
32using System.Data;
33using System.IO;
34using HeuristicLab.Tracing;
35
36namespace HeuristicLab.Hive.Server.Core {
37  class JobManager: IJobManager, IInternalJobManager {
38
39    ISessionFactory factory;
40    ILifecycleManager lifecycleManager;
41
42    #region IJobManager Members
43
44    public JobManager() {
45      factory = ServiceLocator.GetSessionFactory();
46      lifecycleManager = ServiceLocator.GetLifecycleManager();
47
48      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnStartup));
49      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnShutdown));
50    }
51
52    private JobResult GetLastJobResult(Guid jobId) {
53      ISession session = factory.GetSessionForCurrentThread();
54
55      try {
56        IJobResultsAdapter jobResultAdapter =
57            session.GetDataAdapter<JobResult, IJobResultsAdapter>();
58
59        return jobResultAdapter.GetLastResultOf(jobId);
60      }
61      finally {
62        if (session != null)
63          session.EndSession();
64      }
65    }
66
67    public void ResetJobsDependingOnResults(Job job) {
68
69      HiveLogger.Info(this.ToString() + ": Setting job " + job.Id + " offline");
70
71      ISession session = factory.GetSessionForCurrentThread();
72      ITransaction tx = null;
73
74      try {
75        IJobAdapter jobAdapter =
76            session.GetDataAdapter<Job, IJobAdapter>();
77
78        IJobResultsAdapter jobResultsAdapter =
79          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
80
81        tx = session.BeginTransaction();
82
83        if (job != null) {
84          SerializedJob computableJob =
85              new SerializedJob();
86          computableJob.JobInfo =
87            job;
88
89          JobResult lastResult =
90            GetLastJobResult(job.Id);
91
92          if (lastResult != null) {
93            SerializedJobResult lastJobResult =
94              jobResultsAdapter.GetSerializedJobResult(lastResult.Id);
95
96            if (lastJobResult != null) {
97              computableJob.JobInfo.Percentage = lastJobResult.JobResult.Percentage;
98              computableJob.SerializedJobData = lastJobResult.SerializedJobResultData;
99
100              jobAdapter.UpdateSerializedJob(computableJob);
101            } else {
102              computableJob.JobInfo.Percentage = 0;
103            }
104          } else {
105            computableJob.JobInfo.Percentage = 0;
106          }
107
108          computableJob.JobInfo.Client = null;
109          computableJob.JobInfo.State = State.offline;
110
111          jobAdapter.Update(computableJob.JobInfo);
112        }
113
114        tx.Commit();
115      }
116      catch (Exception ex) {
117        if (tx != null)
118          tx.Rollback();
119        throw ex;
120      }
121      finally {
122        if (session != null)
123          session.EndSession();
124      }
125    }
126
127    void checkForDeadJobs() {
128      HiveLogger.Info(this.ToString() + " Searching for dead Jobs");
129       ISession session = factory.GetSessionForCurrentThread();
130
131       try {
132         IJobAdapter jobAdapter =
133             session.GetDataAdapter<Job, IJobAdapter>();
134
135         List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
136         foreach (Job curJob in allJobs) {
137           if (curJob.State == State.calculating) {
138             ResetJobsDependingOnResults(curJob);
139           }
140         }
141       }
142       finally {
143         if (session != null)
144           session.EndSession();
145       }
146    }
147
148    void lifecycleManager_OnStartup(object sender, EventArgs e) {
149      checkForDeadJobs();
150    }
151
152    void lifecycleManager_OnShutdown(object sender, EventArgs e) {
153      checkForDeadJobs();
154    }
155
156    /// <summary>
157    /// returns all jobs stored in the database
158    /// </summary>
159    /// <returns></returns>
160    public ResponseList<Job> GetAllJobs() {
161       ISession session = factory.GetSessionForCurrentThread();
162
163       try {
164         IJobAdapter jobAdapter =
165             session.GetDataAdapter<Job, IJobAdapter>();
166
167         ResponseList<Job> response = new ResponseList<Job>();
168
169         response.List = new List<Job>(jobAdapter.GetAll());
170         response.Success = true;
171         response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
172
173         return response;
174       }
175       finally {
176         if (session != null)
177           session.EndSession();
178       }
179    }
180
181    /// <summary>
182    /// Gets the streamed job
183    /// </summary>
184    /// <param name="jobId"></param>
185    /// <returns></returns>
186    public Stream GetJobStreamById(Guid jobId) {
187      ISession session = factory.GetSessionForCurrentThread();
188      try {
189        IJobAdapter jobAdapter =
190          session.GetDataAdapter<Job, IJobAdapter>();
191
192        return jobAdapter.GetSerializedJobStream(jobId, false);
193      }
194      finally {
195        if (session != null)
196          session.EndSession();
197      }
198    }
199
200    /// <summary>
201    /// returns the job with the specified id
202    /// </summary>
203    /// <returns></returns>
204    public ResponseObject<Job> GetJobById(Guid jobId) {
205      ISession session = factory.GetSessionForCurrentThread();
206
207      try {
208        IJobAdapter jobAdapter =
209            session.GetDataAdapter<Job, IJobAdapter>();
210
211        ResponseObject<Job> response = new ResponseObject<Job>();
212
213        response.Obj = jobAdapter.GetById(jobId);
214        if (response.Obj != null) {
215          response.Success = true;
216          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
217        } else {
218          response.Success = false;
219          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
220        }
221
222        return response;
223      }
224      finally {
225        if (session != null)
226          session.EndSession();
227      }
228    }
229
230    /// <summary>
231    /// Adds a new job into the database
232    /// </summary>
233    /// <param name="job"></param>
234    /// <returns></returns>
235    public ResponseObject<Job> AddNewJob(SerializedJob job) {
236      ISession session = factory.GetSessionForCurrentThread();
237
238      try {
239        IJobAdapter jobAdapter =
240            session.GetDataAdapter<Job, IJobAdapter>();
241
242        ResponseObject<Job> response = new ResponseObject<Job>();
243
244        if (job != null && job.JobInfo != null) {
245          if (job.JobInfo.State != State.offline) {
246            response.Success = false;
247            response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
248            return response;
249          }
250          if (job.JobInfo.Id != Guid.Empty) {
251            response.Success = false;
252            response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
253            return response;
254          }
255          if (job.SerializedJobData == null) {
256            response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
257            response.Success = false;
258            return response;
259          }
260
261          job.JobInfo.DateCreated = DateTime.Now;
262          jobAdapter.UpdateSerializedJob(job);
263          response.Success = true;
264          response.Obj = job.JobInfo;
265          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
266        } else {
267          response.Success = false;
268          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
269        }
270
271        return response;
272      }
273      finally {
274        if (session != null)
275          session.EndSession();
276      }
277    }
278
279    /// <summary>
280    /// Removes a job from the database
281    /// </summary>
282    /// <param name="jobId"></param>
283    /// <returns></returns>
284    public Response RemoveJob(Guid jobId) {
285      ISession session = factory.GetSessionForCurrentThread();
286
287      try {
288        IJobAdapter jobAdapter =
289            session.GetDataAdapter<Job, IJobAdapter>();
290        Response response = new Response();
291
292        Job job = jobAdapter.GetById(jobId);
293        if (job == null) {
294          response.Success = false;
295          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
296          return response;
297        }
298        jobAdapter.Delete(job);
299        response.Success = false;
300        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;
301
302        return response;
303      }
304      finally {
305        if (session != null)
306          session.EndSession();
307      }
308    }
309
310    public ResponseObject<JobResult> GetLastJobResultOf(Guid jobId) {
311       ResponseObject<JobResult> result =
312        new ResponseObject<JobResult>();
313
314       result.Obj =
315         GetLastJobResult(jobId);
316       result.Success =
317         result.Obj != null;
318
319       return result;
320    }
321
322    public ResponseObject<SerializedJobResult>
323      GetLastSerializedJobResultOf(Guid jobId, bool requested) {
324      ISession session = factory.GetSessionForCurrentThread();
325
326      ITransaction tx = null;
327
328      try {
329        IJobAdapter jobAdapter =
330            session.GetDataAdapter<Job, IJobAdapter>();
331
332        IJobResultsAdapter jobResultsAdapter =
333          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
334
335        tx = session.BeginTransaction();
336
337        ResponseObject<SerializedJobResult> response =
338          new ResponseObject<SerializedJobResult>();
339
340        Job job = jobAdapter.GetById(jobId);
341        if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
342          response.Success = true;
343          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
344
345          tx.Commit();
346         
347          return response;
348        }
349
350        JobResult lastResult =
351          jobResultsAdapter.GetLastResultOf(job.Id);
352
353        if (lastResult != null) {
354          response.Success = true;
355          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
356          response.Obj =
357            jobResultsAdapter.GetSerializedJobResult(
358              lastResult.Id);         
359        } else {
360          response.Success = false;
361        }
362
363        tx.Commit();
364        return response;
365      }
366      catch (Exception ex) {
367        if (tx != null)
368          tx.Rollback();
369        throw ex;
370      }
371      finally {
372        if (session != null)
373          session.EndSession();
374      }
375    }
376
377
378    public Response RequestSnapshot(Guid jobId) {
379      ISession session = factory.GetSessionForCurrentThread();
380      Response response = new Response();
381     
382      try {
383        IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();
384
385        Job job = jobAdapter.GetById(jobId);
386        if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
387          response.Success = true;
388          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
389          return response; // no commit needed
390        }
391        if (job.State != State.calculating) {
392          response.Success = false;
393          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
394          return response; // no commit needed
395        }
396        // job is in correct state
397        job.State = State.requestSnapshot;
398        jobAdapter.Update(job);
399
400        response.Success = true;
401        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;
402
403        return response;
404      }
405      finally {
406        if (session != null)
407          session.EndSession();
408      }
409    }
410
411    public Response AbortJob(Guid jobId) {
412      ISession session = factory.GetSessionForCurrentThread();
413      Response response = new Response();
414
415      try {
416        IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();
417
418        Job job = jobAdapter.GetById(jobId);
419        if (job == null) {
420          response.Success = false;
421          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
422          return response; // no commit needed
423        }
424        if (job.State == State.abort) {
425          response.Success = true;
426          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
427          return response; // no commit needed
428        }
429        if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) {
430          response.Success = false;
431          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
432          return response; // no commit needed
433        }
434        // job is in correct state
435        job.State = State.abort;
436        jobAdapter.Update(job);
437
438        response.Success = true;
439        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
440
441        return response;
442      }
443      finally {
444        if (session != null)
445          session.EndSession();
446      }
447    }
448
449    public ResponseList<JobResult> GetAllJobResults(Guid jobId) {
450      ISession session = factory.GetSessionForCurrentThread();
451      ResponseList<JobResult> response = new ResponseList<JobResult>();
452
453      try {
454        IJobResultsAdapter jobResultAdapter =
455            session.GetDataAdapter<JobResult, IJobResultsAdapter>();
456        IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();
457
458        Job job = jobAdapter.GetById(jobId);
459        if (job == null) {
460          response.Success = false;
461          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
462          return response;
463        }
464        response.List = new List<JobResult>(jobResultAdapter.GetResultsOf(job.Id));
465        response.Success = true;
466        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
467
468        return response;
469      }
470      finally {
471        if(session != null)
472          session.EndSession();
473      }
474    }
475
476    public ResponseList<Project> GetAllProjects() {
477      ISession session = factory.GetSessionForCurrentThread();
478      ResponseList<Project> response = new ResponseList<Project>();
479
480      try {
481        IProjectAdapter projectAdapter =
482          session.GetDataAdapter<Project, IProjectAdapter>();
483
484        List<Project> allProjects = new List<Project>(projectAdapter.GetAll());
485        response.List = allProjects;
486        response.Success = true;
487        return response;
488      }
489      finally {
490        if (session != null)
491          session.EndSession();
492      }
493    }
494
495    private Response createUpdateProject(Project project) {
496      ISession session = factory.GetSessionForCurrentThread();
497      Response response = new Response();
498      ITransaction tx = null;
499
500      try {
501        IProjectAdapter projectAdapter =
502          session.GetDataAdapter<Project, IProjectAdapter>();
503
504        if (project.Name == null || project.Name == "") {
505          response.Success = false;
506          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_NAME_EMPTY;
507          return response;
508        }
509        tx = session.BeginTransaction();
510        projectAdapter.Update(project);
511
512        tx.Commit();
513        response.Success = true;
514        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_ADDED;
515      } catch (ConstraintException ce) {
516        if (tx != null)
517          tx.Rollback();
518        response.Success = false;
519        response.StatusMessage = ce.Message;
520      }
521      catch (Exception ex) {
522        if (tx != null)
523          tx.Rollback();
524        throw ex;
525      }
526      finally {
527        if (session != null)
528          session.EndSession();
529      }
530      return response;
531    }
532
533    public Response CreateProject(Project project) {
534      return createUpdateProject(project);
535    }
536
537    public Response ChangeProject(Project project) {
538      return createUpdateProject(project);
539    }
540
541    public Response DeleteProject(Guid projectId) {
542      ISession session = factory.GetSessionForCurrentThread();
543      Response response = new Response();
544      ITransaction tx = null;
545
546      try {
547        IProjectAdapter projectAdapter =
548          session.GetDataAdapter<Project, IProjectAdapter>();
549
550        Project project = projectAdapter.GetById(projectId);
551        if (project == null) {
552          response.Success = false;
553          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_DOESNT_EXIST;
554          return response;
555        }
556        projectAdapter.Delete(project);
557        tx.Commit();
558        response.Success = true;
559        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_DELETED;
560      }
561      catch (Exception e) {
562        if (tx != null)
563          tx.Rollback();
564        response.Success = false;
565        response.StatusMessage = e.Message;
566      }
567      finally {
568        if (session != null)
569          session.EndSession();
570      }
571      return response;
572    }
573
574    public ResponseList<Job> GetJobsByProject(Guid projectId) {
575      ISession session = factory.GetSessionForCurrentThread();
576      ResponseList<Job> response = new ResponseList<Job>();
577
578      try {
579        IJobAdapter jobAdapter =
580          session.GetDataAdapter<Job, IJobAdapter>();
581        List<Job> jobsByProject = new List<Job>(jobAdapter.GetJobsByProject(projectId));
582        response.List = jobsByProject;
583        response.Success = true;
584      }
585      finally {
586        if (session != null)
587          session.EndSession();
588      }
589      return response;
590    }
591
592    #endregion
593  }
594}
Note: See TracBrowser for help on using the repository browser.