using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Transactions;
using HeuristicLab.Services.Hive.Common;
using HeuristicLab.Services.Hive.Common.DataTransfer;
using HeuristicLab.Tracing;
using HeuristicLab.Core;
namespace HeuristicLab.Services.Hive {
/// <summary>
/// Holds the time of the most recent heartbeat of every slave and uses it to decide
/// when jobs must be rescheduled and when slaves must be set offline.
/// </summary>
public class LifecycleManager : ILifecycleManager {
  // Interval between two timeout scans of the slave list.
  private static readonly TimeSpan CheckInterval = new TimeSpan(0, 0, 10);

  // Guards every access to the heartbeats dictionary.
  private static readonly object locker = new object();

  private DataAccess.IHiveDao dao {
    get { return ServiceLocator.Instance.HiveDao; }
  }

  private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
    get { return ServiceLocator.Instance.TransactionManager; }
  }

  private IAuthorizationManager auth {
    get { return ServiceLocator.Instance.AuthorizationManager; }
  }

  /// <summary>
  /// Maps a slave id to the (UTC) time at which its last heartbeat was received.
  /// </summary>
  private readonly Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

  // The Windows-Forms timer is single threaded, so its callbacks run synchronously.
  // NOTE(review): System.Windows.Forms.Timer only raises Tick on a thread that pumps
  // window messages - confirm that the hosting process provides a message loop.
  private readonly System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

  /// <summary>
  /// Contains the ids of jobs which have been sent to a slave, but for which the slave has not yet
  /// reported a job progress with a heartbeat, because it has not finished downloading/deserializing the job.
  /// The number value is a time-to-live count that is decremented with each server heartbeat.
  /// When the number reaches zero, the job is assumed to be lost and is set offline again.
  /// </summary>
  /// <remarks>Not used by any code in this class yet.</remarks>
  private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

  /// <summary>
  /// Counts how many jobs are currently being transferred.
  /// </summary>
  private int jobsCurrentlyTransfering = 0;

  /// <summary>
  /// Gets or sets the number of jobs that are currently being transferred.
  /// Every actual change of the value is written to the log.
  /// </summary>
  public int JobsCurrentlyTransferring {
    get { return jobsCurrentlyTransfering; }
    set {
      if (jobsCurrentlyTransfering == value) {
        return;
      }
      jobsCurrentlyTransfering = value;
      Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
    }
  }

  /// <summary>
  /// Gets <c>Started</c> while the timeout timer is enabled and <c>Stopped</c> otherwise.
  /// </summary>
  public ExecutionState ExecutionState {
    get { return timer.Enabled ? Core.ExecutionState.Started : Core.ExecutionState.Stopped; }
  }

  public LifecycleManager() { }

  /// <summary>
  /// Starts the periodic timeout check. Does nothing if the manager is already started.
  /// </summary>
  public void Start() {
    if (ExecutionState != Core.ExecutionState.Stopped) {
      return;
    }

    // Seed the heartbeat table before the first tick can fire, so that no known slave
    // is treated as timed out merely because the server has just been (re)started.
    AddAllSlavesToHeartbeats();

    timer.Interval = (int)CheckInterval.TotalMilliseconds;
    // Remove a handler left over from an earlier Start() first; otherwise every
    // Stop()/Start() cycle would add one more subscription and each tick would
    // run the timeout scan several times.
    timer.Tick -= timer_Tick;
    timer.Tick += timer_Tick;
    timer.Start();
  }

  /// <summary>
  /// Stops the periodic timeout check. Does nothing if the manager is not started.
  /// </summary>
  public void Stop() {
    if (ExecutionState == Core.ExecutionState.Started) {
      timer.Stop();
    }
  }

  /// <summary>
  /// Adds all slaves to the heartbeats collection and thereby gives them some time to say hello
  /// (the heartbeat timeout). Otherwise all jobs of the slaves would be reset immediately,
  /// which is not desirable if the server has just been restarted.
  /// </summary>
  private void AddAllSlavesToHeartbeats() {
    lock (locker) {
      using (trans.OpenTransaction()) {
        DateTime now = DateTime.UtcNow;
        Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
        foreach (Guid slaveId in slaveIds) {
          // Keep an already recorded heartbeat; only unknown slaves get a fresh grace period.
          if (!heartbeats.ContainsKey(slaveId)) {
            heartbeats.Add(slaveId, now);
          }
        }
      }
    }
  }

