using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Services.Hive.DataAccess;
using HeuristicLab.Services.Hive.DataAccess.Interfaces;
using HeuristicLab.Services.Hive.DataTransfer;
using DA = HeuristicLab.Services.Hive.DataAccess;
namespace HeuristicLab.Services.Hive.Manager {
public class HeartbeatManager {
private const string MutexName = "HiveTaskSchedulingMutex";
private IPersistenceManager PersistenceManager {
get { return ServiceLocator.Instance.PersistenceManager; }
private ITaskScheduler TaskScheduler {
get { return ServiceLocator.Instance.TaskScheduler; }
/// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
/// a list of actions the slave should do
public List ProcessHeartbeat(Heartbeat heartbeat) {
List actions = new List();
var pm = PersistenceManager;
var slaveDao = pm.SlaveDao;
var taskDao = pm.TaskDao;
var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId));
if (slave == null) {
actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
} else {
if (heartbeat.HbInterval != slave.HbInterval) {
actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) {
actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
// update slave data
slave.FreeCores = heartbeat.FreeCores;
slave.FreeMemory = heartbeat.FreeMemory;
slave.CpuUtilization = heartbeat.CpuUtilization;
slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
? DA.SlaveState.Calculating
: DA.SlaveState.Idle;
slave.LastHeartbeat = DateTime.Now;
pm.UseTransaction(() => {
slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId);
// update task data
actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate));
// assign new task
if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
bool mutexAquired = false;
var mutex = new Mutex(false, MutexName);
try {
mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
if (mutexAquired) {
var scheduledTaskIds = TaskScheduler.Schedule(slave, 1).ToArray();
foreach (var id in scheduledTaskIds) {
actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, id));
} else {
LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling could not be aquired. (HB from Slave {slave.ResourceId})");
} catch (AbandonedMutexException) {
LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling has been abandoned. (HB from Slave {slave.ResourceId})");
} catch (Exception ex) {
LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager threw an exception in ProcessHeartbeat (HB from Slave {slave.ResourceId}): {ex}");
} finally {
if (mutexAquired) mutex.ReleaseMutex();
return actions;
/// Update the progress of each task
/// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
private IEnumerable UpdateTasks(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) {
var taskDao = pm.TaskDao;
var jobDao = pm.JobDao;
var assignedJobResourceDao = pm.AssignedJobResourceDao;
var actions = new List();
if (heartbeat.JobProgress == null || !heartbeat.JobProgress.Any())
return actions;
var jobIdsWithStatisticsPending = jobDao.GetJobIdsByState(DA.JobState.StatisticsPending).ToList();
// select all tasks and statelogs with one query
var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList();
var taskInfos = pm.UseTransaction(() =>
(from task in taskDao.GetAll()
where taskIds.Contains(task.TaskId)
let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault(x => x.State == DA.TaskState.Transferring)
select new {
TaskId = task.TaskId,
JobId = task.JobId,
State = task.State,
Command = task.Command,
SlaveId = lastStateLog != null ? lastStateLog.SlaveId : Guid.Empty
// process the jobProgresses
foreach (var jobProgress in heartbeat.JobProgress) {
var progress = jobProgress;
var curTask = taskInfos.SingleOrDefault(x => x.TaskId == progress.Key);
if (curTask == null) {
actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key));
LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
} else if (jobIdsWithStatisticsPending.Contains(curTask.JobId)) {
// parenting job of current task has been requested for deletion (indicated by job state "Statistics Pending")
// update task execution time
pm.UseTransaction(() => {
taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds);
actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
LogFactory.GetLogger(this.GetType().Namespace).Log("Abort task " + curTask.TaskId + " on slave " + heartbeat.SlaveId + ". The parenting job " + curTask.JobId + " was requested to be deleted.");
} else if (curTask.SlaveId == Guid.Empty || curTask.SlaveId != heartbeat.SlaveId) {
// assigned slave does not match heartbeat
actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId);
} else if (!isAllowedToCalculate) {
actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not allowed to calculate any tasks tue to a downtime. The task is paused.");
} else if (!assignedJobResourceDao.CheckJobGrantedForResource(curTask.JobId, heartbeat.SlaveId)) {
// slaveId (and parent resourceGroupIds) are not among the assigned resources ids for task-parenting job
// this might happen when (a) job-resource assignment has been changed (b) slave is moved to different group
actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not granted to calculate task: " + curTask.TaskId + " of job: " + curTask.JobId);
} else {
// update task execution time
pm.UseTransaction(() => {
taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds);
switch (curTask.Command) {
case DA.Command.Stop:
actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
case DA.Command.Pause:
actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
case DA.Command.Abort:
actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
return actions;