- Timestamp:
- 02/01/11 18:12:46 (13 years ago)
- Location:
- branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4
- Files:
-
- 1 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HeuristicLab.Services.Hive-3.4.csproj
r5402 r5405 112 112 <None Include="HeuristicLabServicesHivePlugin.cs.frame" /> 113 113 <None Include="Properties\AssemblyInfo.cs.frame" /> 114 <Compile Include="HeartbeatManager.cs" /> 114 115 <Compile Include="Interfaces\ILifecycleManager.cs" /> 115 116 <Compile Include="Interfaces\IServiceLocator.cs" /> -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HiveService.cs
r5404 r5405 25 25 private ILifecycleManager lifecycleManager { 26 26 get { return ServiceLocator.Instance.LifecycleManager; } 27 } 28 private HeartbeatManager heartbeatManager { 29 get { return ServiceLocator.Instance.HeartbeatManager; } 27 30 } 28 31 … … 162 165 163 166 #region Login Methods 164 public void Hello( Guid slaveId, string name, int cores, int memory) {165 using (trans.OpenTransaction()) { 166 var slave = dao.GetSlave(slaveI d);167 public void Hello(Slave slaveInfo) { 168 using (trans.OpenTransaction()) { 169 var slave = dao.GetSlave(slaveInfo.Id); 167 170 168 171 if (slave == null) { 169 slave = new Slave { Id = slaveId, Name = name, Cores = cores, Memory = memory }; 170 slave.IsAllowedToCalculate = true; //a little bit to optimistic? 171 slave.SlaveState = SlaveState.Idle; 172 dao.AddSlave(slave); 172 dao.AddSlave(slaveInfo); 173 173 } else { 174 //TODO: error handling?174 dao.UpdateSlave(slaveInfo); 175 175 } 176 176 } … … 191 191 public List<MessageContainer> Heartbeat(Heartbeat heartbeat) { 192 192 using (trans.OpenTransaction()) { 193 return lifecycleManager.ProcessHeartbeat(heartbeat);193 return heartbeatManager.ProcessHeartbeat(heartbeat); 194 194 } 195 195 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/Interfaces/ILifecycleManager.cs
r5095 r5405 14 14 15 15 void Stop(); 16 17 List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat);18 16 } 19 17 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/Interfaces/IServiceLocator.cs
r5095 r5405 12 12 ILifecycleManager LifecycleManager { get; } 13 13 TransactionManager TransactionManager { get; } 14 HeartbeatManager HeartbeatManager { get; } 14 15 } 15 16 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/LifecycleManager.cs
r5404 r5405 25 25 26 26 private static object locker = new object(); 27 private Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();28 27 29 28 // Windows-Forms timer is single threaded, so callbacks will be synchron 30 System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();29 System.Windows.Forms.Timer timer; 31 30 32 /// <summary> 33 /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent 34 /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it. 35 /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat. 36 /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again. 37 /// </summary> 38 private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>(); 39 40 /// <summary> 41 /// Counts how many jobs are currently beeing transferred. 42 /// </summary> 43 private int jobsCurrentlyTransfering = 0; 44 public int JobsCurrentlyTransferring { 45 get { return jobsCurrentlyTransfering; } 46 set { 47 if (jobsCurrentlyTransfering != value) { 48 jobsCurrentlyTransfering = value; 49 Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering); 50 } 51 } 31 public ExecutionState ExecutionState { 32 get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped; } 52 33 } 53 34 54 public ExecutionState ExecutionState { 55 get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped; } 35 public LifecycleManager() { 36 this.timer = new System.Windows.Forms.Timer(); 37 this.timer.Tick += new EventHandler(timer_Tick); 56 38 } 57 58 public LifecycleManager() { }59 39 60 40 public void Start() { 61 41 if (ExecutionState == Core.ExecutionState.Stopped) { 62 42 this.timer.Interval = (int)new TimeSpan(0, 0, 10).TotalMilliseconds; 63 this.timer.Tick += new EventHandler(timer_Tick);64 43 this.timer.Start(); 65 AddAllSlavesToHeartbeats();66 44 } 67 45 } … … 74 52 75 53 /// <summary> 76 // add all slaves to hearbeats-collection and give them some time to say hello (HEARTBEAT_TIMEOUT)77 // otherwise alls the slaves jobs would be aborted immediately, which is not desirable if the server has just been restarted78 /// </summary>79 private void AddAllSlavesToHeartbeats() {80 lock (locker) {81 using (trans.OpenTransaction()) {82 Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();83 foreach (Guid slaveId in slaveIds) {84 if (!heartbeats.ContainsKey(slaveId)) {85 heartbeats.Add(slaveId, DateTime.Now);86 }87 }88 }89 }90 }91 92 /// <summary>93 54 /// This method is supposed to check if slaves are online 94 55 /// if not -> set them offline and check if they where calculating a job … … 97 58 lock (locker) { 98 59 using (trans.OpenTransaction()) { 99 Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray(); 100 foreach (Guid slaveId in slaveIds) { 101 if (SlaveTimedOut(slaveId)) { 102 var slave = dao.GetSlave(slaveId); 103 if (slave.SlaveState != SlaveState.Offline) { 104 AbortJobs(slaveId); 105 slave.SlaveState = SlaveState.Offline; 106 dao.UpdateSlave(slave); 107 } 108 heartbeats.Remove(slaveId); 60 var slaves = dao.GetSlaves(x => x.SlaveState != SlaveState.Offline); 61 foreach (Slave slave in slaves) { 62 if (!slave.LastHeartbeat.HasValue || (DateTime.Now - slave.LastHeartbeat.Value).TotalSeconds > ApplicationConstants.HeartbeatTimeout) { 63 slave.SlaveState = SlaveState.Offline; 64 AbortJobs(slave.Id); 65 dao.UpdateSlave(slave); 109 66 } 110 67 } 111 68 } 112 69 } 113 }114 115 private bool SlaveTimedOut(Guid slaveId) {116 if (!heartbeats.ContainsKey(slaveId))117 return true;118 119 if ((DateTime.Now - heartbeats[slaveId]).TotalSeconds > ApplicationConstants.HeartbeatTimeout) {120 return true;121 }122 123 return false;124 70 } 125 71 … … 131 77 } 132 78 } 133 134 /// <summary>135 /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)136 /// </summary>137 /// <returns>a list of actions the slave should do</returns>138 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {139 List<MessageContainer> actions = new List<MessageContainer>();140 Slave slave = dao.GetSlave(heartbeat.SlaveId);141 if (slave == null) {142 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));143 } else {144 heartbeats[heartbeat.SlaveId] = DateTime.Now;145 actions.AddRange(UpdateJobs(heartbeat));146 147 if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {148 var availableJobs = dao.GetWaitingJobs(slave, 1);149 if (availableJobs.Count() > 0) {150 var job = availableJobs.First();151 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));152 AssignJob(slave, job);153 }154 }155 156 if (slave.FreeCores != heartbeat.FreeCores ||157 slave.FreeMemory != heartbeat.FreeMemory ||158 slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||159 slave.SlaveState != (heartbeat.JobProgress.Count > 0 ? SlaveState.Calculating : SlaveState.Idle)) { // only update slave when something changed, to avoid unnecessary updates160 slave.FreeCores = heartbeat.FreeCores;161 slave.FreeMemory = heartbeat.FreeMemory;162 slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;163 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;164 dao.UpdateSlave(slave);165 }166 }167 return actions;168 }169 170 private void AssignJob(Slave slave, Job job) {171 job.SlaveId = slave.Id;172 job.JobState = JobState.Calculating; // Todo: Maybe use State = Transferring (?)173 job.DateCalculated = DateTime.Now; // Todo: use statelog instead174 dao.UpdateJob(job);175 dao.UpdateSlave(slave);176 }177 178 /// <summary>179 /// Update the progress of each job180 /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave181 /// </summary>182 private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {183 List<MessageContainer> actions = new List<MessageContainer>();184 185 if (heartbeat.JobProgress == null)186 return actions;187 188 // process the jobProgresses189 foreach (var jobProgress in heartbeat.JobProgress) {190 Job curJob = dao.GetJob(jobProgress.Key);191 if (curJob == null) {192 // job does not exist in db193 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));194 Logger.Error("Job does not exist in DB: " + jobProgress.Key);195 } else {196 if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {197 // assigned slave does not match heartbeat198 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));199 Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);200 } else {201 // save job execution time202 curJob.ExecutionTime = jobProgress.Value;203 204 if (curJob.JobState == JobState.Aborted) {205 // a request to abort the job has been set206 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));207 }208 dao.UpdateJob(curJob);209 }210 }211 }212 return actions;213 }214 215 /// <summary>216 /// Returns true if there are enough resources to send a job217 /// There should not be too many jobs sent simultaniously218 /// </summary>219 private bool IsAllowedToSendJobs() {220 return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;221 }222 79 } 223 80 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/ServiceLocator.cs
r5095 r5405 48 48 } 49 49 } 50 51 private HeartbeatManager heartbeatManager; 52 public HeartbeatManager HeartbeatManager { 53 get { 54 if(heartbeatManager == null) heartbeatManager = new HeartbeatManager(); 55 return heartbeatManager; 56 } 57 } 50 58 } 51 59 }
Note: See TracChangeset
for help on using the changeset viewer.