Changeset 5093 for branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs
- Timestamp:
- 12/13/10 14:13:15 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs
r5000 r5093 36 36 using HeuristicLab.Hive.Tracing; 37 37 using HeuristicLab.PluginInfrastructure.Manager; 38 using System.Web; 39 using System.Web.SessionState; 38 40 39 41 namespace HeuristicLab.Hive.Server.Core { … … 42 44 /// </summary> 43 45 public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator { 44 private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();45 46 /// <summary>47 /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent48 /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it.49 /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.50 /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again.51 /// </summary>52 private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();53 54 /// <summary>55 /// When a slave reconnects and he has finished results waiting it calls IsJobStillNeeded. If the finished56 /// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending.57 /// Now the job be in pending state until it is received from the reconnected slave or the TimeToLive value of this dictionary has reached zero.58 /// </summary>59 private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();60 46 private static int PENDING_TIMEOUT = 100; 61 47 … … 74 60 /// </summary> 75 61 public SlaveCommunicator() { 62 Logger.Debug("ServiceCommunicator instantiated"); 76 63 lifecycleManager = ServiceLocator.GetLifecycleManager(); 77 64 jobManager = ServiceLocator.GetJobManager() as IInternalJobManager; … … 97 84 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 98 85 99 if (!l astHeartbeats.ContainsKey(slave.Id)) {100 Logger.Info("No previous hearbeats are available for " + slave. Id + "although it is in state " + slave.State);86 if (!lifecycleManager.LastHeartbeats.ContainsKey(slave.Id)) { 87 Logger.Info("No previous hearbeats are available for " + slave.Name + "(" + slave.Id + "), although it is in state " + slave.State); 101 88 102 89 // add a heartbeat NOW and give the slave time to say something for HEARTBEAT_MAX_DIF 103 90 // otherwise alls the slaves jobs would be aborted, which is not desirable if the server has just been restarted 104 91 heartbeatLock.EnterWriteLock(); 105 l astHeartbeats.Add(slave.Id, DateTime.Now);92 lifecycleManager.LastHeartbeats.Add(slave.Id, DateTime.Now); 106 93 heartbeatLock.ExitWriteLock(); 107 94 } else { 108 DateTime lastHeartbeatOfSlave = l astHeartbeats[slave.Id];95 DateTime lastHeartbeatOfSlave = lifecycleManager.LastHeartbeats[slave.Id]; 109 96 110 97 TimeSpan diff = DateTime.Now.Subtract(lastHeartbeatOfSlave); … … 112 99 if (diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) { 113 100 // if slave calculated jobs, the job must be reset 114 Logger.Info("Slave timed out and is on RESET ");101 Logger.Info("Slave timed out and is on RESET (no message for " + diff.TotalSeconds + " seconds.)"); 115 102 foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) { 116 103 DaoLocator.JobDao.SetJobOffline(job); 117 lock ( newAssignedJobs) {118 if ( newAssignedJobs.ContainsKey(job.Id))119 newAssignedJobs.Remove(job.Id);104 lock (lifecycleManager.NewAssignedJobs) { 105 if (lifecycleManager.NewAssignedJobs.ContainsKey(job.Id)) 106 lifecycleManager.NewAssignedJobs.Remove(job.Id); 120 107 } 121 108 } … … 127 114 Logger.Debug("removing it from the heartbeats list"); 128 115 heartbeatLock.EnterWriteLock(); 129 l astHeartbeats.Remove(slave.Id);116 lifecycleManager.LastHeartbeats.Remove(slave.Id); 130 117 heartbeatLock.ExitWriteLock(); 131 118 } … … 148 135 149 136 foreach (JobDto currJob in pendingJobsInDb) { 150 lock ( pendingJobs) {151 if ( pendingJobs.ContainsKey(currJob.Id)) {152 if ( pendingJobs[currJob.Id] <= 0) {137 lock (lifecycleManager.PendingJobs) { 138 if (lifecycleManager.PendingJobs.ContainsKey(currJob.Id)) { 139 if (lifecycleManager.PendingJobs[currJob.Id] <= 0) { 153 140 currJob.State = JobState.Offline; 154 141 DaoLocator.JobDao.Update(currJob); 155 142 } else { 156 pendingJobs[currJob.Id]--;143 lifecycleManager.PendingJobs[currJob.Id]--; 157 144 } 158 145 } … … 173 160 174 161 heartbeatLock.EnterWriteLock(); 175 if (l astHeartbeats.ContainsKey(slave.Id)) {176 l astHeartbeats[slave.Id] = DateTime.Now;162 if (lifecycleManager.LastHeartbeats.ContainsKey(slave.Id)) { 163 lifecycleManager.LastHeartbeats[slave.Id] = DateTime.Now; 177 164 } else { 178 l astHeartbeats.Add(slave.Id, DateTime.Now);165 lifecycleManager.LastHeartbeats.Add(slave.Id, DateTime.Now); 179 166 } 180 167 heartbeatLock.ExitWriteLock(); … … 247 234 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 248 235 SlaveDto slave = UpdateSlaveData(heartbeatData); 236 DaoLocator.SlaveDao.Update(slave); 237 249 238 SaveTimestamp(heartbeatData); 250 239 251 240 //ProcessJobProgress(heartbeatData, response); 252 241 response.ActionRequest = ProcessJobProgress(heartbeatData); 253 242 254 243 //check if new Cal must be loaded 255 244 if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) { … … 270 259 } 271 260 272 DaoLocator.SlaveDao.Update(slave);273 261 scope.Complete(); 274 262 } … … 286 274 } 287 275 288 private staticvoid SaveTimestamp(HeartBeatData heartbeatData) {276 private void SaveTimestamp(HeartBeatData heartbeatData) { 289 277 heartbeatLock.EnterWriteLock(); 290 if (l astHeartbeats.ContainsKey(heartbeatData.SlaveId)) {291 l astHeartbeats[heartbeatData.SlaveId] = DateTime.Now;278 if (lifecycleManager.LastHeartbeats.ContainsKey(heartbeatData.SlaveId)) { 279 lifecycleManager.LastHeartbeats[heartbeatData.SlaveId] = DateTime.Now; 292 280 } else { 293 l astHeartbeats.Add(heartbeatData.SlaveId, DateTime.Now);281 lifecycleManager.LastHeartbeats.Add(heartbeatData.SlaveId, DateTime.Now); 294 282 } 295 283 heartbeatLock.ExitWriteLock(); … … 316 304 317 305 // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave 318 IEnumerable<Guid> jobsToAbort = GetJobsNotInDatabase(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys); 319 foreach (Guid jobId in jobsToAbort) { 320 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 321 heartbeatData.JobProgress.Remove(jobId); 322 } 306 //IEnumerable<Guid> jobsToAbort = GetJobsNotCalculatedByThisSlave(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys); 307 //foreach (Guid jobId in jobsToAbort) { 308 // Logger.Error("Job shall not be caculated by this slave or does not exist in DB: " + jobId); 309 // actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 310 // heartbeatData.JobProgress.Remove(jobId); 311 //} 323 312 324 313 // process all the remaining jobProgresses … … 327 316 if (curJob == null) { 328 317 // job does not exist in db 318 Logger.Error("Job does not exist in DB: " + jobProgress.Key); 329 319 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key)); 330 Logger.Error("Job does not exist in DB: " + jobProgress.Key);331 320 } else { 332 321 curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id); 322 Guid id = curJob.Slave != null ? curJob.Slave.Id : Guid.Empty; 333 323 if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) { 334 324 // assigned slave does not match heartbeat 325 Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob.ToString()); 335 326 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 336 Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob);337 327 } else { 338 328 // save job execution time … … 341 331 if (curJob.State == JobState.Aborted) { 342 332 // a request to abort the job has been set 333 Logger.Error("Job is in state aborted, send AbortJob: " + curJob.Id); 343 334 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 344 335 } else if (curJob.State == JobState.SnapshotRequested) { … … 355 346 foreach (JobDto currJob in jobsOfSlave) { 356 347 if (heartbeatData.JobProgress.ContainsKey(currJob.Id)) { 357 lock ( newAssignedJobs) {358 if ( newAssignedJobs.ContainsKey(currJob.Id)) {348 lock (lifecycleManager.NewAssignedJobs) { 349 if (lifecycleManager.NewAssignedJobs.ContainsKey(currJob.Id)) { 359 350 Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob); 360 newAssignedJobs.Remove(currJob.Id);351 lifecycleManager.NewAssignedJobs.Remove(currJob.Id); 361 352 } 362 353 } 363 354 } else { 364 lock ( newAssignedJobs) {365 if ( newAssignedJobs.ContainsKey(currJob.Id)) {366 newAssignedJobs[currJob.Id]--;367 Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);368 if ( newAssignedJobs[currJob.Id] <= 0) {355 lock (lifecycleManager.NewAssignedJobs) { 356 if (lifecycleManager.NewAssignedJobs.ContainsKey(currJob.Id)) { 357 lifecycleManager.NewAssignedJobs[currJob.Id]--; 358 Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + lifecycleManager.NewAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave); 359 if (lifecycleManager.NewAssignedJobs[currJob.Id] <= 0) { 369 360 Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave); 370 361 … … 374 365 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id)); 375 366 376 newAssignedJobs.Remove(currJob.Id);367 lifecycleManager.NewAssignedJobs.Remove(currJob.Id); 377 368 } 378 369 } else { … … 388 379 389 380 /// <summary> 390 /// Returns the jobIds of the jobs which are not assigned to this slave in the database 391 /// </summary> 392 private IEnumerable<Guid> GetJobsNot InDatabase(Guid slaveId, IEnumerable<Guid> jobIds) {381 /// Returns the jobIds of the jobs which are not assigned to this slave in the database (they either are not stored in DB or they are assigned 382 /// </summary> 383 private IEnumerable<Guid> GetJobsNotCalculatedByThisSlave(Guid slaveId, IEnumerable<Guid> jobIds) { 393 384 IEnumerable<Guid> activeJobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(slaveId)).Select(j => j.Id); 394 385 return jobIds.Except(activeJobsOfSlave).ToList(); … … 520 511 521 512 Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId); 522 lock ( newAssignedJobs) {523 if (! newAssignedJobs.ContainsKey(job2Calculate.Id))524 newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);513 lock (lifecycleManager.NewAssignedJobs) { 514 if (!lifecycleManager.NewAssignedJobs.ContainsKey(job2Calculate.Id)) 515 lifecycleManager.NewAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE); 525 516 } 526 517 } else { … … 701 692 702 693 heartbeatLock.EnterWriteLock(); 703 if (l astHeartbeats.ContainsKey(slaveId))704 l astHeartbeats.Remove(slaveId);694 if (lifecycleManager.LastHeartbeats.ContainsKey(slaveId)) 695 lifecycleManager.LastHeartbeats.Remove(slaveId); 705 696 heartbeatLock.ExitWriteLock(); 706 697 … … 745 736 } 746 737 job.State = JobState.Pending; 747 lock ( pendingJobs) {748 pendingJobs.Add(job.Id, PENDING_TIMEOUT);738 lock (lifecycleManager.PendingJobs) { 739 lifecycleManager.PendingJobs.Add(job.Id, PENDING_TIMEOUT); 749 740 } 750 741 … … 765 756 if (ipd != null) { 766 757 response.List.Add(ConvertPluginDescriptorToDto(ipd)); 767 } else {768 response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;769 return response;770 758 } 771 759 }
Note: See TracChangeset
for help on using the changeset viewer.