Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/JobManager.cs @ 4171

Last change on this file since 4171 was 4170, checked in by cneumuel, 14 years ago

Refactoring of result polling in HiveExperiment; polling is now much faster and the code is cleaner (#1092)

File size: 13.2 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.Linq;
27using System.Text;
28using HeuristicLab.Hive.Contracts.Interfaces;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts;
31using HeuristicLab.Hive.Server.DataAccess;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.DataAccess.Interfaces;
34using System.Data;
35using System.IO;
36using HeuristicLab.Tracing;
37using System.Transactions;
38using HeuristicLab.Hive.Server.LINQDataAccess;
39using IsolationLevel = System.Transactions.IsolationLevel;
40
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// Server-side job management: adds, removes, queries and aborts jobs and hands out
  /// (serialized) job results. All persistence is delegated to the DAOs obtained from
  /// <c>DaoLocator</c>.
  /// </summary>
  internal class JobManager : IJobManager, IInternalJobManager {
    private readonly ILifecycleManager lifecycleManager;

    #region IJobManager Members

    /// <summary>
    /// Obtains the lifecycle manager from the <c>ServiceLocator</c> and registers the
    /// startup and shutdown handlers of this manager.
    /// </summary>
    public JobManager() {
      lifecycleManager = ServiceLocator.GetLifecycleManager();

      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnStartup));
      // The shutdown handler was previously registered through RegisterStartup, which made
      // it run at startup (a second time) and never at shutdown.
      // NOTE(review): assumes ILifecycleManager exposes RegisterShutdown - confirm.
      lifecycleManager.RegisterShutdown(new EventHandler(lifecycleManager_OnShutdown));
    }

    /// <summary>
    /// Looks up the job with the given id.
    /// </summary>
    /// <returns>The result of <c>JobDao.FindById</c> for <paramref name="jobId"/>.</returns>
    private JobDto GetLastJobResult(Guid jobId) {
      return DaoLocator.JobDao.FindById(jobId);
    }

    /// <summary>
    /// Walks over all stored jobs and calls <c>JobDao.SetJobOffline</c> for every job
    /// whose state is neither <c>Calculating</c> nor <c>Finished</c>.
    /// </summary>
    private void CheckForDeadJobs() {
      Logger.Info("Searching for dead Jobs");

      // [chn] why is transaction management done here?
      TransactionOptions options = new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE };
      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, options)) {
        // Copy into a list first so the loop iterates a snapshot of FindAll().
        List<JobDto> allJobs = new List<JobDto>(DaoLocator.JobDao.FindAll());
        foreach (JobDto curJob in allJobs) {
          if (curJob.State != State.Calculating && curJob.State != State.Finished) {
            DaoLocator.JobDao.SetJobOffline(curJob);
          }
        }
        scope.Complete();
      }
    }

    /// <summary>
    /// Startup handler: runs the dead-job check.
    /// </summary>
    private void lifecycleManager_OnStartup(object sender, EventArgs e) {
      Logger.Info("Startup Event Fired, Checking DB for consistency");
      CheckForDeadJobs();
      Logger.Info("Startup Event Done");
    }

    /// <summary>
    /// Shutdown handler: runs the dead-job check.
    /// </summary>
    private void lifecycleManager_OnShutdown(object sender, EventArgs e) {
      // Messages used to say "Startup" (copy/paste), which made the log misleading.
      Logger.Info("Shutdown Event Fired, Checking DB for consistency");
      CheckForDeadJobs();
      Logger.Info("Shutdown Event Done");
    }

    /// <summary>
    /// Returns all jobs stored in the database.
    /// </summary>
    /// <returns>A successful response containing every job returned by <c>JobDao.FindAll</c>.</returns>
    public ResponseList<JobDto> GetAllJobs() {
      ResponseList<JobDto> response = new ResponseList<JobDto>();

      response.List = new List<JobDto>(DaoLocator.JobDao.FindAll());
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;

      return response;
    }

    /// <summary>
    /// Returns the jobs delivered by <c>JobDao.FindWithLimitations</c> for the given
    /// state, offset and count.
    /// </summary>
    /// <param name="jobState">State passed on to the DAO as filter.</param>
    /// <param name="offset">Offset passed on to the DAO.</param>
    /// <param name="count">Count passed on to the DAO.</param>
    /// <returns>A successful response containing the matching jobs.</returns>
    public ResponseList<JobDto> GetAllJobsWithFilter(State jobState, int offset, int count) {
      ResponseList<JobDto> response = new ResponseList<JobDto>();
      response.List = new List<JobDto>(DaoLocator.JobDao.FindWithLimitations(jobState, offset, count));
      response.Success = true;
      return response;
    }

    /// <summary>
    /// Gets the serialized job as a stream.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>The stream returned by <c>JobDao.GetSerializedJobStream</c>.</returns>
    public Stream GetJobStreamById(Guid jobId) {
      return DaoLocator.JobDao.GetSerializedJobStream(jobId);
    }

    /// <summary>
    /// Returns the job with the specified id.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>
    /// A successful response carrying the job, or a failed response with
    /// <c>RESPONSE_JOB_JOB_DOESNT_EXIST</c> when no job was found.
    /// </returns>
    public ResponseObject<JobDto> GetJobById(Guid jobId) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();

      response.Obj = DaoLocator.JobDao.FindById(jobId);
      if (response.Obj != null) {
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
      } else {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
      }

      return response;
    }

    /// <summary>
    /// Returns the job with the specified id with its Client object attached.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>
    /// A successful response carrying the job (with <c>Client</c> set from
    /// <c>ClientDao.GetClientForJob</c>), or a failed response when no job was found.
    /// </returns>
    public ResponseObject<JobDto> GetJobByIdWithDetails(Guid jobId) {
      ResponseObject<JobDto> job = new ResponseObject<JobDto>();
      job.Obj = DaoLocator.JobDao.FindById(jobId);
      if (job.Obj != null) {
        job.Success = true;
        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;

        job.Obj.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
      } else {
        job.Success = false;
        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
      }
      return job;
    }

    /// <summary>
    /// Resolves the given client group names to group ids, adds them to the job's
    /// assigned resources and then adds the job via <see cref="AddNewJob"/>.
    /// </summary>
    /// <param name="job">The job to add.</param>
    /// <param name="resources">Names of the client groups the job should be assigned to.</param>
    /// <returns>The response of <see cref="AddNewJob"/>.</returns>
    public ResponseObject<JobDto> AddJobWithGroupStrings(SerializedJob job, IEnumerable<string> resources) {
      // Only resolve groups for a usable job; a null job / JobInfo is reported by
      // AddNewJob as a regular failure response instead of a NullReferenceException here.
      if (job != null && job.JobInfo != null && resources != null) {
        IClientGroupDao cgd = DaoLocator.ClientGroupDao;
        foreach (string res in resources) {
          foreach (ClientGroupDto cg in cgd.FindByName(res)) {
            job.JobInfo.AssignedResourceIds.Add(cg.Id);
          }
        }
      }
      return AddNewJob(job);
    }

    /// <summary>
    /// Adds a new job into the database.
    /// </summary>
    /// <param name="job">
    /// The job to add. Its state must be <c>Offline</c>, its id must be empty and its
    /// serialized data must be present.
    /// </param>
    /// <returns>
    /// A successful response carrying the job info, or a failed response whose status
    /// message names the violated precondition.
    /// </returns>
    public ResponseObject<JobDto> AddNewJob(SerializedJob job) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();

      if (job == null || job.JobInfo == null) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
        return response;
      }
      if (job.JobInfo.State != State.Offline) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
        return response;
      }
      if (job.JobInfo.Id != Guid.Empty) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
        return response;
      }
      if (job.SerializedJobData == null) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
        return response;
      }

      job.JobInfo.DateCreated = DateTime.Now;
      DaoLocator.JobDao.InsertWithAttachedJob(job);
      DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo);

      response.Success = true;
      response.Obj = job.JobInfo;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;

      return response;
    }

    /// <summary>
    /// Removes a job from the database.
    /// </summary>
    /// <param name="jobId">Id of the job to remove.</param>
    /// <returns>
    /// A successful response when the job was deleted, or a failed response with
    /// <c>RESPONSE_JOB_JOB_DOESNT_EXIST</c> when no such job exists.
    /// </returns>
    public Response RemoveJob(Guid jobId) {
      Response response = new Response();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
        return response;
      }
      DaoLocator.JobDao.Delete(job);
      // Was 'false', which contradicted the "job removed" status message.
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;

      return response;
    }

    /// <summary>
    /// Returns the job with the given id wrapped in a response.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>A response that is successful exactly when a job was found.</returns>
    public ResponseObject<JobDto> GetLastJobResultOf(Guid jobId) {
      ResponseObject<JobDto> result = new ResponseObject<JobDto>();

      result.Obj = GetLastJobResult(jobId);
      result.Success = result.Obj != null;

      return result;
    }

    // [chn] [refactor] why does this method handle 3 different cases? wouldn't 3 methods be easier?
    /// <summary>
    /// Returns the job together with its serialized data, unless the result is not
    /// available yet.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="requested">When true, there MUST be a job result which gets sent back.</param>
    /// <param name="snapshot">True when a snapshot result is asked for.</param>
    /// <returns>
    /// A failed response when the job does not exist; a successful response without
    /// object and with <c>RESPONSE_JOB_RESULT_NOT_YET_HERE</c> when the result is not
    /// available yet; otherwise a successful response carrying the serialized job.
    /// </returns>
    public ResponseObject<SerializedJob> GetLastSerializedJobResultOf(Guid jobId, bool requested, bool snapshot) {
      ResponseObject<SerializedJob> response = new ResponseObject<SerializedJob>();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        // Unknown id: report it instead of failing with a NullReferenceException below.
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
        return response;
      }

      // it's a snapshot but the result hasn't reached the server yet
      bool snapshotPending = snapshot && (job.State == State.RequestSnapshot || job.State == State.RequestSnapshotSent);
      // it's NOT a snapshot, NEITHER requested NOR is the job finished
      bool resultPending = !requested && !snapshot && job.State != State.Finished;
      if (snapshotPending || resultPending) {
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
        return response;
      }

      // every other case - snapshot, job finished or it's requested
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
      response.Obj = new SerializedJob();
      response.Obj.JobInfo = job;
      response.Obj.SerializedJobData = DaoLocator.JobDao.GetBinaryJobFile(jobId);

      return response;
    }

    /// <summary>
    /// Marks a calculating job so that a snapshot is requested for it.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>
    /// A successful response when the request was set or had already been set; a failed
    /// response when the job does not exist or is not being calculated.
    /// </returns>
    public Response RequestSnapshot(Guid jobId) {
      Response response = new Response();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        // Same handling as AbortJob; previously an unknown id caused a NullReferenceException.
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
        return response;
      }
      if (job.State == State.RequestSnapshot || job.State == State.RequestSnapshotSent) {
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
        return response;
      }
      if (job.State != State.Calculating) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
        return response;
      }
      // job is in correct state
      job.State = State.RequestSnapshot;
      DaoLocator.JobDao.Update(job);

      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;

      return response;
    }

    /// <summary>
    /// Marks a job that is being calculated (or has a snapshot request pending) for abortion.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>
    /// A successful response when the abort request was set or had already been set; a
    /// failed response when the job does not exist or is not being calculated.
    /// </returns>
    public Response AbortJob(Guid jobId) {
      Response response = new Response();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
        return response; // no commit needed
      }
      if (job.State == State.Abort) {
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
        return response; // no commit needed
      }
      if (job.State != State.Calculating && job.State != State.RequestSnapshot && job.State != State.RequestSnapshotSent) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
        return response; // no commit needed
      }
      // job is in correct state
      job.State = State.Abort;
      DaoLocator.JobDao.Update(job);

      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;

      return response;
    }

    /// <summary>
    /// Builds a light-weight result entry (id, state, dates, exception, percentage) for
    /// every job returned by <c>JobDao.FindJobsById</c>.
    /// </summary>
    /// <param name="jobIds">Ids of the jobs to report on.</param>
    /// <returns>A successful response carrying the list of job results.</returns>
    public ResponseObject<JobResultList> GetAllJobResults(IEnumerable<Guid> jobIds) {
      ResponseObject<JobResultList> response = new ResponseObject<JobResultList>();
      JobResultList jobResultList = new JobResultList();
      IEnumerable<JobDto> jobs = DaoLocator.JobDao.FindJobsById(jobIds);
      foreach (JobDto job in jobs) {
        jobResultList.Add(new JobResult() {
          JobId = job.Id,
          State = job.State,
          DateCalculated = job.DateCalculated,
          DateFinished = job.DateFinished,
          Exception = job.Exception,
          Percentage = job.Percentage
        });
      }
      response.Obj = jobResultList;
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
      return response;
    }

    /// <summary>
    /// Not implemented; always returns null.
    /// </summary>
    public ResponseList<ProjectDto> GetAllProjects() {
      return null;
    }

    /// <summary>
    /// Shared stub behind <see cref="CreateProject"/> and <see cref="ChangeProject"/>.
    /// Not implemented; always returns null.
    /// </summary>
    private Response createUpdateProject(ProjectDto project) {
      return null;
    }

    /// <summary>
    /// Not implemented; always returns null.
    /// </summary>
    public Response CreateProject(ProjectDto project) {
      return createUpdateProject(project);
    }

    /// <summary>
    /// Not implemented; always returns null.
    /// </summary>
    public Response ChangeProject(ProjectDto project) {
      return createUpdateProject(project);
    }

    /// <summary>
    /// Not implemented; always returns null.
    /// </summary>
    public Response DeleteProject(Guid projectId) {
      return null;
    }

    /// <summary>
    /// Not implemented; always returns null.
    /// </summary>
    public ResponseList<JobDto> GetJobsByProject(Guid projectId) {
      return null;
    }

    /// <summary>
    /// Returns the binary job file stored for the given job.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>The bytes returned by <c>JobDao.GetBinaryJobFile</c>.</returns>
    public byte[] GetSerializedJobDataById(Guid jobId) {
      return DaoLocator.JobDao.GetBinaryJobFile(jobId);
    }

    /// <summary>
    /// Stores the binary job file for the given job via <c>JobDao.SetBinaryJobFile</c>.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="data">The serialized job data.</param>
    public void SetSerializedJobDataById(Guid jobId, byte[] data) {
      DaoLocator.JobDao.SetBinaryJobFile(jobId, data);
    }

    #endregion

  }
}
Note: See TracBrowser for help on using the repository browser.