Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 9391

Last change on this file since 9391 was 9391, checked in by pfleck, 11 years ago

#2030
Separated the old DTO-Dao from the new Dao. The DTO-Dao should be replaced completely.
Heartbeat and UpdateTaskState use the new Dao.
The DataContext is now closed at the end of each ServiceOperation.

File size: 8.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataAccess;
27using Heartbeat = HeuristicLab.Services.Hive.DataTransfer.Heartbeat;
28
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Handles slave heartbeats: updates the slave's row in the database, reconciles the tasks the
  /// slave reports as running, and hands out a new waiting task when the slave asks for one.
  /// </summary>
  public class HeartbeatManager {
    // Name of the named system mutex that serializes task scheduling between concurrent heartbeats.
    private const string MutexName = "HiveTaskSchedulingMutex";

    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private ITaskScheduler taskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }
    private DataAccess.ITransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">The heartbeat sent by the slave.</param>
    /// <returns>
    /// A list of actions the slave should do. An unknown slave only receives SayHello; a known slave
    /// may receive NewHBInterval, ShutdownComputer, per-task commands from <see cref="UpdateTasks"/>,
    /// and finally CalculateTask if a new task was assigned to it.
    /// </returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      Slave slave = null;
      trans.UseTransaction(() => {
        slave = dao.GetSlaveById(heartbeat.SlaveId);

        if (slave == null) {
          // the slave is not known to the server: ask it to introduce itself first
          actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        } else {
          if (heartbeat.HbInterval != slave.HbInterval) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
          }
          if (ShutdownSlaveComputer(slave.ResourceId)) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
          }

          // update slave data
          slave.FreeCores = heartbeat.FreeCores;
          slave.FreeMemory = heartbeat.FreeMemory;
          slave.CpuUtilization = heartbeat.CpuUtilization;
          slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId);
          slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
          slave.LastHeartbeat = DateTime.Now;

          dao.UpdateSlave(slave);
        }
      });

      if (slave != null) {
        // update task data
        actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

        // assign new task
        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
          Guid? assignedTaskId = AssignWaitingTask(slave);
          if (assignedTaskId.HasValue) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, assignedTaskId.Value));
          }
        }
      }
      return actions;
    }

    /// <summary>
    /// Asks the task scheduler for the waiting tasks of <paramref name="slave"/> and assigns the first
    /// one it proposes. The whole operation is serialized across callers by the named scheduling mutex.
    /// Failures are logged and swallowed (best effort); they never propagate to the caller.
    /// </summary>
    /// <param name="slave">The slave that requested a new task.</param>
    /// <returns>
    /// The id of the assigned task, but only after the assignment transaction has returned without
    /// throwing; null if the mutex could not be obtained, no task was available, or an error occurred.
    /// </returns>
    private Guid? AssignWaitingTask(Slave slave) {
      Guid? assignedTaskId = null;
      bool mutexAcquired = false;
      // Mutex wraps an OS handle, so dispose it after every use instead of leaking one per heartbeat.
      using (var mutex = new Mutex(false, MutexName)) {
        try {
          mutexAcquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
          if (!mutexAcquired)
            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
          else {
            Guid? scheduledTaskId = null;
            trans.UseTransaction(() => {
              IEnumerable<TaskInfoForScheduler> availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave).ToArray());
              if (availableTasks.Any()) {
                var task = availableTasks.First();
                AssignJob(slave, task.TaskId);
                scheduledTaskId = task.TaskId;
              }
            });
            // Publish the id only once UseTransaction has returned normally; if it threw (e.g. while
            // committing), the slave must not be told to calculate a task that was never assigned.
            assignedTaskId = scheduledTaskId;
          }
        }
        catch (AbandonedMutexException) {
          // WaitOne throws this when the previous owner exited without releasing the mutex. The calling
          // thread nevertheless owns the mutex now, so it has to be released in the finally block.
          mutexAcquired = true;
          LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
        }
        catch (Exception ex) {
          LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
        }
        finally {
          if (mutexAcquired) mutex.ReleaseMutex();
        }
      }
      return assignedTaskId;
    }

    /// <summary>
    /// Marks <paramref name="taskId"/> as Transferring to <paramref name="slave"/> and stamps its
    /// LastHeartbeat with the current time.
    /// </summary>
    private void AssignJob(Slave slave, Guid taskId) {
      var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.ResourceId, null, null);

      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      task.LastHeartbeat = DateTime.Now;
      dao.UpdateTask(task);
    }

    /// <summary>
    /// Update the progress of each task
    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">The heartbeat whose JobProgress (task id -> execution time) is processed.</param>
    /// <param name="isAllowedToCalculate">Whether the slave may currently calculate; if not and it reports tasks, PauseAll is returned.</param>
    /// <returns>The actions the slave should perform for its reported tasks (possibly empty).</returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool isAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!isAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
      } else {
        // process the jobProgresses, each in its own transaction
        foreach (var jobProgress in heartbeat.JobProgress) {
          trans.UseTransaction(() => {
            var curTask = dao.GetTaskById(jobProgress.Key);
            if (curTask == null) {
              // task does not exist in db
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
              LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
            } else {
              // LastOrDefault: a task without any state log must not fault the whole heartbeat;
              // it is handled like a task that is not assigned to this slave.
              var currentStateLog = curTask.StateLogs.LastOrDefault();
              if (currentStateLog == null || currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) {
                // assigned slave does not match heartbeat
                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
                LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId);
              } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
                // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
                actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
              } else {
                // save task execution time
                curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
                curTask.LastHeartbeat = DateTime.Now;

                // forward a pending user command for this task to the slave
                switch (curTask.Command) {
                  case Command.Stop:
                    actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
                    break;
                  case Command.Pause:
                    actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
                    break;
                  case Command.Abort:
                    actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
                    break;
                }
                dao.UpdateTask(curTask);
              }
            }
          });
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if at least one of the resources assigned to <paramref name="curTask"/> is among the
    /// ids returned by GetParentResourcesIDs for <paramref name="slaveId"/> (presumably the slave itself
    /// plus its parent groups -- confirm against the dao).
    /// </summary>
    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
      var assignedResourceIds = dao.GetAssignedResourcesIds(curTask.TaskId);
      var slaveResourceIds = dao.GetParentResourcesIDs(slaveId).ToArray();
      return assignedResourceIds.Any(r => slaveResourceIds.Contains(r));
    }

    /// <summary>
    /// Returns true when every downtime count of type Offline reported for the slave's parent resources
    /// at the current time is zero (also true when no counts are returned).
    /// </summary>
    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTime(slaveId, DowntimeType.Offline);
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      return downtimes.All(x => x == 0);
    }

    /// <summary>
    /// Returns true when any downtime count of type Shutdown reported for the slave's parent resources at
    /// the current time is non-zero.
    /// </summary>
    private bool ShutdownSlaveComputer(Guid slaveId) {
      var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTime(slaveId, DowntimeType.Shutdown);
      return downtimes.Any(x => x != 0);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.