using System;
using System.Collections.Generic;
using System.Linq;
using HeuristicLab.Services.Hive.Common;
using HeuristicLab.Services.Hive.Common.DataTransfer;

namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes heartbeats sent by slaves: stores the reported slave/job state
  /// and computes the list of actions the slave should perform next.
  /// </summary>
  public class HeartbeatManager {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">The heartbeat reported by the slave.</param>
    /// <returns>a list of actions the slave should do</returns>
    /// <exception cref="ArgumentNullException"><paramref name="heartbeat"/> is null.</exception>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      if (heartbeat == null) throw new ArgumentNullException("heartbeat");

      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // unknown slave: ask it to introduce itself before anything else
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.IsAllowedToCalculate = true; // Todo: look into calendar
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update job data
      actions.AddRange(UpdateJobs(heartbeat));

      // assign new job
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        // enumerate the DAO result exactly once; Count() followed by First() could run the query twice
        var job = dao.GetWaitingJobs(slave, 1).FirstOrDefault();
        if (job != null) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
          AssignJob(slave, job);
        }
      }
      return actions;
    }

    /// <summary>
    /// Moves <paramref name="job"/> into the Transferring state for <paramref name="slave"/>
    /// and resets the job's heartbeat timestamp.
    /// </summary>
    private void AssignJob(Slave slave, Job job) {
      job = dao.UpdateJobState(job.Id, JobState.Transferring, slave.Id, null, null);
      dao.UpdateSlave(slave);
      // from now on the job has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      job.LastHeartbeat = DateTime.Now;
      dao.UpdateJob(job);
    }

    /// <summary>
    /// Update the progress of each job.
    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave.
    /// </summary>
    /// <param name="heartbeat">The heartbeat whose job progress entries are processed.</param>
    /// <returns>
    /// AbortJob actions for jobs that are unknown or not assigned to this slave, plus
    /// Stop/Pause/Abort actions for jobs with a pending command. Empty when the
    /// heartbeat carries no job progress.
    /// </returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      if (heartbeat.JobProgress == null)
        return actions;

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          LogFactory.GetLogger(this.GetType().Namespace).Log("Job does not exist in DB: " + jobProgress.Key);
          continue;
        }

        // a job without a current state log has no assigned slave, so it cannot belong to this one
        if (curJob.CurrentStateLog == null || curJob.CurrentStateLog.SlaveId == Guid.Empty || curJob.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
          continue;
        }

        // save job execution time
        curJob.ExecutionTime = jobProgress.Value;
        curJob.LastHeartbeat = DateTime.Now;

        // forward any pending user command to the slave
        switch (curJob.Command) {
          case Command.Stop:
            actions.Add(new MessageContainer(MessageContainer.MessageType.StopJob, curJob.Id));
            break;
          case Command.Pause:
            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseJob, curJob.Id));
            break;
          case Command.Abort:
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            break;
        }
        dao.UpdateJob(curJob);
      }
      return actions;
    }
  }
}