Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 16725

Last change on this file since 16725 was 16565, checked in by gkronber, 6 years ago

#2520: merged changes from PersistenceOverhaul branch (r16451:16564) into trunk

File size: 10.2 KB
RevLine 
[12691]1#region License Information
2/* HeuristicLab
[16565]3 * Copyright (C) 2002-2019 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[12691]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataAccess;
27using HeuristicLab.Services.Hive.DataAccess.Interfaces;
28using HeuristicLab.Services.Hive.DataTransfer;
29using DA = HeuristicLab.Services.Hive.DataAccess;
30
31namespace HeuristicLab.Services.Hive.Manager {
[12861]32  public class HeartbeatManager {
[12691]33    private const string MutexName = "HiveTaskSchedulingMutex";
34
35    private IPersistenceManager PersistenceManager {
36      get { return ServiceLocator.Instance.PersistenceManager; }
37    }
38
39    private ITaskScheduler TaskScheduler {
[12861]40      get { return ServiceLocator.Instance.TaskScheduler; }
[12691]41    }
42
43    /// <summary>
44    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
45    /// </summary>
46    /// <returns>a list of actions the slave should do</returns>
47    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
48      List<MessageContainer> actions = new List<MessageContainer>();
[12858]49      var pm = PersistenceManager;
50      var slaveDao = pm.SlaveDao;
51      var taskDao = pm.TaskDao;
52      var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId));
53      if (slave == null) {
54        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
55      } else {
56        if (heartbeat.HbInterval != slave.HbInterval) {
57          actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
58        }
59        if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) {
60          actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
61        }
62        // update slave data 
63        slave.FreeCores = heartbeat.FreeCores;
64        slave.FreeMemory = heartbeat.FreeMemory;
65        slave.CpuUtilization = heartbeat.CpuUtilization;
66        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
67          ? DA.SlaveState.Calculating
68          : DA.SlaveState.Idle;
69        slave.LastHeartbeat = DateTime.Now;
70        pm.UseTransaction(() => {
71          slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId);
72          pm.SubmitChanges();
73        });
[12691]74
[12858]75        // update task data
76        actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate));
[12691]77
[12858]78        // assign new task
79        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
80          bool mutexAquired = false;
81          var mutex = new Mutex(false, MutexName);
82          try {
83            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
84            if (mutexAquired) {
85              var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave)
86                  .Select(x => new TaskInfoForScheduler {
87                    TaskId = x.TaskId,
88                    JobId = x.JobId,
89                    Priority = x.Priority
90                  })
91                  .ToList()
92              );
93              var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray();
94              if (availableTasks.Any()) {
95                var task = availableTasks.First();
96                AssignTask(pm, slave, task.TaskId);
97                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
[12691]98              }
[12858]99            } else {
100              LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
[12691]101            }
102          }
[12858]103          catch (AbandonedMutexException) {
104            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
105          }
106          catch (Exception ex) {
107            LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));
108          }
109          finally {
110            if (mutexAquired) mutex.ReleaseMutex();
111          }
[12691]112        }
113      }
114      return actions;
115    }
116
117    private void AssignTask(IPersistenceManager pm, DA.Slave slave, Guid taskId) {
118      const DA.TaskState transferring = DA.TaskState.Transferring;
119      DateTime now = DateTime.Now;
120      var taskDao = pm.TaskDao;
121      var stateLogDao = pm.StateLogDao;
122      pm.UseTransaction(() => {
123        var task = taskDao.GetById(taskId);
124        stateLogDao.Save(new DA.StateLog {
125          State = transferring,
126          DateTime = now,
127          TaskId = taskId,
128          SlaveId = slave.ResourceId,
129          UserId = null,
130          Exception = null
131        });
132        task.State = transferring;
133        task.LastHeartbeat = now;
134        pm.SubmitChanges();
135      });
136    }
137
138    /// <summary>
139    /// Update the progress of each task
140    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
141    /// </summary>
142    private IEnumerable<MessageContainer> UpdateTasks(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) {
143      var taskDao = pm.TaskDao;
[16117]144      var jobDao = pm.JobDao;
145      var assignedJobResourceDao = pm.AssignedJobResourceDao;
[12691]146      var actions = new List<MessageContainer>();
[12773]147      if (heartbeat.JobProgress == null || !heartbeat.JobProgress.Any())
[12691]148        return actions;
149
[16117]150      var jobIdsWithStatisticsPending = jobDao.GetJobIdsByState(DA.JobState.StatisticsPending).ToList();
[12691]151
[16117]152      // select all tasks and statelogs with one query
153      var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList();
154      var taskInfos = pm.UseTransaction(() =>
155        (from task in taskDao.GetAll()
156          where taskIds.Contains(task.TaskId)
157          let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault()
158          select new {
159            TaskId = task.TaskId,
160            JobId = task.JobId,
161            State = task.State,
162            Command = task.Command,
163            SlaveId = lastStateLog != null ? lastStateLog.SlaveId : default(Guid)
164          }).ToList()
165      );
166
167      // process the jobProgresses
168      foreach (var jobProgress in heartbeat.JobProgress) {
169        var progress = jobProgress;
170        var curTask = taskInfos.SingleOrDefault(x => x.TaskId == progress.Key);
171        if (curTask == null) {
172          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key));
173          LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
174        } else if (jobIdsWithStatisticsPending.Contains(curTask.JobId)) {
175          // parenting job of current task has been requested for deletion (indicated by job state "Statistics Pending")
176          // update task execution time
177          pm.UseTransaction(() => {
178            taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds);
179          });
180          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
181          LogFactory.GetLogger(this.GetType().Namespace).Log("Abort task " + curTask.TaskId + " on slave " + heartbeat.SlaveId + ". The parenting job " + curTask.JobId + " was requested to be deleted.");
182        } else if (curTask.SlaveId == Guid.Empty || curTask.SlaveId != heartbeat.SlaveId) {
183          // assigned slave does not match heartbeat
184          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
185          LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId);
186        } else if (!isAllowedToCalculate) {
187          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
188          LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not allowed to calculate any tasks tue to a downtime. The task is paused.");
189        } else if (!assignedJobResourceDao.CheckJobGrantedForResource(curTask.JobId, heartbeat.SlaveId)) {
190          // slaveId (and parent resourceGroupIds) are not among the assigned resources ids for task-parenting job
191          // this might happen when (a) job-resource assignment has been changed (b) slave is moved to different group
192          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
193          LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not granted to calculate task: " + curTask.TaskId + " of job: " + curTask.JobId);
194        } else {
195          // update task execution time
196          pm.UseTransaction(() => {
197            taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds);
198          });
199          switch (curTask.Command) {
200            case DA.Command.Stop:
201              actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
202              break;
203            case DA.Command.Pause:
204              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
205              break;
206            case DA.Command.Abort:
[12691]207              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
[16117]208              break;
[12691]209          }
210        }
[16117]211       
212      }
[12691]213      return actions;
214    }
215  }
216}
Note: See TracBrowser for help on using the repository browser.