Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HeartbeatManager.cs @ 5718

Last change on this file since 5718 was 5718, checked in by cneumuel, 13 years ago

#1233

  • fixed statelog when time on server differs from slave or client
  • fixed wrong creation of childjobs in experiment manager
  • made ganttchartview the default view for statelogs
File size: 4.4 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using HeuristicLab.Services.Hive.Common;
5using HeuristicLab.Services.Hive.Common.DataTransfer;
6
7namespace HeuristicLab.Services.Hive {
8  public class HeartbeatManager {
9    private DataAccess.IHiveDao dao {
10      get { return ServiceLocator.Instance.HiveDao; }
11    }
12    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
13      get { return ServiceLocator.Instance.TransactionManager; }
14    }
15    private IAuthorizationManager auth {
16      get { return ServiceLocator.Instance.AuthorizationManager; }
17    }
18
19    /// <summary>
20    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
21    /// </summary>
22    /// <returns>a list of actions the slave should do</returns>
23    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
24      List<MessageContainer> actions = new List<MessageContainer>();
25      Slave slave = dao.GetSlave(heartbeat.SlaveId);
26      if (slave == null) {
27        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
28      } else {
29        // update slave data
30        slave.FreeCores = heartbeat.FreeCores;
31        slave.FreeMemory = heartbeat.FreeMemory;
32        slave.IsAllowedToCalculate = true; // Todo: look into calendar
33        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
34        slave.LastHeartbeat = DateTime.Now;
35        dao.UpdateSlave(slave);
36
37        // update job data
38        actions.AddRange(UpdateJobs(heartbeat));
39
40        // assign new job
41        if (heartbeat.AssignJob && this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
42          var availableJobs = dao.GetWaitingJobs(slave, 1);
43          if (availableJobs.Count() > 0) {
44            var job = availableJobs.First();
45            actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
46            AssignJob(slave, job);
47          }
48        }
49      }
50      return actions;
51    }
52
53    private void AssignJob(Slave slave, Job job) {
54      dao.UpdateJobState(job.Id, JobState.Transferring, slave.Id, null, null);
55      dao.UpdateSlave(slave);
56    }
57
58    /// <summary>
59    /// Update the progress of each job
60    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
61    /// </summary>
62    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
63      List<MessageContainer> actions = new List<MessageContainer>();
64
65      if (heartbeat.JobProgress == null)
66        return actions;
67
68      // process the jobProgresses
69      foreach (var jobProgress in heartbeat.JobProgress) {
70        Job curJob = dao.GetJob(jobProgress.Key);
71        if (curJob == null) {
72          // job does not exist in db
73          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
74          LogFactory.GetLogger(this.GetType().Namespace).Log("Job does not exist in DB: " + jobProgress.Key);
75        } else {
76          if (curJob.CurrentStateLog.SlaveId == Guid.Empty || curJob.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
77            // assigned slave does not match heartbeat
78            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
79            LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
80          } else {
81            // save job execution time
82            curJob.ExecutionTime = jobProgress.Value;
83            curJob.LastHeartbeat = DateTime.Now;
84
85            if (curJob.State == JobState.Aborted) {
86              // a request to abort the job has been set
87              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
88            }
89            dao.UpdateJob(curJob);
90          }
91        }
92      }
93      return actions;
94    }
95
96    /// <summary>
97    /// Returns true if there are enough resources to send a job
98    /// There should not be too many jobs sent simultaniously
99    /// </summary>
100    private bool IsAllowedToSendJobs() {
101      return true; // JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
102      // Todo: see if unlimited job transfer count works. if not, look into db and count jobs in state Transferring
103    }
104  }
105}
Note: See TracBrowser for help on using the repository browser.