using System;
using System.Collections.Generic;
using System.Linq;
using HeuristicLab.Services.Hive.Common;
using HeuristicLab.Services.Hive.Common.DataTransfer;
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Handles the heartbeats sent by slaves: refreshes the stored slave data, reconciles the
  /// jobs the slave reports and, when requested, assigns the slave a waiting job.
  /// </summary>
  public class HeartbeatManager {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <remarks>
    /// NOTE(review): the DAO reads and writes below are not wrapped in a transaction (the <c>trans</c>
    /// property is unused), so two concurrent heartbeats could presumably be handed the same waiting job.
    /// Confirm whether the DAO serializes this.
    /// </remarks>
    /// <param name="heartbeat">The heartbeat received from a slave.</param>
    /// <returns>a list of actions the slave should do</returns>
    /// <exception cref="ArgumentNullException"><paramref name="heartbeat"/> is null.</exception>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      if (heartbeat == null) throw new ArgumentNullException("heartbeat");

      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // unknown slave: it is asked to say hello before anything else happens
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.IsAllowedToCalculate = true; // Todo: look into calendar
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update job data
      actions.AddRange(UpdateJobs(heartbeat));

      // assign new job
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        // enumerate the result only once; it may be backed by a database query
        var job = dao.GetWaitingJobs(slave, 1).FirstOrDefault();
        if (job != null) {
          // persist the assignment before telling the slave to start calculating
          AssignJob(slave, job);
          actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
        }
      }
      return actions;
    }

    /// <summary>
    /// Sets <paramref name="job"/> to <see cref="JobState.Transferring"/> for <paramref name="slave"/>
    /// and stamps the job's last heartbeat with the current time.
    /// </summary>
    /// <param name="slave">The slave the job is handed to.</param>
    /// <param name="job">The waiting job to assign.</param>
    private void AssignJob(Slave slave, Job job) {
      job = dao.UpdateJobState(job.Id, JobState.Transferring, slave.Id, null, null);
      // NOTE(review): slave is not modified here and ProcessHeartbeat already persisted it; this write looks redundant.
      dao.UpdateSlave(slave);
      // from now on the job has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      job.LastHeartbeat = DateTime.Now;
      dao.UpdateJob(job);
    }

    /// <summary>
    /// Update the progress of each job.
    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave.
    /// </summary>
    /// <param name="heartbeat">The heartbeat whose <c>JobProgress</c> entries are processed.</param>
    /// <returns>
    /// Messages for the slave: abort messages for unknown or foreign jobs, and
    /// stop/pause/abort messages for jobs that carry a pending command.
    /// </returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      if (heartbeat.JobProgress == null)
        return actions;

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Log("Job does not exist in DB: " + jobProgress.Key);
          continue;
        }

        // a job without a state log cannot be assigned to this slave; abort it instead of crashing
        var stateLog = curJob.CurrentStateLog;
        if (stateLog == null || stateLog.SlaveId == Guid.Empty || stateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
          continue;
        }

        // save job execution time
        curJob.ExecutionTime = jobProgress.Value;
        curJob.LastHeartbeat = DateTime.Now;
        switch (curJob.Command) {
          case Command.Stop:
            actions.Add(new MessageContainer(MessageContainer.MessageType.StopJob, curJob.Id));
            break;
          case Command.Pause:
            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseJob, curJob.Id));
            break;
          case Command.Abort:
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            break;
        }
        dao.UpdateJob(curJob);
      }
      return actions;
    }

    /// <summary>Writes <paramref name="message"/> to the logger named after this class's namespace.</summary>
    private void Log(string message) {
      LogFactory.GetLogger(this.GetType().Namespace).Log(message);
    }
  }
}