Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 8995

Last change on this file since 8995 was 8995, checked in by ascheibe, 11 years ago

#1712 merged trunk into branch

File size: 8.4 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataTransfer;
27using DA = HeuristicLab.Services.Hive.DataAccess;
28
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes the heartbeats sent by slaves: stores the reported slave and task data,
  /// tells the slave which tasks to stop/pause/abort and, if requested, assigns a new task.
  /// </summary>
  public class HeartbeatManager {
    // Name of the named (system-wide) mutex that serializes task scheduling.
    private const string MutexName = "HiveTaskSchedulingMutex";

    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private ITaskScheduler taskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // unknown slave: ask it to say hello, nothing else can be done
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }
      if (ShutdownSlaveComputer(slave.Id)) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update task data
      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        TryAssignTask(slave, actions);
      }
      return actions;
    }

    /// <summary>
    /// Schedules the waiting tasks for the slave while holding the scheduling mutex and,
    /// if a task could be assigned, adds a CalculateTask message to <paramref name="actions"/>.
    /// If the mutex cannot be obtained within the configured SchedulingPatience, nothing is assigned.
    /// </summary>
    private void TryAssignTask(Slave slave, List<MessageContainer> actions) {
      // the mutex wraps an OS handle, so dispose it instead of waiting for finalization
      using (var mutex = new Mutex(false, MutexName)) {
        bool mutexAcquired = false;
        try {
          try {
            mutexAcquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
          }
          catch (AbandonedMutexException) {
            // WaitOne throws this when the previous owner exited without releasing the mutex;
            // the calling thread nevertheless owns the mutex now, so it has to be released below
            mutexAcquired = true;
            Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
          }

          if (!mutexAcquired) {
            Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
            return;
          }

          // take the first scheduled task (if any) without enumerating the schedule twice
          var job = taskScheduler.Schedule(dao.GetWaitingTasks(slave)).FirstOrDefault();
          if (job != null && AssignJob(slave, job)) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
          }
        }
        finally {
          if (mutexAcquired) mutex.ReleaseMutex();
        }
      }
    }

    // returns true if assignment was successful
    private bool AssignJob(Slave slave, Task task) {
      // load task again and check if it is still available (this is an attempt to reduce the race condition which causes multiple heartbeats to get the same task assigned)
      var currentTask = dao.GetTask(task.Id);
      // the task may have been deleted in the meantime; then there is nothing to assign
      if (currentTask == null || currentTask.State != TaskState.Waiting) return false;

      task = dao.UpdateTaskState(task.Id, DataAccess.TaskState.Transferring, slave.Id, null, null);

      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      task.LastHeartbeat = DateTime.Now;
      dao.UpdateTask(task);
      return true;
    }

    /// <summary>
    /// Update the progress of each task
    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries are processed</param>
    /// <param name="IsAllowedToCalculate">whether the sending slave may calculate right now</param>
    /// <returns>the messages (pause/stop/abort) resulting from the reported tasks</returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        // slave is calculating although it must not: pause everything
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Task curTask = dao.GetTask(jobProgress.Key);
        if (curTask == null) {
          // task does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
          Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
          continue;
        }

        if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
          Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
          continue;
        }

        if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
          // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
          continue;
        }

        // save task execution time
        curTask.ExecutionTime = jobProgress.Value;
        curTask.LastHeartbeat = DateTime.Now;

        // forward a pending command of the task to the slave
        switch (curTask.Command) {
          case Command.Stop:
            actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id));
            break;
          case Command.Pause:
            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
            break;
          case Command.Abort:
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
            break;
        }
        dao.UpdateTask(curTask);
      }
      return actions;
    }

    // returns true if at least one resource assigned to the task is the slave itself or one of its parent resources
    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
      var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
      // materialize once; otherwise the parent resources are re-enumerated for every assigned resource
      var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id).ToList();
      return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
    }

    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      return !HasActiveDowntime(slaveId, DA.DowntimeType.Offline);
    }

    private bool ShutdownSlaveComputer(Guid slaveId) {
      // the computer is shut down if a shutdown downtime is active right now for any parent resource
      return HasActiveDowntime(slaveId, DA.DowntimeType.Shutdown);
    }

    // returns true if any parent resource of the slave has a downtime of the given type whose start/end dates enclose the current time
    private bool HasActiveDowntime(Guid slaveId, DA.DowntimeType downtimeType) {
      return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == downtimeType && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
    }

    // writes a message to the logger registered for the namespace of this type
    private void Log(string message) {
      DA.LogFactory.GetLogger(this.GetType().Namespace).Log(message);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.