Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/JobManager.cs @ 4107

Last change on this file since 4107 was 4107, checked in by cneumuel, 14 years ago

migration from 3.2 to 3.3 completed. Hive Server and Client are now executable and as functional as they were in 3.2. (#1096)

File size: 12.4 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.Linq;
27using System.Text;
28using HeuristicLab.Hive.Contracts.Interfaces;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts;
31using HeuristicLab.Hive.Server.DataAccess;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.DataAccess.Interfaces;
34using System.Data;
35using System.IO;
36using HeuristicLab.Tracing;
37using System.Transactions;
38using HeuristicLab.Hive.Server.LINQDataAccess;
39using IsolationLevel = System.Transactions.IsolationLevel;
40
41namespace HeuristicLab.Hive.Server.Core {
42  internal class JobManager : IJobManager, IInternalJobManager {
43    //ISessionFactory factory;
44    private ILifecycleManager lifecycleManager;
45
46    #region IJobManager Members
47
48    public JobManager() {
49      //factory = ServiceLocator.GetSessionFactory();
50      lifecycleManager = ServiceLocator.GetLifecycleManager();
51
52      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnStartup));
53      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnShutdown));
54    }
55
56    private JobDto GetLastJobResult(Guid jobId) {
57      return DaoLocator.JobDao.FindById(jobId);
58    }
59
60
61    private void CheckForDeadJobs() {
62      Logger.Info("Searching for dead Jobs");
63
64      // [chn] why is transaction management done here?
65      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
66        List<JobDto> allJobs = new List<JobDto>(DaoLocator.JobDao.FindAll());
67        foreach (JobDto curJob in allJobs) {
68          if (curJob.State != State.calculating && curJob.State != State.finished) {
69            DaoLocator.JobDao.SetJobOffline(curJob);
70          }
71        }
72        scope.Complete();
73      }
74      // DaoLocator.DestroyContext();
75    }
76
77    private void lifecycleManager_OnStartup(object sender, EventArgs e) {
78      Logger.Info("Startup Event Fired, Checking DB for consistency");
79      CheckForDeadJobs();
80      Logger.Info("Startup Event Done");
81    }
82
83    private void lifecycleManager_OnShutdown(object sender, EventArgs e) {
84      Logger.Info("Startup Event Fired, Checking DB for consistency");
85      CheckForDeadJobs();
86      Logger.Info("Startup Event Done");
87    }
88
89    /// <summary>
90    /// returns all jobs stored in the database
91    /// </summary>
92    /// <returns></returns>
93    public ResponseList<JobDto> GetAllJobs() {
94      ResponseList<JobDto> response = new ResponseList<JobDto>();
95
96      response.List = new List<JobDto>(DaoLocator.JobDao.FindAll());
97      response.Success = true;
98      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
99
100      return response;
101    }
102
103    public ResponseList<JobDto> GetAllJobsWithFilter(State jobState, int offset, int count) {
104      ResponseList<JobDto> response = new ResponseList<JobDto>();
105      response.List = new List<JobDto>(DaoLocator.JobDao.FindWithLimitations(jobState, offset, count));
106      response.Success = true;
107      return response;
108    }
109
110    /// <summary>
111    /// Gets the streamed job
112    /// </summary>
113    /// <param name="jobId"></param>
114    /// <returns></returns>
115    public Stream GetJobStreamById(Guid jobId) {
116      return DaoLocator.JobDao.GetSerializedJobStream(jobId);
117    }
118
119    /// <summary>
120    /// returns the job with the specified id
121    /// </summary>
122    /// <returns></returns>
123    public ResponseObject<JobDto> GetJobById(Guid jobId) {
124      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
125
126      response.Obj = DaoLocator.JobDao.FindById(jobId);
127      if (response.Obj != null) {
128        response.Success = true;
129        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
130      } else {
131        response.Success = false;
132        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
133      }
134
135      return response;
136    }
137
138    public ResponseObject<JobDto> GetJobByIdWithDetails(Guid jobId) {
139      ResponseObject<JobDto> job = new ResponseObject<JobDto>();
140      job.Obj = DaoLocator.JobDao.FindById(jobId);
141      if (job.Obj != null) {
142        job.Success = true;
143        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
144
145        job.Obj.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
146      } else {
147        job.Success = false;
148        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
149      }
150      return job;
151    }
152
153    public ResponseObject<JobDto> AddJobWithGroupStrings(SerializedJob job, IEnumerable<string> resources) {
154      IClientGroupDao cgd = DaoLocator.ClientGroupDao;
155      foreach (string res in resources) {
156        foreach (ClientGroupDto cg in cgd.FindByName(res)) {
157          job.JobInfo.AssignedResourceIds.Add(cg.Id);
158        }
159      }
160      return AddNewJob(job);
161    }
162
163    /// <summary>
164    /// Adds a new job into the database
165    /// </summary>
166    /// <param name="job"></param>
167    /// <returns></returns>
168    public ResponseObject<JobDto> AddNewJob(SerializedJob job) {
169      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
170
171      if (job != null && job.JobInfo != null) {
172        if (job.JobInfo.State != State.offline) {
173          response.Success = false;
174          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
175          return response;
176        }
177        if (job.JobInfo.Id != Guid.Empty) {
178          response.Success = false;
179          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
180          return response;
181        }
182        if (job.SerializedJobData == null) {
183          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
184          response.Success = false;
185          return response;
186        }
187
188        job.JobInfo.DateCreated = DateTime.Now;
189        DaoLocator.JobDao.InsertWithAttachedJob(job);
190        DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo);
191
192        response.Success = true;
193        response.Obj = job.JobInfo;
194        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
195      } else {
196        response.Success = false;
197        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
198      }
199
200      return response;
201    }
202
203    /// <summary>
204    /// Removes a job from the database
205    /// </summary>
206    /// <param name="jobId"></param>
207    /// <returns></returns>
208    public Response RemoveJob(Guid jobId) {
209      Response response = new Response();
210
211      JobDto job = DaoLocator.JobDao.FindById(jobId);
212      if (job == null) {
213        response.Success = false;
214        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
215        return response;
216      }
217      DaoLocator.JobDao.Delete(job);
218      response.Success = false;
219      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;
220
221      return response;
222    }
223
224    public ResponseObject<JobDto> GetLastJobResultOf(Guid jobId) {
225      ResponseObject<JobDto> result =
226        new ResponseObject<JobDto>();
227
228      result.Obj =
229        GetLastJobResult(jobId);
230      result.Success =
231        result.Obj != null;
232
233      return result;
234    }
235
236    //Requested means: there MUST be a job result which gets sent back
237    public ResponseObject<SerializedJob> GetLastSerializedJobResultOf(Guid jobId, bool requested, bool snapshot) {
238      ResponseObject<SerializedJob> response =
239        new ResponseObject<SerializedJob>();
240
241      JobDto job = DaoLocator.JobDao.FindById(jobId);
242
243      //if it's a snapshot but the result hasn't reached the server yet...
244      if (snapshot && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
245        response.Success = true;
246        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
247
248        return response;
249      }
250
251      //if it's NOT a snapshot, NEITHER request NOR is it finished
252      if (!requested && !snapshot && job.State != State.finished) {
253        response.Success = true;
254        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
255
256        return response;
257      }
258
259      //every other case - snapshot, job finished or it's requested
260      response.Success = true;
261      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
262      response.Obj = new SerializedJob();
263      response.Obj.JobInfo = job;
264
265      response.Obj.SerializedJobData =
266        DaoLocator.JobDao.GetBinaryJobFile(jobId);
267
268      return response;
269    }
270
271    public Response RequestSnapshot(Guid jobId) {
272      Response response = new Response();
273
274      JobDto job = DaoLocator.JobDao.FindById(jobId);
275      if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
276        response.Success = true;
277        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
278        return response;
279      }
280      if (job.State != State.calculating) {
281        response.Success = false;
282        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
283        return response;
284      }
285      // job is in correct state
286      job.State = State.requestSnapshot;
287      DaoLocator.JobDao.Update(job);
288
289      response.Success = true;
290      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;
291
292      return response;
293    }
294
295    public Response AbortJob(Guid jobId) {
296      Response response = new Response();
297
298      JobDto job = DaoLocator.JobDao.FindById(jobId);
299      if (job == null) {
300        response.Success = false;
301        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
302        return response; // no commit needed
303      }
304      if (job.State == State.abort) {
305        response.Success = true;
306        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
307        return response; // no commit needed
308      }
309      if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) {
310        response.Success = false;
311        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
312        return response; // no commit needed
313      }
314      // job is in correct state
315      job.State = State.abort;
316      DaoLocator.JobDao.Update(job);
317
318      response.Success = true;
319      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
320
321      return response;
322    }
323
324    public ResponseList<JobResult> GetAllJobResults(Guid jobId) {
325      return new ResponseList<JobResult>();
326    }
327
328    public ResponseList<ProjectDto> GetAllProjects() {
329      return null;
330    }
331
332    private Response createUpdateProject(ProjectDto project) {
333      return null;
334    }
335
336    public Response CreateProject(ProjectDto project) {
337      return createUpdateProject(project);
338    }
339
340    public Response ChangeProject(ProjectDto project) {
341      return createUpdateProject(project);
342    }
343
344    public Response DeleteProject(Guid projectId) {
345      return null;
346    }
347
348    public ResponseList<JobDto> GetJobsByProject(Guid projectId) {
349      return null;
350    }
351
352    public byte[] GetSerializedJobDataById(Guid jobId) {
353      return DaoLocator.JobDao.GetBinaryJobFile(jobId);
354    }
355
356    public void SetSerializedJobDataById(Guid jobId, byte[] data) {
357      DaoLocator.JobDao.SetBinaryJobFile(jobId, data);
358    }
359
360    #endregion
361  }
362}
Note: See TracBrowser for help on using the repository browser.