Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 8997

Last change on this file since 8997 was 8997, checked in by ascheibe, 10 years ago

#1712 removed unnecessary query because scheduling is now serialized

File size: 8.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataTransfer;
27using DA = HeuristicLab.Services.Hive.DataAccess;
28
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes the heartbeats sent by slaves: stores the reported slave and task data
  /// and builds the list of messages the slave should react to.
  /// </summary>
  public class HeartbeatManager {
    // name of the system-wide mutex that serializes task scheduling
    private const string MutexName = "HiveTaskSchedulingMutex";

    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private ITaskScheduler taskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // unknown slave: ask it to say hello, nothing else can be done
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }
      if (ShutdownSlaveComputer(slave.Id)) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update task data
      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        ScheduleNewTask(slave, actions);
      }
      return actions;
    }

    /// <summary>
    /// Tries to pick a waiting task for the slave while holding the scheduling mutex.
    /// If a task is found it is assigned to the slave and a CalculateTask message is appended to <paramref name="actions"/>.
    /// If the mutex cannot be acquired within the configured patience, nothing is scheduled.
    /// </summary>
    private void ScheduleNewTask(Slave slave, List<MessageContainer> actions) {
      bool mutexAcquired = false;
      // dispose the mutex so that the OS handle is not leaked on every heartbeat
      using (var mutex = new Mutex(false, MutexName)) {
        try {
          mutexAcquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
          if (!mutexAcquired) {
            Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
            return;
          }

          // take the first scheduled task (if any) with a single enumeration of the result
          var job = taskScheduler.Schedule(dao.GetWaitingTasks(slave)).FirstOrDefault();
          if (job != null) {
            AssignJob(slave, job);
            actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
          }
        }
        catch (AbandonedMutexException) {
          // the thread that receives this exception now owns the mutex,
          // so it has to be released in the finally block; no task is scheduled in this case
          mutexAcquired = true;
          Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
        }
        finally {
          if (mutexAcquired) mutex.ReleaseMutex();
        }
      }
    }

    /// <summary>
    /// Sets the task to state Transferring for the given slave and refreshes its last heartbeat.
    /// </summary>
    private void AssignJob(Slave slave, Task task) {
      task = dao.UpdateTaskState(task.Id, DataAccess.TaskState.Transferring, slave.Id, null, null);

      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      task.LastHeartbeat = DateTime.Now;
      dao.UpdateTask(task);
    }

    /// <summary>
    /// Update the progress of each task
    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries are processed</param>
    /// <param name="isAllowedToCalculate">whether the slave is currently allowed to calculate</param>
    /// <returns>the messages resulting from the reported tasks (may be empty)</returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool isAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!isAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Task curTask = dao.GetTask(jobProgress.Key);
        if (curTask == null) {
          // task does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
          Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
        } else if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          // NOTE(review): CurrentStateLog is dereferenced without a null check - confirm it is always set
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
          Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
        } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
          // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
        } else {
          // save task execution time
          curTask.ExecutionTime = jobProgress.Value;
          curTask.LastHeartbeat = DateTime.Now;

          // forward a pending command for this task to the slave
          switch (curTask.Command) {
            case Command.Stop:
              actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id));
              break;
            case Command.Pause:
              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
              break;
            case Command.Abort:
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
              break;
          }
          dao.UpdateTask(curTask);
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if at least one resource assigned to the task is among the parent resources of the slave.
    /// </summary>
    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
      var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
      var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
      // Intersect enumerates each sequence only once instead of re-scanning the slave's resources per assigned id
      return assignedResourceIds.Intersect(slaveResourceIds).Any();
    }

    /// <summary>
    /// Returns true if none of the slave's parent resources has an Offline downtime that covers the current time.
    /// </summary>
    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      return dao.GetParentResources(slaveId).All(r =>
        !dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
    }

    /// <summary>
    /// Returns true if any of the slave's parent resources has a Shutdown downtime that covers the current time.
    /// </summary>
    private bool ShutdownSlaveComputer(Guid slaveId) {
      return dao.GetParentResources(slaveId).Any(r =>
        dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
    }

    /// <summary>
    /// Writes a message to the logger registered for this class' namespace.
    /// </summary>
    private void Log(string message) {
      DA.LogFactory.GetLogger(this.GetType().Namespace).Log(message);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.