  /// <summary>
  /// Timer callback that checks whether the slaves are still online.
  /// A slave whose heartbeat timed out is set offline (unless it already is offline),
  /// the jobs assigned to it are set back to waiting, and its heartbeat entry is removed.
  /// </summary>
  void timer_Tick(object sender, EventArgs e) {
    lock (locker) {
      using (trans.OpenTransaction()) {
        Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
        foreach (Guid slaveId in slaveIds) {
          if (!SlaveTimedOut(slaveId)) {
            continue;
          }

          var slave = dao.GetSlave(slaveId);
          if (slave.SlaveState != SlaveState.Offline) {
            AbortJobs(slaveId);
            slave.SlaveState = SlaveState.Offline;
            dao.UpdateSlave(slave);
          }
          heartbeats.Remove(slaveId);
        }
      }
    }
  }

  /// <summary>
  /// Returns true if no heartbeat is recorded for the slave, or if the recorded heartbeat is
  /// older than <c>ApplicationConstants.HeartbeatTimeout</c> seconds.
  /// Callers must hold <c>locker</c>.
  /// </summary>
  private bool SlaveTimedOut(Guid slaveId) {
    DateTime lastHeartbeat;
    if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat)) {
      return true;
    }
    return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
  }

  /// <summary>
  /// Sets every job that is assigned to the given slave back to the waiting state.
  /// </summary>
  private void AbortJobs(Guid slaveId) {
    var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
    foreach (var j in jobs) {
      j.JobState = JobState.Waiting;
      dao.UpdateJob(j);
    }
  }

  /// <summary>
  /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
  /// </summary>
  /// <param name="heartbeat">the heartbeat received from the slave</param>
  /// <returns>a list of actions the slave should do</returns>
  public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();

    Slave slave = dao.GetSlave(heartbeat.SlaveId);
    if (slave == null) {
      // unknown slave: ask it to say hello first
      actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
      return actions;
    }

    // The timer callback reads and removes entries under the same lock; Dictionary
    // is not safe for concurrent writers, so the write has to be guarded as well.
    lock (locker) {
      heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
    }

    actions.AddRange(UpdateJobs(heartbeat));

    if (IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
      var availableJobs = dao.GetWaitingJobs(slave);
      if (availableJobs.Any()) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
      }
    }

    // JobProgress may be null (UpdateJobs accepts that as well), so it must be checked
    // before Count is read - both for the comparison and for the assignment below.
    bool isCalculating = heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0;
    SlaveState reportedState = isCalculating ? SlaveState.Calculating : SlaveState.Idle;

    // only update the slave when something changed, to avoid unnecessary updates
    bool slaveChanged = slave.FreeCores != heartbeat.FreeCores
                        || slave.FreeMemory != heartbeat.FreeMemory
                        || slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate
                        || slave.SlaveState != reportedState;
    if (slaveChanged) {
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
      slave.SlaveState = reportedState;
      dao.UpdateSlave(slave);
    }

    return actions;
  }

  /// <summary>
  /// Updates the execution time of each job reported by the heartbeat and
  /// checks if all the jobs sent by the heartbeat are supposed to be calculated by this slave.
  /// </summary>
  /// <param name="heartbeat">the heartbeat whose job progress entries are processed</param>
  /// <returns>an abort action for every reported job that is unknown, assigned to another slave or marked as aborted</returns>
  private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();
    if (heartbeat.JobProgress == null) {
      return actions;
    }

    // process the jobProgresses
    foreach (var jobProgress in heartbeat.JobProgress) {
      Job curJob = dao.GetJob(jobProgress.Key);
      if (curJob == null) {
        // job does not exist in db
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        continue;
      }

      if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
        // assigned slave does not match heartbeat
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        continue;
      }

      // save job execution time
      curJob.ExecutionTime = jobProgress.Value;
      if (curJob.JobState == JobState.Aborted) {
        // a request to abort the job has been set
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      }
      dao.UpdateJob(curJob);
    }
    return actions;
  }

  /// <summary>
  /// Returns true if there are enough resources to send a job.
  /// There should not be too many jobs sent simultaneously.
  /// </summary>
  private bool IsAllowedToSendJobs() {
    return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
  }
}
}