Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs @ 2211

Last change on this file since 2211 was 2117, checked in by svonolfe, 15 years ago

Streaming of Jobs and JobsResults directly from/to the DB (#680)

File size: 18.2 KB
RevLine 
[907]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[800]23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.Interfaces;
27using HeuristicLab.Hive.Contracts.BusinessObjects;
[902]28using HeuristicLab.Hive.Contracts;
[1377]29using HeuristicLab.Hive.Server.DataAccess;
[1141]30using HeuristicLab.Hive.Server.Core.InternalInterfaces;
[1468]31using HeuristicLab.DataAccess.Interfaces;
[1948]32using System.Data;
[2117]33using System.IO;
[800]34
35namespace HeuristicLab.Hive.Server.Core {
[1141]36  class JobManager: IJobManager, IInternalJobManager {
[820]37
[1468]38    ISessionFactory factory;
[1133]39    ILifecycleManager lifecycleManager;
[820]40
[800]41    #region IJobManager Members
42
/// <summary>
/// Resolves the session factory and the lifecycle manager through the
/// ServiceLocator and registers this manager's startup and shutdown handlers.
/// </summary>
public JobManager() {
  factory = ServiceLocator.GetSessionFactory();
  lifecycleManager = ServiceLocator.GetLifecycleManager();

  lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnStartup));
  // The shutdown handler used to be passed to RegisterStartup as well, which
  // made it run at startup and never at shutdown.
  // NOTE(review): assumes ILifecycleManager exposes RegisterShutdown(EventHandler) - confirm.
  lifecycleManager.RegisterShutdown(new EventHandler(lifecycleManager_OnShutdown));
}
50
/// <summary>
/// Looks up the most recent result recorded for a job.
/// </summary>
/// <param name="jobId">id of the job whose last result is wanted</param>
/// <returns>the value returned by the job results adapter's GetLastResultOf</returns>
private JobResult GetLastJobResult(Guid jobId) {
  ISession currentSession = factory.GetSessionForCurrentThread();

  try {
    IJobResultsAdapter resultsAdapter =
      currentSession.GetDataAdapter<JobResult, IJobResultsAdapter>();

    JobResult latest = resultsAdapter.GetLastResultOf(jobId);
    return latest;
  }
  finally {
    // end the session whether or not the lookup succeeded
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
65
/// <summary>
/// Puts a job back into the offline state and detaches it from its client.
/// If a serialized result exists for the job's last result, that result's
/// data and percentage are written back as the job's data; otherwise the
/// percentage is reset to 0.
/// </summary>
/// <param name="job">the job to reset; when null nothing is updated and the
/// (empty) transaction is simply committed</param>
public void ResetJobsDependingOnResults(Job job) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    IJobResultsAdapter jobResultsAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    tx = session.BeginTransaction();

    if (job != null) {
      SerializedJob computableJob = new SerializedJob();
      computableJob.JobInfo = job;

      // Try to find a stored snapshot to resume from.
      SerializedJobResult lastJobResult = null;
      JobResult lastResult = GetLastJobResult(job.Id);
      if (lastResult != null) {
        lastJobResult = jobResultsAdapter.GetSerializedJobResult(lastResult.Id);
      }

      if (lastJobResult != null) {
        // Continue from the snapshot: take over its progress and data.
        computableJob.JobInfo.Percentage = lastJobResult.JobResult.Percentage;
        computableJob.SerializedJobData = lastJobResult.SerializedJobResultData;

        jobAdapter.UpdateSerializedJob(computableJob);
      } else {
        // No usable snapshot: the job starts over.
        computableJob.JobInfo.Percentage = 0;
      }

      computableJob.JobInfo.Client = null;
      computableJob.JobInfo.State = State.offline;

      jobAdapter.Update(computableJob.JobInfo);
    }

    tx.Commit();
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace (throw ex; would reset it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
122
/// <summary>
/// Goes through all jobs returned by the job adapter and resets every job
/// whose state is still 'calculating'.
/// </summary>
void checkForDeadJobs() {
  ISession currentSession = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();

    // materialize GetAll() into a list before looping
    List<Job> snapshot = new List<Job>(adapter.GetAll());
    foreach (Job candidate in snapshot) {
      if (candidate.State != State.calculating) {
        continue;
      }
      ResetJobsDependingOnResults(candidate);
    }
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
142
/// <summary>
/// Startup handler: runs the dead-job check.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">event data (unused)</param>
void lifecycleManager_OnStartup(object sender, EventArgs e) {
  this.checkForDeadJobs();
}
146
/// <summary>
/// Shutdown handler: runs the dead-job check.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">event data (unused)</param>
void lifecycleManager_OnShutdown(object sender, EventArgs e) {
  this.checkForDeadJobs();
}
150
/// <summary>
/// Fetches every job stored in the database.
/// </summary>
/// <returns>a successful response whose list contains all jobs</returns>
public ResponseList<Job> GetAllJobs() {
  ISession currentSession = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();

    ResponseList<Job> result = new ResponseList<Job>();
    List<Job> jobs = new List<Job>(adapter.GetAll());

    result.List = jobs;
    result.Success = true;
    result.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
    return result;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
175
/// <summary>
/// Opens the serialized form of a job as a stream.
/// </summary>
/// <param name="jobId">id of the job to stream</param>
/// <returns>the stream handed out by the job adapter's GetSerializedJobStream</returns>
public Stream GetJobStreamById(Guid jobId) {
  ISession currentSession = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();

    Stream jobStream = adapter.GetSerializedJobStream(jobId, false);
    return jobStream;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
194
/// <summary>
/// Retrieves the job with the given id.
/// </summary>
/// <param name="jobId">id of the requested job</param>
/// <returns>a response carrying the job; Success is false when no job was found</returns>
public ResponseObject<Job> GetJobById(Guid jobId) {
  ISession currentSession = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();

    ResponseObject<Job> result = new ResponseObject<Job>();
    result.Obj = adapter.GetById(jobId);

    bool exists = result.Obj != null;
    result.Success = exists;
    result.StatusMessage = exists
      ? ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID
      : ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;

    return result;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
224
/// <summary>
/// Stores a new job in the database.
/// The job is rejected when it (or its JobInfo) is null, when its state is not
/// offline, when an id is already set, or when it carries no serialized data.
/// </summary>
/// <param name="job">the serialized job to add</param>
/// <returns>a response holding the job's JobInfo on success, or the reason for rejection</returns>
public ResponseObject<Job> AddNewJob(SerializedJob job) {
  ISession currentSession = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();
    ResponseObject<Job> result = new ResponseObject<Job>();

    // guard: nothing to add
    if (job == null || job.JobInfo == null) {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
      return result;
    }

    // guard: only offline jobs may be added
    if (job.JobInfo.State != State.offline) {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
      return result;
    }

    // guard: the id must not be preset by the caller
    if (job.JobInfo.Id != Guid.Empty) {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
      return result;
    }

    // guard: a job without serialized data is treated like a missing job
    if (job.SerializedJobData == null) {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
      return result;
    }

    job.JobInfo.DateCreated = DateTime.Now;
    adapter.UpdateSerializedJob(job);

    result.Obj = job.JobInfo;
    result.Success = true;
    result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
    return result;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
273
/// <summary>
/// Removes a job from the database.
/// </summary>
/// <param name="jobId">id of the job to remove</param>
/// <returns>a successful response when the job was deleted; a failed response
/// with RESPONSE_JOB_JOB_DOESNT_EXIST when no job has the given id</returns>
public Response RemoveJob(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    Response response = new Response();

    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
      return response;
    }

    jobAdapter.Delete(job);
    // The delete went through, so report success (this used to be false,
    // contradicting the "job removed" status message).
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;

    return response;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
304
/// <summary>
/// Wraps the last result of a job into a response object.
/// </summary>
/// <param name="jobId">id of the job whose last result is requested</param>
/// <returns>a response whose Success flag is true exactly when a result was found</returns>
public ResponseObject<JobResult> GetLastJobResultOf(Guid jobId) {
  ResponseObject<JobResult> response = new ResponseObject<JobResult>();

  response.Obj = GetLastJobResult(jobId);
  response.Success = response.Obj != null;

  return response;
}
316
/// <summary>
/// Returns the serialized form of the last result stored for a job.
/// </summary>
/// <param name="jobId">id of the job whose last result is requested</param>
/// <param name="requested">when true and the job is in state requestSnapshot or
/// requestSnapshotSent, a successful response without a result and with
/// RESPONSE_JOB_RESULT_NOT_YET_HERE is returned instead</param>
/// <returns>a response carrying the serialized result; Success is false when the
/// job does not exist or has no result yet</returns>
public ResponseObject<SerializedJobResult>
  GetLastSerializedJobResultOf(Guid jobId, bool requested) {
  ISession session = factory.GetSessionForCurrentThread();

  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    IJobResultsAdapter jobResultsAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    tx = session.BeginTransaction();

    ResponseObject<SerializedJobResult> response =
      new ResponseObject<SerializedJobResult>();

    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      // Unknown id: answer like the other lookups in this class instead of
      // failing with a NullReferenceException on job.State below.
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;

      tx.Commit();

      return response;
    }

    if (requested && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;

      tx.Commit();

      return response;
    }

    JobResult lastResult =
      jobResultsAdapter.GetLastResultOf(job.Id);

    if (lastResult != null) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
      response.Obj =
        jobResultsAdapter.GetSerializedJobResult(
          lastResult.Id);
    } else {
      response.Success = false;
    }

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace (throw ex; would reset it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
371
[2099]372
/// <summary>
/// Flags a job that is currently being calculated so that a snapshot is requested.
/// </summary>
/// <param name="jobId">id of the job to take a snapshot of</param>
/// <returns>a successful response when the request was set or was already set;
/// a failed response when the job does not exist or is not in state calculating</returns>
public Response RequestSnapshot(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();
  Response response = new Response();

  try {
    IJobAdapter jobAdapter = session.GetDataAdapter<Job, IJobAdapter>();

    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      // Same handling as AbortJob: an unknown id is reported, not dereferenced.
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
      return response; // no commit needed
    }
    if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
      return response; // no commit needed
    }
    if (job.State != State.calculating) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
      return response; // no commit needed
    }
    // job is in correct state
    job.State = State.requestSnapshot;
    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;

    return response;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
405
/// <summary>
/// Flags a running job for abortion.
/// </summary>
/// <param name="jobId">id of the job to abort</param>
/// <returns>a successful response when the abort flag was set or was already set;
/// a failed response when the job does not exist or is not being calculated</returns>
public Response AbortJob(Guid jobId) {
  ISession currentSession = factory.GetSessionForCurrentThread();
  Response result = new Response();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();
    Job target = adapter.GetById(jobId);

    if (target == null) {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
      return result; // nothing was changed
    }

    if (target.State == State.abort) {
      result.Success = true;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
      return result; // nothing was changed
    }

    // only jobs that are calculating (with or without a pending snapshot
    // request) can be aborted
    bool abortable =
      target.State == State.calculating ||
      target.State == State.requestSnapshot ||
      target.State == State.requestSnapshotSent;

    if (!abortable) {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
      return result; // nothing was changed
    }

    target.State = State.abort;
    adapter.Update(target);

    result.Success = true;
    result.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
    return result;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
443
/// <summary>
/// Lists all results stored for a job.
/// </summary>
/// <param name="jobId">id of the job whose results are requested</param>
/// <returns>a response holding the results; Success is false when the job does not exist</returns>
public ResponseList<JobResult> GetAllJobResults(Guid jobId) {
  ISession currentSession = factory.GetSessionForCurrentThread();
  ResponseList<JobResult> result = new ResponseList<JobResult>();

  try {
    IJobResultsAdapter resultsAdapter =
      currentSession.GetDataAdapter<JobResult, IJobResultsAdapter>();
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();

    Job owner = adapter.GetById(jobId);
    if (owner != null) {
      List<JobResult> results = new List<JobResult>(resultsAdapter.GetResultsOf(owner.Id));
      result.List = results;
      result.Success = true;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
    } else {
      result.Success = false;
      result.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
    }

    return result;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
[1932]470
/// <summary>
/// Lists all projects known to the project adapter.
/// </summary>
/// <returns>a successful response whose list contains every project</returns>
public ResponseList<Project> GetAllProjects() {
  ISession currentSession = factory.GetSessionForCurrentThread();
  ResponseList<Project> result = new ResponseList<Project>();

  try {
    IProjectAdapter adapter =
      currentSession.GetDataAdapter<Project, IProjectAdapter>();

    result.List = new List<Project>(adapter.GetAll());
    result.Success = true;
    return result;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }
}
[1934]489
/// <summary>
/// Shared implementation for creating and changing a project: validates the
/// name and passes the project to the project adapter's Update inside a
/// transaction.
/// </summary>
/// <param name="project">the project to store; its Name must not be null or empty</param>
/// <returns>a successful response with RESPONSE_JOB_PROJECT_ADDED, or a failed
/// response when the name is empty or a ConstraintException occurred (its
/// message becomes the status message)</returns>
private Response createUpdateProject(Project project) {
  ISession session = factory.GetSessionForCurrentThread();
  Response response = new Response();
  ITransaction tx = null;

  try {
    IProjectAdapter projectAdapter =
      session.GetDataAdapter<Project, IProjectAdapter>();

    if (string.IsNullOrEmpty(project.Name)) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_NAME_EMPTY;
      return response;
    }
    tx = session.BeginTransaction();
    projectAdapter.Update(project);

    tx.Commit();
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_ADDED;
  }
  catch (ConstraintException ce) {
    // constraint violations are reported to the caller, not thrown
    if (tx != null)
      tx.Rollback();
    response.Success = false;
    response.StatusMessage = ce.Message;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace (throw ex; would reset it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
  return response;
}
527
/// <summary>
/// Creates a project by delegating to createUpdateProject.
/// </summary>
/// <param name="project">the project to create</param>
/// <returns>the response produced by createUpdateProject</returns>
public Response CreateProject(Project project) {
  Response outcome = createUpdateProject(project);
  return outcome;
}
531
/// <summary>
/// Changes a project by delegating to createUpdateProject, the same helper
/// that CreateProject uses.
/// </summary>
/// <param name="project">the project carrying the changed values</param>
/// <returns>the response produced by createUpdateProject</returns>
public Response ChangeProject(Project project) {
  Response outcome = createUpdateProject(project);
  return outcome;
}
535
/// <summary>
/// Deletes the project with the given id inside a transaction.
/// </summary>
/// <param name="projectId">id of the project to delete</param>
/// <returns>a successful response with RESPONSE_JOB_PROJECT_DELETED; a failed
/// response when the project does not exist or when any exception occurred
/// (its message becomes the status message)</returns>
public Response DeleteProject(Guid projectId) {
  ISession session = factory.GetSessionForCurrentThread();
  Response response = new Response();
  ITransaction tx = null;

  try {
    IProjectAdapter projectAdapter =
      session.GetDataAdapter<Project, IProjectAdapter>();

    Project project = projectAdapter.GetById(projectId);
    if (project == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_DOESNT_EXIST;
      return response;
    }

    // The transaction was never started before, so tx.Commit() always hit a
    // null reference and every delete was reported as failed.
    tx = session.BeginTransaction();
    projectAdapter.Delete(project);
    tx.Commit();

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_JOB_PROJECT_DELETED;
  }
  catch (Exception e) {
    // failures are reported through the response rather than rethrown
    if (tx != null)
      tx.Rollback();
    response.Success = false;
    response.StatusMessage = e.Message;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
  return response;
}
568
/// <summary>
/// Lists the jobs that belong to a project.
/// </summary>
/// <param name="projectId">id of the project whose jobs are requested</param>
/// <returns>a successful response whose list contains the project's jobs</returns>
public ResponseList<Job> GetJobsByProject(Guid projectId) {
  ISession currentSession = factory.GetSessionForCurrentThread();
  ResponseList<Job> result = new ResponseList<Job>();

  try {
    IJobAdapter adapter = currentSession.GetDataAdapter<Job, IJobAdapter>();

    result.List = new List<Job>(adapter.GetJobsByProject(projectId));
    result.Success = true;
  }
  finally {
    if (currentSession != null) {
      currentSession.EndSession();
    }
  }

  return result;
}
586
[800]587    #endregion
588  }
589}
Note: See TracBrowser for help on using the repository browser.