Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 6725

Last change on this file since 6725 was 6725, checked in by ascheibe, 13 years ago

#1233 more renaming for more consistency

File size: 6.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using HeuristicLab.Services.Hive.DataTransfer;
26using DA = HeuristicLab.Services.Hive.DataAccess;
27
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Handles the heartbeats sent by slaves: stores the reported slave and job data
  /// and answers with the list of messages the slave should act on.
  /// </summary>
  public class HeartbeatManager {
    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// Updates the stored slave data, processes the reported job progress and, if the slave
    /// asks for work and is allowed to calculate, tries to assign one waiting job to it.
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // the DAO does not know this slave id; the only answer is SayHello
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
      // a slave that reports at least one job progress entry is calculating, otherwise it is idle
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update job data
      actions.AddRange(UpdateJobs(heartbeat, slave.IsAllowedToCalculate));

      // assign new job
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        // FirstOrDefault walks the result only once; Count() followed by First() enumerated it twice
        var job = dao.GetWaitingJobs(slave, 1).FirstOrDefault();
        if (job != null && AssignJob(slave, job))
          actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
      }
      return actions;
    }

    /// <summary>
    /// Tries to hand the given job over to the given slave by setting it to the Transferring state.
    /// The check and the state update are separate DAO calls, so the race between concurrent
    /// heartbeats is only reduced, not eliminated.
    /// </summary>
    /// <param name="slave">the slave that should receive the job</param>
    /// <param name="job">the job that was selected for the slave</param>
    /// <returns>true if assignment was successful</returns>
    private bool AssignJob(Slave slave, Task job) {
      // load job again and check if it is still available (this is an attempt to reduce the race condition which causes multiple heartbeats to get the same job assigned)
      Task currentJob = dao.GetJob(job.Id);
      // the job may have vanished in the meantime; treat that like "no longer waiting" instead of failing
      if (currentJob == null || currentJob.State != TaskState.Waiting) return false;

      job = dao.UpdateJobState(job.Id, DataAccess.TaskState.Transferring, slave.Id, null, null);

      // from now on the job has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      job.LastHeartbeat = DateTime.Now;
      dao.UpdateJob(job);
      return true;
    }

    /// <summary>
    /// Update the progress of each job
    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries (job id -> execution time) are processed</param>
    /// <param name="isAllowedToCalculate">whether the sending slave may calculate right now; if not, a single PauseAll is returned</param>
    /// <returns>the messages resulting from the reported jobs; empty if the heartbeat carries no job progress</returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat, bool isAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!isAllowedToCalculate) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Task curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
          DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Job does not exist in DB: " + jobProgress.Key);
        } else if (curJob.CurrentStateLog.SlaveId == Guid.Empty || curJob.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curJob.Id));
          DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        } else if (!JobIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curJob)) {
          // assigned resources ids of job do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curJob.Id));
        } else {
          // save job execution time
          curJob.ExecutionTime = jobProgress.Value;
          curJob.LastHeartbeat = DateTime.Now;

          // forward a pending command of the job to the slave; any other command value produces no message
          switch (curJob.Command) {
            case Command.Stop:
              actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curJob.Id));
              break;
            case Command.Pause:
              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curJob.Id));
              break;
            case Command.Abort:
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curJob.Id));
              break;
          }
          dao.UpdateJob(curJob);
        }
      }
      return actions;
    }

    /// <summary>
    /// Checks whether at least one resource assigned to the job is among the resources
    /// returned by GetParentResources for the slave.
    /// </summary>
    /// <param name="slaveId">id of the slave that reported the job</param>
    /// <param name="curJob">the job to check</param>
    /// <returns>true if the assigned resources and the slave's resources share at least one id</returns>
    private bool JobIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curJob) {
      var assignedResourceIds = dao.GetAssignedResources(curJob.Id).Select(x => x.Id);
      // materialize once; otherwise the deferred query is enumerated again for every assigned resource id
      var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id).ToList();
      return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
    }

    /// <summary>
    /// Checks whether the slave may calculate at the moment.
    /// </summary>
    /// <param name="slaveId">id of the slave to check</param>
    /// <returns>true if none of the resources returned by GetParentResources has a downtime that includes the current time</returns>
    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      // Any() only needs to find one matching downtime; Count() == 0 counted all of them
      return dao.GetParentResources(slaveId).All(r => !dao.GetDowntimes(x => x.ResourceId == r.Id && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
    }
  }
}
Note: See TracBrowser for help on using the repository browser.