#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2013 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Services.Hive.DataTransfer;
using DA = HeuristicLab.Services.Hive.DataAccess;

namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Handles the heartbeats sent by slaves: refreshes the stored slave and task data,
  /// tells the slave what to do with the tasks it reports, and hands out a waiting task
  /// when the slave asks for one.
  /// </summary>
  public class HeartbeatManager {
    // Name of the named mutex that serializes task scheduling between concurrent heartbeats.
    private const string MutexName = "HiveTaskSchedulingMutex";

    // The services are looked up through the ServiceLocator on every access.
    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private ITaskScheduler taskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }
    private DataAccess.ITransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat
    /// (i.e. very often; concurrency is important!).
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      Slave slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });
      if (slave == null) {
        // no slave is stored for this id: ask the sender to say hello
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }
      if (ShutdownSlaveComputer(slave.Id)) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      trans.UseTransaction(() => { dao.UpdateSlave(slave); });

      // update task data
      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        AssignWaitingTask(slave, actions);
      }

      return actions;
    }

    /// <summary>
    /// Schedules the waiting tasks for the given slave while holding the scheduling mutex and,
    /// if the scheduler returns at least one task, assigns the first one to the slave.
    /// Exceptions raised while scheduling are logged and not propagated.
    /// </summary>
    /// <param name="slave">the slave that asked for a task</param>
    /// <param name="actions">receives a CalculateTask message when a task has been assigned</param>
    private void AssignWaitingTask(Slave slave, List<MessageContainer> actions) {
      bool mutexAcquired = false;
      // the mutex wraps an OS handle, so dispose it instead of leaving it to the finalizer
      using (var mutex = new Mutex(false, MutexName)) {
        try {
          mutexAcquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
          if (!mutexAcquired) {
            Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
          } else {
            var availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });
            if (availableTasks.Any()) {
              var task = availableTasks.First();
              AssignJob(slave, task.TaskId);
              actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
            }
          }
        }
        catch (AbandonedMutexException) {
          // WaitOne reports an abandoned mutex by throwing, but this thread now owns the mutex;
          // remember that so it is released below. No task is assigned for this heartbeat.
          mutexAcquired = true;
          Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
        }
        catch (Exception ex) {
          Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
        }
        finally {
          if (mutexAcquired) mutex.ReleaseMutex();
        }
      }
    }

    /// <summary>
    /// Sets the task to the Transferring state for the given slave and refreshes its heartbeat timestamp.
    /// </summary>
    private void AssignJob(Slave slave, Guid taskId) {
      trans.UseTransaction(() => {
        var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);

        // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
        task.LastHeartbeat = DateTime.Now;
        dao.UpdateTask(task);
      });
    }

    /// <summary>
    /// Update the progress of each task.
    /// Checks if all the tasks sent by heartbeat are supposed to be calculated by this slave.
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries are processed</param>
    /// <param name="IsAllowedToCalculate">whether the slave is currently allowed to calculate</param>
    /// <returns>the messages (pause/stop/abort) that result from the reported tasks</returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        var taskId = jobProgress.Key;
        Task curTask = trans.UseTransaction(() => { return dao.GetTask(taskId); });

        if (curTask == null) {
          // task does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
          Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
        } else if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
          Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
        } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
          // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds);
          // this might happen when slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
        } else {
          // save task execution time
          curTask.ExecutionTime = jobProgress.Value;
          curTask.LastHeartbeat = DateTime.Now;

          // forward a pending command for this task to the slave
          switch (curTask.Command) {
            case Command.Stop:
              actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id));
              break;
            case Command.Pause:
              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
              break;
            case Command.Abort:
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
              break;
          }
          trans.UseTransaction(() => { dao.UpdateTask(curTask); });
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if at least one resource assigned to the task is among the parent resources of the slave.
    /// </summary>
    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
      return trans.UseTransaction(() => {
        var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
        var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
        // set intersection instead of scanning the slave's resource ids once per assigned id
        return assignedResourceIds.Intersect(slaveResourceIds).Any();
      });
    }

    /// <summary>
    /// Returns true if none of the parent resources of the slave has an Offline downtime right now.
    /// </summary>
    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      return trans.UseTransaction(() => {
        return dao.GetParentResources(slaveId).All(r => !dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
      });
    }

    /// <summary>
    /// Returns true if any parent resource of the slave has a Shutdown downtime right now.
    /// </summary>
    private bool ShutdownSlaveComputer(Guid slaveId) {
      return trans.UseTransaction(() => {
        return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Any());
      });
    }

    /// <summary>
    /// Writes a message to the logger registered for this type's namespace.
    /// </summary>
    private void Log(string message) {
      DA.LogFactory.GetLogger(this.GetType().Namespace).Log(message);
    }
  }
}