Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.2/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs @ 4042

Last change on this file since 4042 was 4042, checked in by kgrading, 14 years ago

#828 added various improvements to the plugin cache manager, the execution engine, the transaction handling on the serverside and the server console

File size: 12.5 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.Linq;
27using System.Text;
28using HeuristicLab.Hive.Contracts.Interfaces;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts;
31using HeuristicLab.Hive.Server.DataAccess;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.DataAccess.Interfaces;
34using System.Data;
35using System.IO;
36using HeuristicLab.Tracing;
37using System.Transactions;
38using HeuristicLab.Hive.Server.LINQDataAccess;
39using IsolationLevel=System.Transactions.IsolationLevel;
40
41namespace HeuristicLab.Hive.Server.Core {
42  internal class JobManager : IJobManager, IInternalJobManager {
43    //ISessionFactory factory;
44    private ILifecycleManager lifecycleManager;
45
46    #region IJobManager Members
47
48    public JobManager() {
49      //factory = ServiceLocator.GetSessionFactory();
50      lifecycleManager = ServiceLocator.GetLifecycleManager();
51
52      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnStartup));
53      lifecycleManager.RegisterStartup(new EventHandler(lifecycleManager_OnShutdown));
54    }
55
56    private JobDto GetLastJobResult(Guid jobId) {
57      return DaoLocator.JobDao.FindById(jobId);
58    }
59
60
61    private void checkForDeadJobs() {
62      Logger.Info("Searching for dead Jobs");
63      using (
64        TransactionScope scope = new TransactionScope(TransactionScopeOption.Required,
65                                                      new TransactionOptions
66                                                      {IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE})) {
67        List<JobDto> allJobs = new List<JobDto>(DaoLocator.JobDao.FindAll());
68        foreach (JobDto curJob in allJobs) {
69          if (curJob.State != State.calculating && curJob.State != State.finished) {
70            DaoLocator.JobDao.SetJobOffline(curJob);
71          }
72        }
73        scope.Complete();
74      }
75      DaoLocator.DestroyContext();
76    }
77
78    private void lifecycleManager_OnStartup(object sender, EventArgs e) {
79      Logger.Info("Startup Event Fired, Checking DB for consistency");
80      checkForDeadJobs();
81      Logger.Info("Startup Event Done");
82    }
83
84    private void lifecycleManager_OnShutdown(object sender, EventArgs e) {
85      Logger.Info("Startup Event Fired, Checking DB for consistency");
86      checkForDeadJobs();
87      Logger.Info("Startup Event Done");
88    }
89
90    /// <summary>
91    /// returns all jobs stored in the database
92    /// </summary>
93    /// <returns></returns>
94    public ResponseList<JobDto> GetAllJobs() {
95      ResponseList<JobDto> response = new ResponseList<JobDto>();
96
97      response.List = new List<JobDto>(DaoLocator.JobDao.FindAll());
98      response.Success = true;
99      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ALL_JOBS;
100
101      return response;
102    }
103
104    public ResponseList<JobDto> GetAllJobsWithFilter(State jobState, int offset, int count) {
105      ResponseList<JobDto> response = new ResponseList<JobDto>();
106      response.List = new List<JobDto>(DaoLocator.JobDao.FindWithLimitations(jobState, offset, count));
107      response.Success = true;
108      return response;
109    }
110
111    /// <summary>
112    /// Gets the streamed job
113    /// </summary>
114    /// <param name="jobId"></param>
115    /// <returns></returns>
116    public Stream GetJobStreamById(Guid jobId) {
117      return DaoLocator.JobDao.GetSerializedJobStream(jobId);
118    }
119
120    /// <summary>
121    /// returns the job with the specified id
122    /// </summary>
123    /// <returns></returns>
124    public ResponseObject<JobDto> GetJobById(Guid jobId) {
125      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
126
127      response.Obj = DaoLocator.JobDao.FindById(jobId);
128      if (response.Obj != null) {
129        response.Success = true;
130        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
131      }
132      else {
133        response.Success = false;
134        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
135      }
136
137      return response;
138    }
139
140    public ResponseObject<JobDto> GetJobByIdWithDetails(Guid jobId) {
141      ResponseObject<JobDto> job = new ResponseObject<JobDto>();
142      job.Obj = DaoLocator.JobDao.FindById(jobId);
143      if (job.Obj != null) {
144        job.Success = true;
145        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_GET_JOB_BY_ID;
146
147        job.Obj.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
148      }
149      else {
150        job.Success = false;
151        job.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
152      }
153      return job;
154    }
155
156    public ResponseObject<JobDto> AddJobWithGroupStrings(SerializedJob job, IEnumerable<string> resources) {
157      IClientGroupDao cgd = DaoLocator.ClientGroupDao;
158      foreach (string res in resources) {
159        foreach (ClientGroupDto cg in cgd.FindByName(res)) {
160          job.JobInfo.AssignedResourceIds.Add(cg.Id);
161        }
162      }
163      return AddNewJob(job);
164    }
165
166    /// <summary>
167    /// Adds a new job into the database
168    /// </summary>
169    /// <param name="job"></param>
170    /// <returns></returns>
171    public ResponseObject<JobDto> AddNewJob(SerializedJob job) {
172      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
173
174      if (job != null && job.JobInfo != null) {
175        if (job.JobInfo.State != State.offline) {
176          response.Success = false;
177          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOBSTATE_MUST_BE_OFFLINE;
178          return response;
179        }
180        if (job.JobInfo.Id != Guid.Empty) {
181          response.Success = false;
182          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ID_MUST_NOT_BE_SET;
183          return response;
184        }
185        if (job.SerializedJobData == null) {
186          response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
187          response.Success = false;
188          return response;
189        }
190
191        job.JobInfo.DateCreated = DateTime.Now;
192        DaoLocator.JobDao.InsertWithAttachedJob(job);
193        DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo);
194
195        response.Success = true;
196        response.Obj = job.JobInfo;
197        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_ADDED;
198      }
199      else {
200        response.Success = false;
201        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_NULL;
202      }
203
204      return response;
205    }
206
207    /// <summary>
208    /// Removes a job from the database
209    /// </summary>
210    /// <param name="jobId"></param>
211    /// <returns></returns>
212    public Response RemoveJob(Guid jobId) {
213      Response response = new Response();
214
215      JobDto job = DaoLocator.JobDao.FindById(jobId);
216      if (job == null) {
217        response.Success = false;
218        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
219        return response;
220      }
221      DaoLocator.JobDao.Delete(job);
222      response.Success = false;
223      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_REMOVED;
224
225      return response;
226    }
227
228    public ResponseObject<JobDto> GetLastJobResultOf(Guid jobId) {
229      ResponseObject<JobDto> result =
230        new ResponseObject<JobDto>();
231
232      result.Obj =
233        GetLastJobResult(jobId);
234      result.Success =
235        result.Obj != null;
236
237      return result;
238    }
239
240    //Requested means: there MUST be a job result which gets sent back
241    public ResponseObject<SerializedJob>
242      GetLastSerializedJobResultOf(Guid jobId, bool requested, bool snapshot) {
243      ResponseObject<SerializedJob> response =
244        new ResponseObject<SerializedJob>();
245
246      JobDto job = DaoLocator.JobDao.FindById(jobId);
247
248      //if it's a snapshot but the result hasn't reached the server yet...
249      if (snapshot && (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent)) {
250        response.Success = true;
251        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
252
253        return response;
254      }
255
256      //if it's NOT a snapshot, NEITHER request NOR is it finished
257      if(!requested && !snapshot && job.State != State.finished) {     
258        response.Success = true;
259        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
260
261        return response;
262      }
263
264      //every other case - snapshot, job finished or it's requested
265      response.Success = true;
266      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT;
267      response.Obj = new SerializedJob();
268      response.Obj.JobInfo = job;
269
270      response.Obj.SerializedJobData =
271        DaoLocator.JobDao.GetBinaryJobFile(jobId);
272
273      return response;
274    }
275
276
277    public Response RequestSnapshot(Guid jobId) {
278      Response response = new Response();
279
280
281      JobDto job = DaoLocator.JobDao.FindById(jobId);
282      if (job.State == State.requestSnapshot || job.State == State.requestSnapshotSent) {
283        response.Success = true;
284        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET;
285        return response;
286      }
287      if (job.State != State.calculating) {
288        response.Success = false;
289        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
290        return response;
291      }
292      // job is in correct state
293      job.State = State.requestSnapshot;
294      DaoLocator.JobDao.Update(job);
295
296      response.Success = true;
297      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_REQUEST_SET;
298
299      return response;
300    }
301
302    public Response AbortJob(Guid jobId) {
303      Response response = new Response();
304
305      JobDto job = DaoLocator.JobDao.FindById(jobId);
306      if (job == null) {
307        response.Success = false;
308        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_JOB_DOESNT_EXIST;
309        return response; // no commit needed
310      }
311      if (job.State == State.abort) {
312        response.Success = true;
313        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_ALLREADY_SET;
314        return response; // no commit needed
315      }
316      if (job.State != State.calculating && job.State != State.requestSnapshot && job.State != State.requestSnapshotSent) {
317        response.Success = false;
318        response.StatusMessage = ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
319        return response; // no commit needed
320      }
321      // job is in correct state
322      job.State = State.abort;
323      DaoLocator.JobDao.Update(job);
324
325      response.Success = true;
326      response.StatusMessage = ApplicationConstants.RESPONSE_JOB_ABORT_REQUEST_SET;
327
328      return response;
329    }
330
331    public ResponseList<JobResult> GetAllJobResults(Guid jobId) {
332      return new ResponseList<JobResult>();
333    }
334
335    public ResponseList<ProjectDto> GetAllProjects() {
336      return null;
337    }
338
339    private Response createUpdateProject(ProjectDto project) {
340      return null;
341    }
342
343    public Response CreateProject(ProjectDto project) {
344      return createUpdateProject(project);
345    }
346
347    public Response ChangeProject(ProjectDto project) {
348      return createUpdateProject(project);
349    }
350
351    public Response DeleteProject(Guid projectId) {
352      return null;
353    }
354
355    public ResponseList<JobDto> GetJobsByProject(Guid projectId) {
356      return null;
357    }
358
359    public byte[] GetSerializedJobDataById(Guid jobId) {
360      return DaoLocator.JobDao.GetBinaryJobFile(jobId);
361    }
362
363    public void SetSerializedJobDataById(Guid jobId, byte[] data) {
364      DaoLocator.JobDao.SetBinaryJobFile(jobId, data);
365    }
366
367    #endregion
368  }
369}
Note: See TracBrowser for help on using the repository browser.