using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Transactions;
using HeuristicLab.Services.Hive.Common;
using HeuristicLab.Services.Hive.Common.DataTransfer;
using HeuristicLab.Tracing;
using HeuristicLab.Core;

namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// This class holds the state of all recent heartbeats and decides to reschedule jobs and set slaves offline.
  /// </summary>
  public class LifecycleManager : ILifecycleManager {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    // Guards every read/write of 'heartbeats'. timer_Tick holds it for its whole sweep, so a heartbeat
    // that arrives mid-sweep cannot have its fresh timestamp removed between the timeout check and Remove().
    private static object locker = new object();

    /// <summary>
    /// Time (UTC) of the last heartbeat received from each slave, keyed by slave id.
    /// Only access while holding <see cref="locker"/>; <see cref="Dictionary{TKey,TValue}"/> is not thread-safe.
    /// </summary>
    private Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

    // Windows-Forms timer is single threaded, so tick callbacks are raised on one thread and do not overlap.
    // NOTE(review): a Forms timer only raises Tick on a thread that pumps a message loop; confirm the
    // hosting process provides one, otherwise timer_Tick never runs.
    System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

    /// <summary>
    /// Contains a list of job ids which have been sent to a slave, but the slave has not yet sent
    /// a jobProgress of the job with a heartbeat, because it has not finished downloading/deserializing it.
    /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
    /// When the number reaches zero, the job is assumed to be lost and is set Offline again.
    /// </summary>
    private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

    /// <summary>
    /// Counts how many jobs are currently being transferred.
    /// </summary>
    private int jobsCurrentlyTransfering = 0;

    /// <summary>
    /// Number of jobs currently in transfer; every change of the value is logged.
    /// </summary>
    public int JobsCurrentlyTransferring {
      get { return jobsCurrentlyTransfering; }
      set {
        if (jobsCurrentlyTransfering != value) {
          jobsCurrentlyTransfering = value;
          Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
        }
      }
    }

    /// <summary>
    /// Started while the heartbeat-check timer is enabled, Stopped otherwise.
    /// </summary>
    public ExecutionState ExecutionState {
      get { return timer.Enabled ? Core.ExecutionState.Started : Core.ExecutionState.Stopped; }
    }

    public LifecycleManager() {
      // Subscribe exactly once. Subscribing in Start() would attach the handler again on every
      // Stop/Start cycle and make each tick run the sweep several times.
      this.timer.Tick += new EventHandler(timer_Tick);
    }

    /// <summary>
    /// Starts the periodic (10 s) heartbeat check if it is not already running.
    /// </summary>
    public void Start() {
      if (ExecutionState == Core.ExecutionState.Stopped) {
        this.timer.Interval = (int)new TimeSpan(0, 0, 10).TotalMilliseconds;
        // Prime the heartbeat table before the timer can fire, so the first sweep does not treat
        // every known slave as timed out.
        AddAllSlavesToHeartbeats();
        this.timer.Start();
      }
    }

    /// <summary>
    /// Stops the periodic heartbeat check if it is running.
    /// </summary>
    public void Stop() {
      if (ExecutionState == Core.ExecutionState.Started) {
        timer.Stop();
      }
    }

    /// <summary>
    /// Adds all slaves to the heartbeats collection and gives them some time to say hello (HeartbeatTimeout).
    /// Otherwise all the slaves' jobs would be aborted immediately, which is not desirable if the server has just been restarted.
    /// Slaves already present keep their existing timestamp.
    /// </summary>
    private void AddAllSlavesToHeartbeats() {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          foreach (Guid slaveId in slaveIds) {
            if (!heartbeats.ContainsKey(slaveId)) {
              heartbeats.Add(slaveId, DateTime.UtcNow);
            }
          }
        }
      }
    }

    /// <summary>
    /// Checks whether slaves are still online. A slave whose last heartbeat is older than the timeout
    /// (or that has no recorded heartbeat) is set offline, its jobs are put back to Waiting,
    /// and it is removed from the heartbeats collection.
    /// </summary>
    void timer_Tick(object sender, EventArgs e) {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          foreach (Guid slaveId in slaveIds) {
            if (SlaveTimedOut(slaveId)) {
              var slave = dao.GetSlave(slaveId);
              // slave may have been deleted since GetSlaves; nothing to mark offline then
              if (slave != null && slave.SlaveState != SlaveState.Offline) {
                AbortJobs(slaveId);
                slave.SlaveState = SlaveState.Offline;
                dao.UpdateSlave(slave);
              }
              heartbeats.Remove(slaveId);
            }
          }
        }
      }
    }

    /// <summary>
    /// Returns true if no heartbeat is recorded for the slave, or its last heartbeat is older than
    /// ApplicationConstants.HeartbeatTimeout seconds. Caller must hold <see cref="locker"/>.
    /// </summary>
    private bool SlaveTimedOut(Guid slaveId) {
      DateTime lastHeartbeat;
      if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat)) {
        return true;
      }
      return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
    }

    /// <summary>
    /// Puts every job assigned to the given slave back into the Waiting state so it can be rescheduled.
    /// </summary>
    private void AbortJobs(Guid slaveId) {
      var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
      foreach (var j in jobs) {
        j.JobState = JobState.Waiting;
        dao.UpdateJob(j);
      }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">Status report sent by the slave.</param>
    /// <returns>A list of actions the slave should do.</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
      } else {
        // heartbeats is shared with timer_Tick; must not be written concurrently without the lock
        lock (locker) {
          heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
        }
        actions.AddRange(UpdateJobs(heartbeat));

        if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
          var availableJobs = dao.GetWaitingJobs(slave);
          if (availableJobs.Any()) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
          }
        }

        // JobProgress may be null (UpdateJobs handles that case too), so derive the state null-safely once
        var reportedState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;

        if (slave.FreeCores != heartbeat.FreeCores ||
            slave.FreeMemory != heartbeat.FreeMemory ||
            slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
            slave.SlaveState != reportedState) {
          // only update slave when something changed, to avoid unnecessary updates
          slave.FreeCores = heartbeat.FreeCores;
          slave.FreeMemory = heartbeat.FreeMemory;
          slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
          slave.SlaveState = reportedState;
          dao.UpdateSlave(slave);
        }
      }
      return actions;
    }

    /// <summary>
    /// Updates the execution time of each job reported in the heartbeat and checks that every reported job
    /// is supposed to be calculated by this slave. Returns AbortJob messages for jobs that are unknown,
    /// assigned to another slave, or flagged as Aborted.
    /// </summary>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        } else {
          if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
            // assigned slave does not match heartbeat
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
          } else {
            // save job execution time
            curJob.ExecutionTime = jobProgress.Value;

            if (curJob.JobState == JobState.Aborted) {
              // a request to abort the job has been set
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            }
            dao.UpdateJob(curJob);
          }
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if there are enough resources to send a job.
    /// There should not be too many jobs sent simultaneously.
    /// </summary>
    private bool IsAllowedToSendJobs() {
      return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    }
  }
}