Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs @ 2117

Last change on this file since 2117 was 2117, checked in by svonolfe, 15 years ago

Streaming of Jobs and JobsResults directly from/to the DB (#680)

File size: 18.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.Interfaces;
27using HeuristicLab.Hive.Contracts.BusinessObjects;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Server.DataAccess;
30using HeuristicLab.Hive.Server.Core.InternalInterfaces;
31using HeuristicLab.DataAccess.Interfaces;
32using System.Data;
33using System.IO;
34
35namespace HeuristicLab.Hive.Server.Core {
36  class JobManager: IJobManager, IInternalJobManager {
37
38    ISessionFactory factory;
39    ILifecycleManager lifecycleManager;
40
41    #region IJobManager Members
42
43    public JobManager() {
44      factory = ServiceLocator.GetSessionFactory();
45      lifecycleManager = ServiceLocator.GetLifecycleManager();
46
47      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnStartup));
48      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnShutdown));
49    }
50
51    private JobResult GetLastJobResult(Guid jobId) {
52      ISession session = factory.GetSessionForCurrentThread();
53
54      try {
55        IJobResultsAdapter jobResultAdapter =
56            session.GetDataAdapter<JobResult, IJobResultsAdapter>();
57
58        return jobResultAdapter.GetLastResultOf(jobId);
59      }
60      finally {
61        if (session != null)
62          session.EndSession();
63      }
64    }
65
66    public void ResetJobsDependingOnResults(Job job) {
67      ISession session = factory.GetSessionForCurrentThread();
68      ITransaction tx = null;
69
70      try {
71        IJobAdapter jobAdapter =
72            session.GetDataAdapter<Job, IJobAdapter>();
73
74        IJobResultsAdapter jobResultsAdapter =
75          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
76
77        tx = session.BeginTransaction();
78
79        if (job != null) {
80          SerializedJob computableJob =
81              new SerializedJob();
82          computableJob.JobInfo =
83            job;
84
85          JobResult lastResult =
86            GetLastJobResult(job.Id);
87
88          if (lastResult != null) {
89            SerializedJobResult lastJobResult =
90              jobResultsAdapter.GetSerializedJobResult(lastResult.Id);
91
92            if (lastJobResult != null) {
93              computableJob.JobInfo.Percentage = lastJobResult.JobResult.Percentage;
94              computableJob.SerializedJobData = lastJobResult.SerializedJobResultData;
95
96              jobAdapter.UpdateSerializedJob(computableJob);
97            } else {
98              computableJob.JobInfo.Percentage = 0;
99            }
100          } else {
101            computableJob.JobInfo.Percentage = 0;
102          }
103
104          computableJob.JobInfo.Client = null;
105          computableJob.JobInfo.State = State.offline;
106
107          jobAdapter.Update(computableJob.JobInfo);
108        }
109
110        tx.Commit();
111      }
112      catch (Exception ex) {
113        if (tx != null)
114          tx.Rollback();
115        throw ex;
116      }
117      finally {
118        if (session != null)
119          session.EndSession();
120      }
121    }
122
123    void checkForDeadJobs() {
124       ISession session = factory.GetSessionForCurrentThread();
125
126       try {
127         IJobAdapter jobAdapter =
128             session.GetDataAdapter<Job, IJobAdapter>();
129
130         List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
131         foreach (Job curJob in allJobs) {
132           if (curJob.State == State.calculating) {
133             ResetJobsDependingOnResults(curJob);
134           }
135         }
136       }
137       finally {
138         if (session != null)
139           session.EndSession();
140       }
141    }
142
143    void lifecycleManager_OnStartup(object sender, EventArgs e) {
144      checkForDeadJobs();
145    }
146
147    void lifecycleManager_OnShutdown(object sender, EventArgs e) {
148      checkForDeadJobs();
149    }
150
151    /// <summary>
152    /// returns all jobs stored in the database
153    /// </summary>
154    /// <returns></returns>
155    public ResponseList<Job> GetAllJobs() {
156       ISession session = factory.GetSessionForCurrentThread();
157
158       try {
159         IJobAdapter jobAdapter =
160             session.GetDataAdapter<Job, IJobAdapter>();
161
162         ResponseList<Job> response = new ResponseList<Job>();
163
164         response.List = new List<Job>(jobAdapter.GetAll());
165         response.Success = true;
166         response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
167
168         return response;
169       }
170       finally {
171         if (session != null)
172           session.EndSession();
173       }
174    }
175
176    /// <summary>
177    /// Gets the streamed job
178    /// </summary>
179    /// <param name="jobId"></param>
180    /// <returns></returns>
181    public Stream GetJobStreamById(Guid jobId) {
182      ISession session = factory.GetSessionForCurrentThread();
183      try {
184        IJobAdapter jobAdapter =
185          session.GetDataAdapter<Job, IJobAdapter>();
186
187        return jobAdapter.GetSerializedJobStream(jobId, false);
188      }
189      finally {
190        if (session != null)
191          session.EndSession();
192      }
193    }
194
195    /// <summary>
196    /// returns the job with the specified id
197    /// </summary>
198    /// <returns></returns>
199    public ResponseObject<Job> GetJobById(Guid jobId) {
200      ISession session = factory.GetSessionForCurrentThread();
201
202      try {
203        IJobAdapter jobAdapter =
204            session.GetDataAdapter<Job, IJobAdapter>();
205
206        ResponseObject<Job> response = new ResponseObject<Job>();
207
208        response.Obj = jobAdapter.GetById(jobId);
209        if (response.Obj != null) {
210          response.Success = true;
211          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
212        } else {
213          response.Success = false;
214          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
215        }
216
217        return response;
218      }
219      finally {
220        if (session != null)
221          session.EndSession();
222      }
223    }
224
225    /// <summary>
226    /// Adds a new job into the database
227    /// </summary>
228    /// <param name="job"></param>
229    /// <returns></returns>
230    public ResponseObject<Job> AddNewJob(SerializedJob job) {
231      ISession session = factory.GetSessionForCurrentThread();
232
233      try {
234        IJobAdapter jobAdapter =
235            session.GetDataAdapter<Job, IJobAdapter>();
236
237        ResponseObject<Job> response = new ResponseObject<Job>();
238
239        if (job != null && job.JobInfo != null) {
240          if (job.JobInfo.State != State.offline) {
241            response.Success = false;
242            response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
243            return response;
244          }
245          if (job.JobInfo.Id != Guid.Empty) {
246            response.Success = false;
247            response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
248            return response;
249          }
250          if (job.SerializedJobData == null) {
251            response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
252            response.Success = false;
253            return response;
254          }
255
256          job.JobInfo.DateCreated = DateTime.Now;
257          jobAdapter.UpdateSerializedJob(job);
258          response.Success = true;
259          response.Obj = job.JobInfo;
260          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
261        } else {
262          response.Success = false;
263          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
264        }
265
266        return response;
267      }
268      finally {
269        if (session != null)
270          session.EndSession();
271      }
272    }
273
274    /// <summary>
275    /// Removes a job from the database
276    /// </summary>
277    /// <param name="jobId"></param>
278    /// <returns></returns>
279    public Response RemoveJob(Guid jobId) {
280      ISession session = factory.GetSessionForCurrentThread();
281
282      try {
283        IJobAdapter jobAdapter =
284            session.GetDataAdapter<Job, IJobAdapter>();
285        Response response = new Response();
286
287        Job job = jobAdapter.GetById(jobId);
288        if (job == null) {
289          response.Success = false;
290          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
291          return response;
292        }
293        jobAdapter.Delete(job);
294        response.Success = false;
295        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;
296
297        return response;
298      }
299      finally {
300        if (session != null)
301          session.EndSession();
302      }
303    }
304
305    public ResponseObject<JobResult> GetLastJobResultOf(Guid jobId) {
306       ResponseObject<JobResult> result =
307        new ResponseObject<JobResult>();
308
309       result.Obj =
310         GetLastJobResult(jobId);
311       result.Success =
312         result.Obj != null;
313
314       return result;
315    }
316
317    public ResponseObject<SerializedJobResult>
318      GetLastSerializedJobResultOf(Guid jobId, bool requested) {
319      ISession session = factory.GetSessionForCurrentThread();
320
321      ITransaction tx = null;
322
323      try {
324        IJobAdapter jobAdapter =
325            session.GetDataAdapter<Job, IJobAdapter>();
326
327        IJobResultsAdapter jobResultsAdapter =
328          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
329
330        tx = session.BeginTransaction();
331
332        ResponseObject<SerializedJobResult> response =
333          new ResponseObject<SerializedJobResult>();
334
335        Job job = jobAdapter.GetById(jobId);
336        if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
337          response.Success = true;
338          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
339
340          tx.Commit();
341         
342          return response;
343        }
344
345        JobResult lastResult =
346          jobResultsAdapter.GetLastResultOf(job.Id);
347
348        if (lastResult != null) {
349          response.Success = true;
350          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
351          response.Obj =
352            jobResultsAdapter.GetSerializedJobResult(
353              lastResult.Id);         
354        } else {
355          response.Success = false;
356        }
357
358        tx.Commit();
359        return response;
360      }
361      catch (Exception ex) {
362        if (tx != null)
363          tx.Rollback();
364        throw ex;
365      }
366      finally {
367        if (session != null)
368          session.EndSession();
369      }
370    }
371
372
373    public Response RequestSnapshot(Guid jobId) {
374      ISession session = factory.GetSessionForCurrentThread();
375      Response response = new Response();
376     
377      try {
378        IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();
379
380        Job job = jobAdapter.GetById(jobId);
381        if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
382          response.Success = true;
383          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
384          return response; // no commit needed
385        }
386        if (job.State != State.calculating) {
387          response.Success = false;
388          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
389          return response; // no commit needed
390        }
391        // job is in correct state
392        job.State = State.requestSnapshot;
393        jobAdapter.Update(job);
394
395        response.Success = true;
396        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;
397
398        return response;
399      }
400      finally {
401        if (session != null)
402          session.EndSession();
403      }
404    }
405
406    public Response AbortJob(Guid jobId) {
407      ISession session = factory.GetSessionForCurrentThread();
408      Response response = new Response();
409
410      try {
411        IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();
412
413        Job job = jobAdapter.GetById(jobId);
414        if (job == null) {
415          response.Success = false;
416          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
417          return response; // no commit needed
418        }
419        if (job.State == State.abort) {
420          response.Success = true;
421          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
422          return response; // no commit needed
423        }
424        if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) {
425          response.Success = false;
426          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
427          return response; // no commit needed
428        }
429        // job is in correct state
430        job.State = State.abort;
431        jobAdapter.Update(job);
432
433        response.Success = true;
434        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
435
436        return response;
437      }
438      finally {
439        if (session != null)
440          session.EndSession();
441      }
442    }
443
444    public ResponseList<JobResult> GetAllJobResults(Guid jobId) {
445      ISession session = factory.GetSessionForCurrentThread();
446      ResponseList<JobResult> response = new ResponseList<JobResult>();
447
448      try {
449        IJobResultsAdapter jobResultAdapter =
450            session.GetDataAdapter<JobResult, IJobResultsAdapter>();
451        IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();
452
453        Job job = jobAdapter.GetById(jobId);
454        if (job == null) {
455          response.Success = false;
456          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
457          return response;
458        }
459        response.List = new List<JobResult>(jobResultAdapter.GetResultsOf(job.Id));
460        response.Success = true;
461        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
462
463        return response;
464      }
465      finally {
466        if(session != null)
467          session.EndSession();
468      }
469    }
470
471    public ResponseList<Project> GetAllProjects() {
472      ISession session = factory.GetSessionForCurrentThread();
473      ResponseList<Project> response = new ResponseList<Project>();
474
475      try {
476        IProjectAdapter projectAdapter =
477          session.GetDataAdapter<Project, IProjectAdapter>();
478
479        List<Project> allProjects = new List<Project>(projectAdapter.GetAll());
480        response.List = allProjects;
481        response.Success = true;
482        return response;
483      }
484      finally {
485        if (session != null)
486          session.EndSession();
487      }
488    }
489
490    private Response createUpdateProject(Project project) {
491      ISession session = factory.GetSessionForCurrentThread();
492      Response response = new Response();
493      ITransaction tx = null;
494
495      try {
496        IProjectAdapter projectAdapter =
497          session.GetDataAdapter<Project, IProjectAdapter>();
498
499        if (project.Name == null || project.Name == "") {
500          response.Success = false;
501          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_NAME_EMPTY;
502          return response;
503        }
504        tx = session.BeginTransaction();
505        projectAdapter.Update(project);
506
507        tx.Commit();
508        response.Success = true;
509        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_ADDED;
510      } catch (ConstraintException ce) {
511        if (tx != null)
512          tx.Rollback();
513        response.Success = false;
514        response.StatusMessage = ce.Message;
515      }
516      catch (Exception ex) {
517        if (tx != null)
518          tx.Rollback();
519        throw ex;
520      }
521      finally {
522        if (session != null)
523          session.EndSession();
524      }
525      return response;
526    }
527
528    public Response CreateProject(Project project) {
529      return createUpdateProject(project);
530    }
531
532    public Response ChangeProject(Project project) {
533      return createUpdateProject(project);
534    }
535
536    public Response DeleteProject(Guid projectId) {
537      ISession session = factory.GetSessionForCurrentThread();
538      Response response = new Response();
539      ITransaction tx = null;
540
541      try {
542        IProjectAdapter projectAdapter =
543          session.GetDataAdapter<Project, IProjectAdapter>();
544
545        Project project = projectAdapter.GetById(projectId);
546        if (project == null) {
547          response.Success = false;
548          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_DOESNT_EXIST;
549          return response;
550        }
551        projectAdapter.Delete(project);
552        tx.Commit();
553        response.Success = true;
554        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_DELETED;
555      }
556      catch (Exception e) {
557        if (tx != null)
558          tx.Rollback();
559        response.Success = false;
560        response.StatusMessage = e.Message;
561      }
562      finally {
563        if (session != null)
564          session.EndSession();
565      }
566      return response;
567    }
568
569    public ResponseList<Job> GetJobsByProject(Guid projectId) {
570      ISession session = factory.GetSessionForCurrentThread();
571      ResponseList<Job> response = new ResponseList<Job>();
572
573      try {
574        IJobAdapter jobAdapter =
575          session.GetDataAdapter<Job, IJobAdapter>();
576        List<Job> jobsByProject = new List<Job>(jobAdapter.GetJobsByProject(projectId));
577        response.List = jobsByProject;
578        response.Success = true;
579      }
580      finally {
581        if (session != null)
582          session.EndSession();
583      }
584      return response;
585    }
586
587    #endregion
588  }
589}
Note: See TracBrowser for help on using the repository browser.