Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/JobManager.cs @ 5153

Last change on this file since 5153 was 5153, checked in by cneumuel, 13 years ago

#1260

  • increased timeouts for sent jobs (which are needed if the jobs take a long time to deserialize on the slave)
  • added DeleteJob to ClientService
  • made optimizer actually Pause instead of Stop when stop is called explicitly (so they can be resumed later)
  • temporarily disabled job-abortion from server because it aborted jobs which took too long to deserialize on slaves (this issue needs to be investigated)
  • reduced locking of engines on slave so that the deserialization does not block heartbeats

#1347

  • worked on HiveEngine
  • added test project for HiveEngine
File size: 16.4 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.IO;
27using System.Linq;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.Interfaces;
31using HeuristicLab.Hive.Contracts.ResponseObjects;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.Hive.Server.DataAccess;
34using HeuristicLab.Hive.Tracing;
35
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// Server-side entry point for querying, creating, pausing, aborting and deleting
  /// Hive jobs and Hive experiments. All persistence goes through the DAOs exposed by
  /// <c>DaoLocator</c>; failures that callers are expected to handle are reported through
  /// the <c>StatusMessage</c> of the returned response object instead of exceptions.
  /// </summary>
  internal class JobManager : IJobManager, IInternalJobManager {
    private readonly ILifecycleManager lifecycleManager;

    #region IJobManager Members

    /// <summary>
    /// Obtains the lifecycle manager from the <c>ServiceLocator</c> and subscribes to its
    /// Started and Stopped events so that the job table is checked on both occasions.
    /// </summary>
    public JobManager() {
      lifecycleManager = ServiceLocator.GetLifecycleManager();

      lifecycleManager.Started += new EventHandler(lifecycleManager_Started);
      lifecycleManager.Stopped += new EventHandler(lifecycleManager_Stopped);
    }

    /// <summary>
    /// Walks over all stored jobs and passes every job whose state is none of
    /// Calculating, Finished, Aborted, Failed or WaitForChildJobs to
    /// <c>JobDao.SetJobOffline</c>.
    /// </summary>
    private void CheckForDeadJobs() {
      Logger.Info("Searching for dead Jobs");

      IEnumerable<JobDto> allJobs = DaoLocator.JobDao.FindAll();
      foreach (JobDto curJob in allJobs) {
        bool keepState =
          curJob.State == JobState.Calculating ||
          curJob.State == JobState.Finished ||
          curJob.State == JobState.Aborted ||
          curJob.State == JobState.Failed ||
          curJob.State == JobState.WaitForChildJobs;

        if (!keepState) {
          DaoLocator.JobDao.SetJobOffline(curJob);
        }
      }
    }

    /// <summary>
    /// Handler for the lifecycle manager's Started event; runs the dead-job check.
    /// </summary>
    private void lifecycleManager_Started(object sender, EventArgs e) {
      Logger.Info("Startup Event Fired, Checking DB for consistency");
      CheckForDeadJobs();
      Logger.Info("Startup Event Done");
    }

    /// <summary>
    /// Handler for the lifecycle manager's Stopped event; runs the dead-job check.
    /// </summary>
    private void lifecycleManager_Stopped(object sender, EventArgs e) {
      Logger.Info("Stopped Event Fired, Checking DB for consistency");
      CheckForDeadJobs();
      Logger.Info("Stopped Event Done");
    }

    /// <summary>
    /// Returns all jobs stored in the database.
    /// </summary>
    /// <returns>A response whose list is a copy of everything <c>JobDao.FindAll</c> yields.</returns>
    public ResponseList<JobDto> GetAllJobs() {
      ResponseList<JobDto> response = new ResponseList<JobDto>();
      response.List = new List<JobDto>(DaoLocator.JobDao.FindAll());
      return response;
    }

    /// <summary>
    /// Returns the jobs that <c>JobDao.FindWithLimitations</c> yields for the given arguments.
    /// </summary>
    /// <param name="jobState">State filter, forwarded unchanged to the DAO.</param>
    /// <param name="offset">Forwarded unchanged to the DAO (presumably the number of jobs to skip - confirm against the DAO).</param>
    /// <param name="count">Forwarded unchanged to the DAO (presumably the maximum number of jobs to return - confirm against the DAO).</param>
    /// <returns>A response whose list is a copy of the DAO result.</returns>
    public ResponseList<JobDto> GetAllJobsWithFilter(JobState jobState, int offset, int count) {
      ResponseList<JobDto> response = new ResponseList<JobDto>();
      response.List = new List<JobDto>(DaoLocator.JobDao.FindWithLimitations(jobState, offset, count));
      return response;
    }

    /// <summary>
    /// Gets the serialized job as a stream.
    /// </summary>
    /// <param name="jobId">Id of the job whose serialized data is requested.</param>
    /// <returns>The stream returned by <c>JobDao.GetSerializedJobStream</c>.</returns>
    public Stream GetJobStreamById(Guid jobId) {
      return DaoLocator.JobDao.GetSerializedJobStream(jobId);
    }

    /// <summary>
    /// Returns the job with the specified id.
    /// </summary>
    /// <param name="jobId">Id of the job to look up.</param>
    /// <returns>
    /// A response carrying the job and status Ok, or a null object and status
    /// GetJobById_JobDoesNotExist if the DAO finds no such job.
    /// </returns>
    public ResponseObject<JobDto> GetJobById(Guid jobId) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();

      response.Obj = DaoLocator.JobDao.FindById(jobId);
      response.StatusMessage = response.Obj != null
        ? ResponseStatus.Ok
        : ResponseStatus.GetJobById_JobDoesNotExist;

      return response;
    }

    /// <summary>
    /// Returns the job with the specified id with its Slave property filled from
    /// <c>SlaveDao.GetSlaveForJob</c>.
    /// </summary>
    /// <param name="jobId">Id of the job to look up.</param>
    /// <returns>
    /// A response carrying the job and status Ok, or a null object and status
    /// GetJobByIdWithDetails_JobDoesNotExist if the DAO finds no such job.
    /// </returns>
    public ResponseObject<JobDto> GetJobByIdWithDetails(Guid jobId) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
      response.Obj = DaoLocator.JobDao.FindById(jobId);

      if (response.Obj == null) {
        response.StatusMessage = ResponseStatus.GetJobByIdWithDetails_JobDoesNotExist;
        return response;
      }

      response.StatusMessage = ResponseStatus.Ok;
      response.Obj.Slave = DaoLocator.SlaveDao.GetSlaveForJob(jobId);
      return response;
    }

    /// <summary>
    /// Resolves the given slave group names to group ids, adds those ids to the job's
    /// assigned resources and then stores the job via <see cref="AddNewJob(SerializedJob, bool)"/>
    /// (the user id is overridden with the calling user).
    /// </summary>
    /// <param name="job">Job to add. A null job or null JobInfo is passed on to AddNewJob, which reports AddNewJob_JobNull.</param>
    /// <param name="resources">Names of slave groups; null is treated as "no groups".</param>
    /// <returns>The response produced by AddNewJob.</returns>
    public ResponseObject<JobDto> AddJobWithGroupStrings(SerializedJob job, IEnumerable<string> resources) {
      // Only touch the job when there is something to attach the ids to; AddNewJob
      // performs the null validation and produces the proper status code.
      if (job != null && job.JobInfo != null && resources != null) {
        ISlaveGroupDao slaveGroupDao = DaoLocator.SlaveGroupDao;
        foreach (string groupName in resources) {
          foreach (SlaveGroupDto group in slaveGroupDao.FindByName(groupName)) {
            job.JobInfo.AssignedResourceIds.Add(group.Id);
          }
        }
      }
      return AddNewJob(job, true);
    }

    /// <summary>
    /// Adds a new job into the database.
    /// </summary>
    /// <param name="job">
    /// Job to store. Its JobInfo must be in state Offline or WaitForChildJobs, must not
    /// have an id yet and its SerializedJobData must not be null.
    /// </param>
    /// <param name="overrideUserid">if false, job.JobInfo.UserId will not be overridden</param>
    /// <returns>
    /// On success a response carrying the stored JobInfo. Otherwise a response with one of
    /// AddNewJob_JobNull, AddNewJob_InvalidJobState or AddNewJob_JobIdMustNotBeSet.
    /// </returns>
    public ResponseObject<JobDto> AddNewJob(SerializedJob job, bool overrideUserid) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();

      if (job == null || job.JobInfo == null) {
        response.StatusMessage = ResponseStatus.AddNewJob_JobNull;
        return response;
      }

      // only Offline and WaitForChildJobs are allowed
      if (job.JobInfo.State != JobState.Offline && job.JobInfo.State != JobState.WaitForChildJobs) {
        response.StatusMessage = ResponseStatus.AddNewJob_InvalidJobState;
        return response;
      }
      if (job.JobInfo.Id != Guid.Empty) {
        response.StatusMessage = ResponseStatus.AddNewJob_JobIdMustNotBeSet;
        return response;
      }
      if (job.SerializedJobData == null) {
        response.StatusMessage = ResponseStatus.AddNewJob_JobNull;
        return response;
      }

      job.JobInfo.DateCreated = DateTime.Now;
      if (overrideUserid) {
        job.JobInfo.UserId = ServiceLocator.GetAuthorizationManager().UserId;
      }
      DaoLocator.JobDao.InsertWithAttachedJob(job);
      DaoLocator.PluginInfoDao.InsertPluginDependenciesForJob(job.JobInfo);

      response.Obj = job.JobInfo;
      return response;
    }

    /// <summary>
    /// Adds a new job into the database and sets its user id to the calling user.
    /// </summary>
    /// <param name="job">Job to store; see <see cref="AddNewJob(SerializedJob, bool)"/> for the requirements.</param>
    /// <returns>The response produced by <see cref="AddNewJob(SerializedJob, bool)"/>.</returns>
    public ResponseObject<JobDto> AddNewJob(SerializedJob job) {
      return AddNewJob(job, true);
    }

    // NOTE: the two disabled methods below are kept for reference. Their descriptions are
    // plain comments on purpose: as XML doc comments they would be attached to the next
    // real member (GetLastSerializedResult) and document it incorrectly.

    // Removes a job from the database
    //
    // [chn] this is currently not used anywhere -> check redundancy with AbortJob
    //
    //public Response RemoveJob(Guid jobId) {
    //  Response response = new Response();

    //  JobDto job = DaoLocator.JobDao.FindById(jobId);
    //  if (job == null) {
    //    response.StatusMessage = ResponseStatusMessage.RemoveJob_JobDoesNotExist;
    //    return response;
    //  }
    //  DaoLocator.JobDao.Delete(job);

    //  return response;
    //}

    // [chn] currently not used anywhere
    //
    //public ResponseObject<JobDto> GetLastJobResultOf(Guid jobId) {
    //  ResponseObject<JobDto> result = new ResponseObject<JobDto>();

    //  result.Obj = DaoLocator.JobDao.FindById(jobId);
    //  if (result.Obj == null) {
    //    result.StatusMessage = ResponseStatusMessage.
    //  }

    //  return result;
    //}

    /// <summary>
    /// Returns the job together with its stored binary job data.
    /// </summary>
    /// <param name="jobId">Id of the job to look up.</param>
    /// <returns>
    /// A response whose object is always a SerializedJob. If the job exists, JobInfo and
    /// SerializedJobData are filled; otherwise JobInfo is null and the status is
    /// GetLastSerializedResult_JobDoesNotExist.
    /// </returns>
    public ResponseObject<SerializedJob> GetLastSerializedResult(Guid jobId) {
      ResponseObject<SerializedJob> response = new ResponseObject<SerializedJob>();
      JobDto job = DaoLocator.JobDao.FindById(jobId);

      response.Obj = new SerializedJob();
      response.Obj.JobInfo = job;

      if (job != null) {
        response.Obj.SerializedJobData = DaoLocator.JobDao.GetBinaryJobFile(jobId);
      } else {
        response.StatusMessage = ResponseStatus.GetLastSerializedResult_JobDoesNotExist;
      }

      return response;
    }

    /// <summary>
    /// Returns the job together with its stored binary job data, unless a snapshot has
    /// been requested or sent but not yet stored.
    /// </summary>
    /// <param name="jobId">Id of the job to look up.</param>
    /// <returns>
    /// A response carrying the SerializedJob, or a response without object and status
    /// GetSnapshotResult_JobResultNotYetThere while the job is in state SnapshotRequested
    /// or SnapshotSent, or status GetLastSerializedResult_JobDoesNotExist if the job is unknown.
    /// </returns>
    public ResponseObject<SerializedJob> GetSnapshotResult(Guid jobId) {
      ResponseObject<SerializedJob> response = new ResponseObject<SerializedJob>();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        // Previously this dereferenced null and threw. No snapshot-specific
        // "does not exist" code is visible here, so the code of the sibling result
        // getter is reused. NOTE(review): consider adding a dedicated status code.
        response.StatusMessage = ResponseStatus.GetLastSerializedResult_JobDoesNotExist;
        return response;
      }

      //if it's a snapshot but the result hasn't reached the server yet...
      if (job.State == JobState.SnapshotRequested || job.State == JobState.SnapshotSent) {
        response.StatusMessage = ResponseStatus.GetSnapshotResult_JobResultNotYetThere;
        return response;
      }

      response.Obj = new SerializedJob();
      response.Obj.JobInfo = job;
      response.Obj.SerializedJobData = DaoLocator.JobDao.GetBinaryJobFile(jobId);

      return response;
    }

    /// <summary>
    /// Puts a job that is currently Calculating into state SnapshotRequested and stores it.
    /// </summary>
    /// <param name="jobId">Id of the job a snapshot is requested for.</param>
    /// <returns>
    /// A response with status RequestSnapshot_SnapshotAlreadyRequested if the job is
    /// already in state SnapshotRequested or SnapshotSent, with status
    /// RequestSnapshot_JobIsNotBeeingCalculated if the job is unknown or not in state
    /// Calculating, and an unmodified response otherwise.
    /// </returns>
    public Response RequestSnapshot(Guid jobId) {
      Response response = new Response();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        // Previously this dereferenced null and threw. A job that does not exist is
        // not being calculated, so the existing status code applies.
        response.StatusMessage = ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated;
        return response;
      }
      if (job.State == JobState.SnapshotRequested || job.State == JobState.SnapshotSent) {
        response.StatusMessage = ResponseStatus.RequestSnapshot_SnapshotAlreadyRequested;
        return response;
      }
      if (job.State != JobState.Calculating) {
        response.StatusMessage = ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated;
        return response;
      }
      // job is in correct state
      job.State = JobState.SnapshotRequested;
      DaoLocator.JobDao.Update(job);

      return response;
    }

    /// <summary>
    /// Sets the state of the job to Aborted and stores it.
    /// [chn] rename to RequestAbortJob?
    /// </summary>
    /// <param name="jobId">Id of the job to abort.</param>
    /// <returns>
    /// A response with status AbortJob_JobDoesNotExist if the job is unknown, with status
    /// AbortJob_AbortAlreadyRequested if it is already Aborted, and an unmodified
    /// response otherwise.
    /// </returns>
    public Response AbortJob(Guid jobId) {
      Logger.Debug("JobManager.AbortJob: " + jobId);
      Response response = new Response();

      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        response.StatusMessage = ResponseStatus.AbortJob_JobDoesNotExist;
        return response; // no commit needed
      }
      if (job.State == JobState.Aborted) {
        response.StatusMessage = ResponseStatus.AbortJob_AbortAlreadyRequested;
        return response; // no commit needed
      }
      // job is in correct state
      job.State = JobState.Aborted;
      DaoLocator.JobDao.Update(job);

      return response;
    }

    /// <summary>
    /// Returns the current state for all jobs requested.
    /// </summary>
    /// <param name="jobIds">Ids of the jobs to report on; forwarded to <c>JobDao.FindJobsById</c>.</param>
    /// <returns>
    /// A response carrying one JobResult (id, state, calculation/finish dates, exception
    /// and execution time) per job the DAO returned.
    /// </returns>
    public ResponseObject<JobResultList> GetJobResults(IEnumerable<Guid> jobIds) {
      ResponseObject<JobResultList> response = new ResponseObject<JobResultList>();
      JobResultList jobResultList = new JobResultList();

      foreach (JobDto job in DaoLocator.JobDao.FindJobsById(jobIds)) {
        jobResultList.Add(new JobResult() {
          Id = job.Id,
          State = job.State,
          DateCalculated = job.DateCalculated,
          DateFinished = job.DateFinished,
          Exception = job.Exception,
          ExecutionTime = job.ExecutionTime
        });
      }

      response.Obj = jobResultList;
      return response;
    }

    /// <summary>
    /// Returns the stored binary job data of the given job.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>The result of <c>JobDao.GetBinaryJobFile</c>.</returns>
    public byte[] GetSerializedJobDataById(Guid jobId) {
      return DaoLocator.JobDao.GetBinaryJobFile(jobId);
    }

    /// <summary>
    /// Stores binary job data for the given job via <c>JobDao.SetBinaryJobFile</c>.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="data">Serialized job data to store.</param>
    public void SetSerializedJobDataById(Guid jobId, byte[] data) {
      DaoLocator.JobDao.SetBinaryJobFile(jobId, data);
    }

    /// <summary>
    /// Returns the current state of the child jobs of a job.
    /// </summary>
    /// <param name="parentJobId">
    /// Id of the parent job. If null, the jobs returned by the DAO for a null parent are
    /// restricted to those owned by the calling user.
    /// </param>
    /// <param name="recursive">Forwarded to <c>JobDao.FindJobsByParentId</c>.</param>
    /// <param name="includeParent">
    /// If true and a parent id is given, the parent job itself is appended to the result
    /// (silently omitted if it cannot be found).
    /// </param>
    /// <returns>A response carrying one JobResult per job.</returns>
    public ResponseObject<JobResultList> GetChildJobResults(Guid? parentJobId, bool recursive, bool includeParent) {
      ResponseObject<JobResultList> response = new ResponseObject<JobResultList>();
      JobResultList jobResultList = new JobResultList();

      IList<JobDto> jobs = DaoLocator.JobDao.FindJobsByParentId(parentJobId, recursive).ToList();
      if (!parentJobId.HasValue) {
        // Resolve the calling user once instead of once per job inside the predicate.
        var currentUserId = ServiceLocator.GetAuthorizationManager().UserId;
        jobs = jobs.Where(job => job.UserId == currentUserId).ToList();
      }

      if (includeParent && parentJobId.HasValue) {
        // FindById may yield null; the null check in the loop below skips it.
        jobs.Add(DaoLocator.JobDao.FindById(parentJobId.Value));
      }

      foreach (JobDto job in jobs) {
        if (job == null) {
          continue;
        }
        jobResultList.Add(new JobResult() {
          Id = job.Id,
          State = job.State,
          DateCreated = job.DateCreated,
          DateCalculated = job.DateCalculated,
          DateFinished = job.DateFinished,
          Exception = job.Exception,
          ExecutionTime = job.ExecutionTime,
          ParentJobId = job.ParentJobId
        });
      }

      response.Obj = jobResultList;
      return response;
    }

    /// <summary>
    /// Stores a job as child of an existing job. The child takes over the parent's id as
    /// ParentJobId as well as the parent's user id and assigned resources.
    /// </summary>
    /// <param name="parentJobId">Id of the parent job.</param>
    /// <param name="serializedJob">Child job to store; see <see cref="AddNewJob(SerializedJob, bool)"/> for the requirements.</param>
    /// <returns>
    /// The response produced by AddNewJob, or a response with status AddNewJob_JobNull
    /// if the child job or its JobInfo is null, or with status GetJobById_JobDoesNotExist
    /// if the parent job is unknown.
    /// </returns>
    public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
      if (serializedJob == null || serializedJob.JobInfo == null) {
        // Same status AddNewJob reports for this input; checked here because the
        // JobInfo is written to before AddNewJob gets a chance to validate it.
        return new ResponseObject<JobDto>() { StatusMessage = ResponseStatus.AddNewJob_JobNull };
      }

      JobDto parentJob = DaoLocator.JobDao.FindById(parentJobId);
      if (parentJob == null) {
        // Previously this dereferenced null and threw. No AddChildJob-specific status
        // code is visible here, so the generic lookup failure code is reused.
        // NOTE(review): consider adding a dedicated status code.
        return new ResponseObject<JobDto>() { StatusMessage = ResponseStatus.GetJobById_JobDoesNotExist };
      }

      serializedJob.JobInfo.ParentJobId = parentJob.Id;
      serializedJob.JobInfo.UserId = parentJob.UserId;
      serializedJob.JobInfo.AssignedResourceIds = parentJob.AssignedResourceIds;

      return AddNewJob(serializedJob, false);
    }

    /// <summary>
    /// Stores the state, execution time and serialized data sent for a job and removes
    /// the job's slave assignment.
    /// </summary>
    /// <param name="serializedJob">Job as sent by the caller; its JobInfo.Id identifies the stored job to update.</param>
    /// <returns>
    /// A response carrying the updated stored job and status Ok, or a response with
    /// status GetJobById_JobDoesNotExist if no job with that id is stored.
    /// </returns>
    public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
      JobDto jobDto = DaoLocator.JobDao.FindById(serializedJob.JobInfo.Id);
      if (jobDto == null) {
        // Previously this dereferenced null and threw. No PauseJob-specific status
        // code is visible here, so the generic lookup failure code is reused.
        // NOTE(review): consider adding a dedicated status code.
        return new ResponseObject<JobDto>() { StatusMessage = ResponseStatus.GetJobById_JobDoesNotExist };
      }

      jobDto.State = serializedJob.JobInfo.State;
      jobDto.ExecutionTime = serializedJob.JobInfo.ExecutionTime;

      DaoLocator.JobDao.SetBinaryJobFile(serializedJob.JobInfo.Id, serializedJob.SerializedJobData);
      DaoLocator.JobDao.Update(jobDto);

      DaoLocator.JobDao.UnAssignSlaveToJob(jobDto.Id);

      return new ResponseObject<JobDto>() {
        Obj = jobDto,
        StatusMessage = ResponseStatus.Ok
      };
    }

    /// <summary>
    /// Returns the Hive experiments of the calling user.
    /// </summary>
    /// <returns>A response with status Ok carrying the experiments found by <c>HiveExperimentDao.FindAllByUserId</c>.</returns>
    public ResponseObject<HiveExperimentDtoList> GetHiveExperiments() {
      ResponseObject<HiveExperimentDtoList> response = new ResponseObject<HiveExperimentDtoList>();
      IEnumerable<HiveExperimentDto> hiveExperiments = DaoLocator.HiveExperimentDao.FindAllByUserId(ServiceLocator.GetAuthorizationManager().UserId);
      response.Obj = new HiveExperimentDtoList(hiveExperiments);
      response.StatusMessage = ResponseStatus.Ok;
      return response;
    }

    /// <summary>
    /// Inserts the experiment if it has no id yet (Guid.Empty), otherwise updates it.
    /// </summary>
    /// <param name="hiveExperimentDto">Experiment to insert or update.</param>
    /// <returns>
    /// A response with status Ok carrying the object returned by the insert, or the
    /// passed-in object after an update.
    /// </returns>
    public ResponseObject<HiveExperimentDto> UpdateHiveExperiment(HiveExperimentDto hiveExperimentDto) {
      ResponseObject<HiveExperimentDto> response = new ResponseObject<HiveExperimentDto>();
      if (hiveExperimentDto.Id == Guid.Empty) {
        response.Obj = DaoLocator.HiveExperimentDao.Insert(hiveExperimentDto);
      } else {
        DaoLocator.HiveExperimentDao.Update(hiveExperimentDto);
        response.Obj = hiveExperimentDto;
      }
      response.StatusMessage = ResponseStatus.Ok;
      return response;
    }

    /// <summary>
    /// Deletes the Hive experiment with the given id.
    /// </summary>
    /// <param name="hiveExperimentId">Id of the experiment to delete.</param>
    /// <returns>A response with status DeleteHiveExperiment_Failed if the DAO threw, an unmodified response otherwise.</returns>
    public Response DeleteHiveExperiment(Guid hiveExperimentId) {
      Response response = new Response();
      try {
        DaoLocator.HiveExperimentDao.Delete(hiveExperimentId);
      }
      catch (Exception e) {
        // Keep the best-effort contract (status code instead of exception), but do
        // not lose the cause.
        Logger.Info("JobManager.DeleteHiveExperiment failed for " + hiveExperimentId + ": " + e);
        response.StatusMessage = ResponseStatus.DeleteHiveExperiment_Failed;
      }
      return response;
    }

    /// <summary>
    /// Deletes all jobs that <c>JobDao.FindJobsByParentId(jobId, true)</c> returns for the given job.
    /// </summary>
    /// <param name="jobId">Id of the parent job whose children are deleted.</param>
    /// <returns>
    /// A response with status DeleteChildJobs_Failed if the DAO threw (children deleted
    /// before the failure are not restored by this method), an unmodified response otherwise.
    /// </returns>
    public Response DeleteChildJobs(Guid jobId) {
      Response response = new Response();
      try {
        IEnumerable<JobDto> children = DaoLocator.JobDao.FindJobsByParentId(jobId, true);
        foreach (JobDto childJob in children) {
          DaoLocator.JobDao.Delete(childJob.Id);
        }
      }
      catch (Exception e) {
        // Keep the best-effort contract (status code instead of exception), but do
        // not lose the cause.
        Logger.Info("JobManager.DeleteChildJobs failed for " + jobId + ": " + e);
        response.StatusMessage = ResponseStatus.DeleteChildJobs_Failed;
      }
      return response;
    }

    /// <summary>
    /// Deletes the job with the given id.
    /// </summary>
    /// <param name="jobId">Id of the job to delete.</param>
    /// <returns>A response with status DeleteJob_Failed if the DAO threw, an unmodified response otherwise.</returns>
    public Response DeleteJob(Guid jobId) {
      Response response = new Response();
      try {
        DaoLocator.JobDao.Delete(jobId);
      }
      catch (Exception e) {
        // Keep the best-effort contract (status code instead of exception), but do
        // not lose the cause.
        Logger.Info("JobManager.DeleteJob failed for " + jobId + ": " + e);
        response.StatusMessage = ResponseStatus.DeleteJob_Failed;
      }
      return response;
    }

    #endregion

    #region Project handling (currently unimplemented)

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public ResponseList<ProjectDto> GetAllProjects() {
      throw new NotImplementedException();
    }

    /// <summary>Shared backend of CreateProject and ChangeProject. Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    private Response CreateUpdateProject(ProjectDto project) {
      throw new NotImplementedException();
    }

    /// <summary>Delegates to CreateUpdateProject, which currently always throws <see cref="NotImplementedException"/>.</summary>
    public Response CreateProject(ProjectDto project) {
      return CreateUpdateProject(project);
    }

    /// <summary>Delegates to CreateUpdateProject, which currently always throws <see cref="NotImplementedException"/>.</summary>
    public Response ChangeProject(ProjectDto project) {
      return CreateUpdateProject(project);
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public Response DeleteProject(Guid projectId) {
      throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public ResponseList<JobDto> GetJobsByProject(Guid projectId) {
      throw new NotImplementedException();
    }
    #endregion

  }
}
Note: See TracBrowser for help on using the repository browser.