1 | using System;
2 | using System.Collections.Generic;
3 | using System.Linq;
4 | using HeuristicLab.Services.Hive.Common;
5 | using HeuristicLab.Services.Hive.Common.DataTransfer;
6 | using HeuristicLab.Tracing;
7 |
8 | namespace HeuristicLab.Services.Hive {
/// <summary>
/// Handles heartbeats sent by slaves: refreshes the slave's persisted state, reconciles the
/// progress of the jobs the slave reports, and decides which messages (actions) the slave
/// should receive in reply.
/// </summary>
public class HeartbeatManager {
  private DataAccess.IHiveDao dao {
    get { return ServiceLocator.Instance.HiveDao; }
  }

  // NOTE(review): not referenced anywhere in this class; presumably intended for wrapping the
  // heartbeat updates in a transaction - confirm before removing.
  private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
    get { return ServiceLocator.Instance.TransactionManager; }
  }

  // NOTE(review): not referenced anywhere in this class - confirm before removing.
  private IAuthorizationManager auth {
    get { return ServiceLocator.Instance.AuthorizationManager; }
  }

  /// <summary>
  /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
  /// Unknown slaves are asked to say hello; known slaves get their state and job progress
  /// persisted and may be handed one waiting job.
  /// </summary>
  /// <param name="heartbeat">The heartbeat received from the slave. Must not be null.</param>
  /// <returns>a list of actions the slave should do</returns>
  /// <exception cref="ArgumentNullException"><paramref name="heartbeat"/> is null.</exception>
  public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    if (heartbeat == null) {
      throw new ArgumentNullException("heartbeat");
    }

    List<MessageContainer> actions = new List<MessageContainer>();
    Slave slave = dao.GetSlave(heartbeat.SlaveId);
    if (slave == null) {
      // the slave is not known to the server: it has to register itself first
      actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
      return actions;
    }

    // update slave data
    slave.FreeCores = heartbeat.FreeCores;
    slave.FreeMemory = heartbeat.FreeMemory;
    slave.IsAllowedToCalculate = true; // Todo: look into calendar
    // the slave counts as calculating as soon as it reports progress for at least one job
    slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
    // NOTE(review): local time, not UTC; any code comparing LastHeartbeat must use the same clock,
    // so switch to UtcNow only together with all readers.
    slave.LastHeartbeat = DateTime.Now;
    dao.UpdateSlave(slave);

    // update job data
    actions.AddRange(UpdateJobs(heartbeat));

    // assign new job
    if (heartbeat.AssignJob && this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
      // Enumerate the waiting-jobs result only once. The former Count() + First() pair enumerated
      // it twice, which can mean two database round-trips for a lazily evaluated sequence.
      Job job = dao.GetWaitingJobs(slave, 1).FirstOrDefault();
      if (job != null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
        AssignJob(slave, job);
      }
    }
    return actions;
  }

  /// <summary>
  /// Marks <paramref name="job"/> as being transferred to <paramref name="slave"/> and persists
  /// both the job and the slave.
  /// </summary>
  private void AssignJob(Slave slave, Job job) {
    job.SetState(JobState.Transferring, slave.Id, "");
    dao.UpdateJob(job);
    dao.UpdateSlave(slave);
  }

  /// <summary>
  /// Update the progress of each job
  /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
  /// </summary>
  /// <param name="heartbeat">The heartbeat whose <c>JobProgress</c> entries (job id -> execution time) are processed.</param>
  /// <returns>
  /// AbortJob messages for jobs that are unknown, assigned to another slave, or flagged as aborted;
  /// an empty list when the heartbeat carries no job progress.
  /// </returns>
  private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();

    if (heartbeat.JobProgress == null) {
      return actions;
    }

    // process the jobProgresses
    foreach (var jobProgress in heartbeat.JobProgress) {
      Job curJob = dao.GetJob(jobProgress.Key);
      if (curJob == null) {
        // job does not exist in db
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
      } else if (curJob.CurrentStateLog.SlaveId == Guid.Empty || curJob.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
        // assigned slave does not match heartbeat
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
      } else {
        // save job execution time
        curJob.ExecutionTime = jobProgress.Value;
        curJob.LastHeartbeat = DateTime.Now;

        if (curJob.State == JobState.Aborted) {
          // a request to abort the job has been set
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        } else if (curJob.State != JobState.Calculating) {
          // jobstate was 'Transferring' before, now calculating
          curJob.SetState(JobState.Calculating, heartbeat.SlaveId, "");
        }
        dao.UpdateJob(curJob);
      }
    }
    return actions;
  }

  /// <summary>
  /// Returns true if there are enough resources to send a job
  /// There should not be too many jobs sent simultaneously
  /// Currently always returns true (no throttling is applied).
  /// </summary>
  private bool IsAllowedToSendJobs() {
    return true; // JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    // Todo: see if unlimited job transfer count works. if not, look into db and count jobs in state Transferring
  }
}
110 | }