[5095] | 1 | using System;
|
---|
| 2 | using System.Collections.Generic;
|
---|
| 3 | using System.Linq;
|
---|
| 4 | using System.Text;
|
---|
| 5 | using System.Transactions;
|
---|
| 6 | using HeuristicLab.Services.Hive.Common;
|
---|
| 7 | using HeuristicLab.Services.Hive.Common.DataTransfer;
|
---|
| 8 | using HeuristicLab.Tracing;
|
---|
| 9 | using HeuristicLab.Core;
|
---|
| 10 |
|
---|
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// This class holds the state of all recent heartbeats and decides to reschedule jobs and set slaves offline.
  /// </summary>
  public class LifecycleManager : ILifecycleManager {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// Interval between two liveness checks (timer ticks).
    /// </summary>
    private static readonly TimeSpan checkInterval = new TimeSpan(0, 0, 10);

    // Guards the heartbeats dictionary. ProcessHeartbeat is expected to be called concurrently
    // (once per slave heartbeat), while timer_Tick and AddAllSlavesToHeartbeats read and prune
    // the same dictionary; Dictionary<,> is not safe for concurrent writes.
    private static readonly object locker = new object();

    /// <summary>
    /// Time (UTC) of the last heartbeat received from each slave, keyed by slave id.
    /// Must only be accessed while holding <see cref="locker"/>.
    /// </summary>
    private readonly Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

    // The Windows-Forms timer is single-threaded, so its Tick callbacks never overlap each other.
    // NOTE(review): a Windows-Forms timer only raises Tick on a thread that pumps window messages;
    // confirm the hosting process provides one, otherwise timer_Tick never runs.
    private readonly System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

    /// <summary>
    /// Contains a list of JobIds which have been sent to a slave, but the slave has not yet sent
    /// a jobProgress of the job with a heartbeat, because it has not finished downloading/deserializing it.
    /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
    /// When the number reaches zero, the job is assumed to be lost and is set Offline again.
    /// </summary>
    /// <remarks>NOTE(review): not read or written anywhere in this class yet.</remarks>
    private readonly Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

    /// <summary>
    /// Counts how many jobs are currently being transferred.
    /// </summary>
    private int jobsCurrentlyTransfering = 0;

    /// <summary>
    /// Number of jobs currently being transferred to slaves; every change of the value is logged.
    /// </summary>
    public int JobsCurrentlyTransferring {
      get { return jobsCurrentlyTransfering; }
      set {
        if (jobsCurrentlyTransfering != value) {
          jobsCurrentlyTransfering = value;
          Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
        }
      }
    }

    /// <summary>
    /// <see cref="Core.ExecutionState.Started"/> while the liveness timer is running, otherwise
    /// <see cref="Core.ExecutionState.Stopped"/>.
    /// </summary>
    public ExecutionState ExecutionState {
      get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped; }
    }

    /// <summary>
    /// Configures the liveness timer. The timer is not started until <see cref="Start"/> is called.
    /// </summary>
    public LifecycleManager() {
      timer.Interval = (int)checkInterval.TotalMilliseconds;
      // Subscribe exactly once: subscribing in Start() would attach another handler on every
      // Stop()/Start() cycle and run timer_Tick several times per tick.
      timer.Tick += new EventHandler(timer_Tick);
    }

    /// <summary>
    /// Starts the periodic liveness check, if it is not already running.
    /// </summary>
    public void Start() {
      if (ExecutionState == Core.ExecutionState.Stopped) {
        timer.Start();
        AddAllSlavesToHeartbeats();
      }
    }

    /// <summary>
    /// Stops the periodic liveness check, if it is running.
    /// </summary>
    public void Stop() {
      if (ExecutionState == Core.ExecutionState.Started) {
        timer.Stop();
      }
    }

    /// <summary>
    /// Adds all known slaves to the heartbeats collection and gives them some time to say hello (HeartbeatTimeout).
    /// Otherwise all the slaves' jobs would be aborted immediately, which is not desirable if the server has just been restarted.
    /// Slaves that already have a heartbeat entry keep it.
    /// </summary>
    private void AddAllSlavesToHeartbeats() {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          DateTime now = DateTime.UtcNow;
          foreach (Guid slaveId in slaveIds) {
            if (!heartbeats.ContainsKey(slaveId)) {
              heartbeats.Add(slaveId, now);
            }
          }
        }
      }
    }

    /// <summary>
    /// Checks whether the slaves are online.
    /// Every slave that has timed out is set offline (if it is not already), its jobs are set back
    /// to Waiting, and it is removed from the heartbeats collection.
    /// </summary>
    void timer_Tick(object sender, EventArgs e) {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          foreach (Guid slaveId in slaveIds) {
            if (!SlaveTimedOut(slaveId)) {
              continue;
            }
            var slave = dao.GetSlave(slaveId);
            if (slave.SlaveState != SlaveState.Offline) {
              AbortJobs(slaveId);
              slave.SlaveState = SlaveState.Offline;
              dao.UpdateSlave(slave);
            }
            heartbeats.Remove(slaveId);
          }
        }
      }
    }

    /// <summary>
    /// Returns true if no heartbeat is recorded for the slave, or if its last heartbeat is older
    /// than <c>ApplicationConstants.HeartbeatTimeout</c> seconds. The caller must hold <see cref="locker"/>.
    /// </summary>
    private bool SlaveTimedOut(Guid slaveId) {
      DateTime lastHeartbeat;
      if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat)) {
        return true;
      }
      return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
    }

    /// <summary>
    /// Sets every job assigned to the given slave back to <see cref="JobState.Waiting"/>.
    /// </summary>
    private void AbortJobs(Guid slaveId) {
      var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
      foreach (var j in jobs) {
        j.JobState = JobState.Waiting;
        dao.UpdateJob(j);
      }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!).
    /// Unknown slaves are asked to say hello. Known slaves have their heartbeat time, job progress and
    /// reported resources recorded.
    /// </summary>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      // The dictionary is shared with timer_Tick, so the write must happen under the same lock.
      lock (locker) {
        heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
      }
      actions.AddRange(UpdateJobs(heartbeat));

      if (IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        if (dao.GetWaitingJobs(slave).Any()) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
        }
      }

      // JobProgress may be null (UpdateJobs handles that case), so derive the state null-safely
      // and use the same value both for change detection and for the update.
      SlaveState reportedState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
        ? SlaveState.Calculating
        : SlaveState.Idle;

      // only update slave when something changed, to avoid unnecessary updates
      if (slave.FreeCores != heartbeat.FreeCores ||
          slave.FreeMemory != heartbeat.FreeMemory ||
          slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
          slave.SlaveState != reportedState) {
        slave.FreeCores = heartbeat.FreeCores;
        slave.FreeMemory = heartbeat.FreeMemory;
        slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
        slave.SlaveState = reportedState;
        dao.UpdateSlave(slave);
      }
      return actions;
    }

    /// <summary>
    /// Updates the progress of each job reported in the heartbeat.
    /// Checks that all the jobs sent by the heartbeat are supposed to be calculated by this slave;
    /// an AbortJob action is returned for unknown jobs, jobs assigned to another slave, and jobs
    /// whose state is Aborted.
    /// </summary>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        } else if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        } else {
          // save job execution time
          curJob.ExecutionTime = jobProgress.Value;

          if (curJob.JobState == JobState.Aborted) {
            // a request to abort the job has been set
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          }
          dao.UpdateJob(curJob);
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if there are enough resources to send a job.
    /// There should not be too many jobs sent simultaneously.
    /// </summary>
    private bool IsAllowedToSendJobs() {
      return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    }
  }
}
|
---|