Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HeartbeatManager.cs @ 6347

Last change on this file since 6347 was 6269, checked in by cneumuel, 14 years ago

#1233

  • added CpuUtilization to heartbeats
File size: 5.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using HeuristicLab.Services.Hive.Common;
5using HeuristicLab.Services.Hive.Common.DataTransfer;
6
7namespace HeuristicLab.Services.Hive {
8  public class HeartbeatManager {
9    private DataAccess.IHiveDao dao {
10      get { return ServiceLocator.Instance.HiveDao; }
11    }
12    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
13      get { return ServiceLocator.Instance.TransactionManager; }
14    }
15    private IAuthorizationManager auth {
16      get { return ServiceLocator.Instance.AuthorizationManager; }
17    }
18
19    /// <summary>
20    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
21    /// </summary>
22    /// <returns>a list of actions the slave should do</returns>
23    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
24      List<MessageContainer> actions = new List<MessageContainer>();
25      Slave slave = dao.GetSlave(heartbeat.SlaveId);
26      if (slave == null) {
27        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
28      } else {
29        // update slave data
30        slave.FreeCores = heartbeat.FreeCores;
31        slave.FreeMemory = heartbeat.FreeMemory;
32        slave.CpuUtilization = heartbeat.CpuUtilization;
33        slave.IsAllowedToCalculate = true; // Todo: look into calendar
34        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
35        slave.LastHeartbeat = DateTime.Now;
36        dao.UpdateSlave(slave);
37
38        // update job data
39        actions.AddRange(UpdateJobs(heartbeat));
40
41        // assign new job
42        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
43          var availableJobs = dao.GetWaitingJobs(slave, 1);
44          if (availableJobs.Count() > 0) {
45            var job = availableJobs.First();
46            actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
47            AssignJob(slave, job);
48          }
49        }
50      }
51      return actions;
52    }
53
54    private void AssignJob(Slave slave, Job job) {
55      job = dao.UpdateJobState(job.Id, JobState.Transferring, slave.Id, null, null);
56      dao.UpdateSlave(slave);
57
58      // from now on the job has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
59      job.LastHeartbeat = DateTime.Now;
60      dao.UpdateJob(job);
61    }
62
63    /// <summary>
64    /// Update the progress of each job
65    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
66    /// </summary>
67    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
68      List<MessageContainer> actions = new List<MessageContainer>();
69
70      if (heartbeat.JobProgress == null)
71        return actions;
72
73      // process the jobProgresses
74      foreach (var jobProgress in heartbeat.JobProgress) {
75        Job curJob = dao.GetJob(jobProgress.Key);
76        if (curJob == null) {
77          // job does not exist in db
78          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
79          LogFactory.GetLogger(this.GetType().Namespace).Log("Job does not exist in DB: " + jobProgress.Key);
80        } else {
81          if (curJob.CurrentStateLog.SlaveId == Guid.Empty || curJob.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
82            // assigned slave does not match heartbeat
83            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
84            LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
85          } else if (!JobIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curJob)) {
86            // assigned resources ids of job do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
87            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseJob, curJob.Id));
88          } else {
89            // save job execution time
90            curJob.ExecutionTime = jobProgress.Value;
91            curJob.LastHeartbeat = DateTime.Now;
92
93            switch (curJob.Command) {
94              case Command.Stop:
95                actions.Add(new MessageContainer(MessageContainer.MessageType.StopJob, curJob.Id));
96                break;
97              case Command.Pause:
98                actions.Add(new MessageContainer(MessageContainer.MessageType.PauseJob, curJob.Id));
99                break;
100              case Command.Abort:
101                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
102                break;
103            }
104            dao.UpdateJob(curJob);
105          }
106        }
107      }
108      return actions;
109    }
110
111    private bool JobIsAllowedToBeCalculatedBySlave(Guid slaveId, Job curJob) {
112      var assignedResourceIds = dao.GetAssignedResources(curJob.Id).Select(x => x.Id);
113      var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
114      return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
115    }
116  }
117}
Note: See TracBrowser for help on using the repository browser.