Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3107_LearningALPS/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 17912

Last change on this file since 17912 was 17575, checked in by jkarder, 5 years ago

#3072: fixed statelog updates

  • fixed UpdateTasks so that it selects the correct last slave id for a specified task
  • introduced check if a slave is allowed to update a task's state (IsAuthorizedForTask)
File size: 9.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataAccess;
27using HeuristicLab.Services.Hive.DataAccess.Interfaces;
28using HeuristicLab.Services.Hive.DataTransfer;
29using DA = HeuristicLab.Services.Hive.DataAccess;
30
namespace HeuristicLab.Services.Hive.Manager {
  /// <summary>
  /// Processes the heartbeats sent by slaves: stores the reported slave data, validates the tasks a slave
  /// reports progress for, and asks the task scheduler for new work when the slave requests it.
  /// </summary>
  public class HeartbeatManager {
    /// <summary>Name of the named mutex that guards the call to the task scheduler.</summary>
    private const string MutexName = "HiveTaskSchedulingMutex";

    private IPersistenceManager PersistenceManager {
      get { return ServiceLocator.Instance.PersistenceManager; }
    }

    private ITaskScheduler TaskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      var actions = new List<MessageContainer>();
      var pm = PersistenceManager;
      var slaveDao = pm.SlaveDao;
      var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId));

      // slave is not known to the server: ask it to say hello, nothing else can be done
      if (slave == null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }
      if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      // a slave that reports progress for at least one task is calculating, otherwise it is idle
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
        ? DA.SlaveState.Calculating
        : DA.SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      pm.UseTransaction(() => {
        slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId);
        pm.SubmitChanges();
      });

      // update task data
      actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        bool mutexAquired = false;
        // the mutex object wraps an OS handle, dispose it once the scheduling attempt is over
        using (var mutex = new Mutex(false, MutexName)) {
          try {
            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
            if (mutexAquired) {
              var scheduledTaskIds = TaskScheduler.Schedule(slave, 1).ToArray();
              foreach (var id in scheduledTaskIds) {
                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, id));
              }
            } else {
              Log($"HeartbeatManager: The mutex used for scheduling could not be aquired. (HB from Slave {slave.ResourceId})");
            }
          } catch (AbandonedMutexException) {
            // WaitOne throws this exception when the previous owner terminated without releasing the mutex;
            // the calling thread nevertheless owns the mutex now, so it has to be released in the finally block,
            // otherwise all other heartbeat threads would be locked out of scheduling.
            mutexAquired = true;
            Log($"HeartbeatManager: The mutex used for scheduling has been abandoned. (HB from Slave {slave.ResourceId})");
          } catch (Exception ex) {
            // scheduling problems must not prevent the already collected actions from being returned
            Log($"HeartbeatManager threw an exception in ProcessHeartbeat (HB from Slave {slave.ResourceId}): {ex}");
          } finally {
            if (mutexAquired) mutex.ReleaseMutex();
          }
        }
      }
      return actions;
    }

    /// <summary>
    /// Update the progress of each task
    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="pm">persistence manager used to access tasks, jobs and job-resource assignments</param>
    /// <param name="heartbeat">the heartbeat containing the progress (execution time) per task id</param>
    /// <param name="isAllowedToCalculate">whether the sending slave is currently allowed to calculate</param>
    /// <returns>the abort/pause/stop actions the slave should perform for the reported tasks</returns>
    private IEnumerable<MessageContainer> UpdateTasks(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) {
      var taskDao = pm.TaskDao;
      var jobDao = pm.JobDao;
      var assignedJobResourceDao = pm.AssignedJobResourceDao;
      var actions = new List<MessageContainer>();
      if (heartbeat.JobProgress == null || !heartbeat.JobProgress.Any())
        return actions;

      var jobIdsWithStatisticsPending = jobDao.GetJobIdsByState(DA.JobState.StatisticsPending).ToList();

      // select all tasks and statelogs with one query;
      // SlaveId is taken from the most recent state log with state Transferring (Guid.Empty if there is none)
      var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList();
      var taskInfos = pm.UseTransaction(() =>
        (from task in taskDao.GetAll()
         where taskIds.Contains(task.TaskId)
         let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault(x => x.State == DA.TaskState.Transferring)
         select new {
           TaskId = task.TaskId,
           JobId = task.JobId,
           State = task.State,
           Command = task.Command,
           SlaveId = lastStateLog != null ? lastStateLog.SlaveId : Guid.Empty
         }).ToList()
      );

      // process the jobProgresses
      foreach (var progress in heartbeat.JobProgress) {
        var curTask = taskInfos.SingleOrDefault(x => x.TaskId == progress.Key);
        if (curTask == null) {
          // the slave reports a task the database does not know
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key));
          Log($"Task on slave {heartbeat.SlaveId} does not exist in DB: {progress.Key}");
        } else if (jobIdsWithStatisticsPending.Contains(curTask.JobId)) {
          // parenting job of current task has been requested for deletion (indicated by job state "Statistics Pending")
          // update task execution time
          pm.UseTransaction(() => {
            taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds);
          });
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
          Log($"Abort task {curTask.TaskId} on slave {heartbeat.SlaveId}. The parenting job {curTask.JobId} was requested to be deleted.");
        } else if (curTask.SlaveId == Guid.Empty || curTask.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
          Log($"The slave {heartbeat.SlaveId} is not supposed to calculate task: {curTask.TaskId}");
        } else if (!isAllowedToCalculate) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
          Log($"The slave {heartbeat.SlaveId} is not allowed to calculate any tasks tue to a downtime. The task is paused.");
        } else if (!assignedJobResourceDao.CheckJobGrantedForResource(curTask.JobId, heartbeat.SlaveId)) {
          // slaveId (and parent resourceGroupIds) are not among the assigned resources ids for task-parenting job
          // this might happen when (a) job-resource assignment has been changed (b) slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
          Log($"The slave {heartbeat.SlaveId} is not granted to calculate task: {curTask.TaskId} of job: {curTask.JobId}");
        } else {
          // update task execution time
          pm.UseTransaction(() => {
            taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds);
          });
          // forward a pending command of the task to the slave; tasks without such a command need no action
          switch (curTask.Command) {
            case DA.Command.Stop:
              actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
              break;
            case DA.Command.Pause:
              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
              break;
            case DA.Command.Abort:
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
              break;
          }
        }
      }
      return actions;
    }

    /// <summary>
    /// Writes a message to the logger registered for the namespace of this instance's type.
    /// </summary>
    /// <param name="message">the text to log</param>
    private void Log(string message) {
      LogFactory.GetLogger(this.GetType().Namespace).Log(message);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.