Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 6983

Last change on this file since 6983 was 6983, checked in by ascheibe, 12 years ago

#1672

  • added the Hive Services and Slave projects
  • added missing svn ignores
File size: 7.1 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using HeuristicLab.Services.Hive.DataTransfer;
26using DA = HeuristicLab.Services.Hive.DataAccess;
27
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes the heartbeats sent by slaves: refreshes the slave's status record, reconciles the
  /// tasks reported by the slave with the database and, on request, assigns a waiting task.
  /// </summary>
  public class HeartbeatManager {
    /// <summary>
    /// Data access object; fetched from the service locator on every access.
    /// </summary>
    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }

    /// <summary>
    /// Authorization manager; fetched from the service locator on every access.
    /// Not referenced by any method of this class.
    /// </summary>
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      // one lookup only: the same record serves the interval check and the update below
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // no slave record for this id: the only reply is SayHello
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update task data
      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        // FirstOrDefault enumerates the result once (Count() followed by First() enumerated it twice)
        var job = dao.GetWaitingTasks(slave, 1).FirstOrDefault();
        if (job != null && AssignJob(slave, job)) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
        }
      }
      return actions;
    }

    /// <summary>
    /// Tries to hand the given task to the given slave by setting it to the Transferring state.
    /// </summary>
    /// <param name="slave">the slave that should receive the task</param>
    /// <param name="task">the task that was selected for the slave</param>
    /// <returns>true if assignment was successful</returns>
    private bool AssignJob(Slave slave, Task task) {
      // load task again and check if it is still available (this is an attempt to reduce the race condition which causes multiple heartbeats to get the same task assigned)
      var reloaded = dao.GetTask(task.Id);
      // GetTask yields null for a task that no longer exists in the db (see UpdateTasks); such a task cannot be assigned
      if (reloaded == null || reloaded.State != TaskState.Waiting) {
        return false;
      }

      task = dao.UpdateTaskState(task.Id, DataAccess.TaskState.Transferring, slave.Id, null, null);

      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      task.LastHeartbeat = DateTime.Now;
      dao.UpdateTask(task);
      return true;
    }

    /// <summary>
    /// Update the progress of each task
    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries are processed</param>
    /// <param name="IsAllowedToCalculate">whether the sending slave may calculate at the moment</param>
    /// <returns>the messages (pause/stop/abort) resulting from the reported tasks</returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null) {
        return actions;
      }

      if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        // slave reports tasks although it must not calculate: a single PauseAll replaces per-task handling
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Task curJob = dao.GetTask(jobProgress.Key);
        if (curJob == null) {
          // task does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
          DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Job does not exist in DB: " + jobProgress.Key);
          continue;
        }

        // NOTE(review): CurrentStateLog is dereferenced without a null check - confirm it is always set for stored tasks
        if (curJob.CurrentStateLog.SlaveId == Guid.Empty || curJob.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curJob.Id));
          DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
          continue;
        }

        if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curJob)) {
          // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curJob.Id));
          continue;
        }

        // save task execution time
        curJob.ExecutionTime = jobProgress.Value;
        curJob.LastHeartbeat = DateTime.Now;

        // forward a pending command of the task to the slave
        switch (curJob.Command) {
          case Command.Stop:
            actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curJob.Id));
            break;
          case Command.Pause:
            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curJob.Id));
            break;
          case Command.Abort:
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curJob.Id));
            break;
        }
        dao.UpdateTask(curJob);
      }
      return actions;
    }

    /// <summary>
    /// Checks whether at least one resource assigned to the task is among the parent resources of the slave.
    /// </summary>
    /// <param name="slaveId">id of the slave that reported the task</param>
    /// <param name="curTask">the task to check</param>
    /// <returns>true if the assigned resources and the slave's parent resources share at least one id</returns>
    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
      // materialize once; the lazy projection would otherwise be re-enumerated for every assigned resource
      var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id).ToList();
      var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
      return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
    }

    /// <summary>
    /// Checks whether the slave may calculate right now.
    /// </summary>
    /// <param name="slaveId">id of the slave to check</param>
    /// <returns>true if none of the slave's parent resources has a downtime covering the current time</returns>
    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      return dao.GetParentResources(slaveId).All(r => !dao.GetDowntimes(x => x.ResourceId == r.Id && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
    }
  }
}
Note: See TracBrowser for help on using the repository browser.