Changeset 4423 for branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs
- Timestamp:
- 09/17/10 10:26:55 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs
r4368 r4423 45 45 /// The SlaveCommunicator manages the whole communication with the slave 46 46 /// </summary> 47 public class SlaveCommunicator : ISlaveCommunicator, 48 IInternalSlaveCommunicator { 47 public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator { 49 48 private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>(); 49 50 /// <summary> 51 /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent 52 /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it. 53 /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat. 54 /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again. 55 /// </summary> 50 56 private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>(); 57 58 /// <summary> 59 /// When a slave reconnects and he has finished results waiting it calls IsJobStillNeeded. If the finished 60 /// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending. 61 /// Now the job be in pending state until it is received from the reconnected slave or the TimeToLive value of this dictionary has reached zero. 62 /// </summary> 51 63 private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>(); 64 private static int PENDING_TIMEOUT = 100; 52 65 53 66 private static ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 54 67 55 //private ISessionFactory factory;56 68 private ILifecycleManager lifecycleManager; 57 69 private IInternalJobManager jobManager; 58 70 private IScheduler scheduler; 59 71 60 private static int PENDING_TIMEOUT = 100;72 private static object slaveLocker = new object(); 61 73 62 74 /// <summary> … … 66 78 /// </summary> 67 79 public SlaveCommunicator() { 68 //factory = ServiceLocator.GetSessionFactory();69 70 80 lifecycleManager = ServiceLocator.GetLifecycleManager(); 71 81 jobManager = ServiceLocator.GetJobManager() as IInternalJobManager; 72 82 scheduler = ServiceLocator.GetScheduler(); 73 83 74 lifecycleManager. RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat));84 lifecycleManager.ServerHeartbeat += new EventHandler(lifecycleManager_OnServerHeartbeat); 75 85 } 76 86 … … 82 92 /// <param name="e"></param> 83 93 void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) { 84 Logger.Debug("Server Heartbeat ticked"); 85 86 // [chn] why is transaction management done here 87 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 88 List<SlaveDto> allSlaves = new List<SlaveDto>(DaoLocator.SlaveDao.FindAll()); 89 90 foreach (SlaveDto slave in allSlaves) { 91 if (slave.State != SlaveState.Offline && slave.State != SlaveState.NullState) { 94 // this block can conflict with the heartbeats from a slave (which also updates the slave-records) 95 lock (slaveLocker) { 96 Logger.Debug("Server Heartbeat ticked"); 97 List<Guid> slaveIds = DaoLocator.SlaveDao.FindAll().Select(s => s.Id).ToList(); 98 99 foreach (Guid slaveId in slaveIds) { 100 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 92 101 heartbeatLock.EnterUpgradeableReadLock(); 102 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 93 103 94 104 if (!lastHeartbeats.ContainsKey(slave.Id)) { 95 Logger.Info("Slave " + slave.Id + 96 " wasn't offline but hasn't sent heartbeats - setting offline"); 97 slave.State = SlaveState.Offline; 98 DaoLocator.SlaveDao.Update(slave); 99 Logger.Info("Slave " + slave.Id + 100 " wasn't offline but hasn't sent heartbeats - Resetting all his jobs"); 101 foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) { 102 //maybe implementa n additional Watchdog? Till then, just set them offline.. 103 DaoLocator.JobDao.SetJobOffline(job); 104 } 105 Logger.Info("No previous hearbeats are available for " + slave.Id + " although it is in state " + slave.State); 106 107 // add a heartbeat NOW and give the slave time to say something for HEARTBEAT_MAX_DIF 108 // otherwise alls the slaves jobs would be aborted, which is not desirable if the server has just been restarted 109 heartbeatLock.EnterWriteLock(); 110 lastHeartbeats.Add(slave.Id, DateTime.Now); 111 heartbeatLock.ExitWriteLock(); 105 112 } else { 106 DateTime lastH bOfSlave = lastHeartbeats[slave.Id];107 108 TimeSpan dif = DateTime.Now.Subtract(lastHbOfSlave);113 DateTime lastHeartbeatOfSlave = lastHeartbeats[slave.Id]; 114 115 TimeSpan diff = DateTime.Now.Subtract(lastHeartbeatOfSlave); 109 116 // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF 110 if (dif .TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {117 if (diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) { 111 118 // if slave calculated jobs, the job must be reset 112 119 Logger.Info("Slave timed out and is on RESET"); … … 119 126 } 120 127 Logger.Debug("setting slave offline"); 121 // slave must be set offline122 128 slave.State = SlaveState.Offline; 123 129 124 //slaveAdapter.Update(slave);125 130 DaoLocator.SlaveDao.Update(slave); 126 131 … … 131 136 } 132 137 } 133 134 138 heartbeatLock.ExitUpgradeableReadLock(); 135 } else { 136 //TODO: RLY neccesary? 137 //HiveLogger.Info(this.ToString() + ": Slave " + slave.Id + " has wrong state: Shouldn't have offline or nullstate, has " + slave.State); 138 heartbeatLock.EnterWriteLock(); 139 //HiveLogger.Info(this.ToString() + ": Slave " + slave.Id + " has wrong state: Resetting all his jobs"); 140 if (lastHeartbeats.ContainsKey(slave.Id)) 141 lastHeartbeats.Remove(slave.Id); 142 foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) { 143 DaoLocator.JobDao.SetJobOffline(job); 144 } 145 heartbeatLock.ExitWriteLock(); 146 } 147 } 148 CheckForPendingJobs(); 149 // DaoLocator.DestroyContext(); 150 scope.Complete(); 151 } 139 scope.Complete(); 140 } // using 141 } // foreach 142 143 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 144 CheckForPendingJobs(); 145 scope.Complete(); 146 } 147 Logger.Debug("Server Heartbeat ended"); 148 } // lock 152 149 } 153 150 154 151 private void CheckForPendingJobs() { 155 IList<JobDto> pendingJobsInD B= new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));156 157 foreach (JobDto currJob in pendingJobsInD B) {152 IList<JobDto> pendingJobsInDb = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending)); 153 154 foreach (JobDto currJob in pendingJobsInDb) { 158 155 lock (pendingJobs) { 159 156 if (pendingJobs.ContainsKey(currJob.Id)) { … … 192 189 slave.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch; 193 190 slave.State = SlaveState.Idle; 194 195 if (dbSlave == null) 191 slave.Login = DateTime.Now; 192 193 if (dbSlave == null) { 196 194 DaoLocator.SlaveDao.Insert(slave); 197 else {195 } else { 198 196 DaoLocator.SlaveDao.Update(slave); 199 197 } … … 207 205 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 208 206 if (slave == null) { 209 //response.Success = false; 210 response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound; 207 response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist; 211 208 return response; 212 209 } … … 217 214 if (appointments.Count() == 0) { 218 215 response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound; 219 //response.Success = false;220 216 } else { 221 //response.Success = true;222 217 response.Appointments = appointments; 223 218 } … … 232 227 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 233 228 if (slave == null) { 234 //response.Success = false; 235 response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound; 229 response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist; 236 230 return response; 237 231 } … … 251 245 /// <returns></returns> 252 246 public ResponseHeartBeat ProcessHeartBeat(HeartBeatData heartbeatData) { 253 Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId); 254 255 ResponseHeartBeat response = new ResponseHeartBeat(); 256 response.ActionRequest = new List<MessageContainer>(); 257 258 Logger.Debug("BEGIN Started Slave Fetching"); 259 SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId); 260 Logger.Debug("END Finished Slave Fetching"); 261 262 slave.NrOfFreeCores = heartbeatData.FreeCores; 263 slave.FreeMemory = heartbeatData.FreeMemory; 264 slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate; 265 266 // check if the slave is logged in 267 if (slave.State == SlaveState.Offline || slave.State == SlaveState.NullState) { 268 response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn; 269 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage)); 270 Logger.Error("ProcessHeartBeat: Slave state null or offline: " + slave); 271 return response; 272 } 273 274 // save timestamp of this heartbeat 275 Logger.Debug("BEGIN Locking for Heartbeats"); 247 lock (slaveLocker) { 248 Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId); 249 ResponseHeartBeat response = new ResponseHeartBeat(); 250 response.ActionRequest = new List<MessageContainer>(); 251 252 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 253 SlaveDto slave = UpdateSlaveData(heartbeatData); 254 SaveTimestamp(heartbeatData); 255 256 //ProcessJobProgress(heartbeatData, response); 257 response.ActionRequest = ProcessJobProgress(heartbeatData); 258 259 //check if new Cal must be loaded 260 if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) { 261 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar)); 262 Logger.Info("fetch or forcefetch sent"); 263 } 264 265 // check if slave has a free core for a new job 266 // if true, ask scheduler for a new job for this slave 267 Logger.Debug(" BEGIN Looking for Slave Jobs"); 268 if (this.IsAllowedToSendJobs() && 269 slave.IsAllowedToCalculate && 270 heartbeatData.FreeCores > 0 && 271 scheduler.ExistsJobForSlave(heartbeatData)) { 272 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob)); 273 } else { 274 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage)); 275 } 276 277 DaoLocator.SlaveDao.Update(slave); 278 scope.Complete(); 279 } 280 Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId); 281 return response; 282 } 283 } 284 285 /// <summary> 286 /// Returns true if there are enough resources to send a job 287 /// There should not be too many jobs sent simultaniously 288 /// </summary> 289 private bool IsAllowedToSendJobs() { 290 return lifecycleManager.JobsCurrentlyTransferring < ApplicationConstants.MAX_JOB_TRANSFER_COUNT; 291 } 292 293 private static void SaveTimestamp(HeartBeatData heartbeatData) { 276 294 heartbeatLock.EnterWriteLock(); 277 Logger.Debug("END Locked for Heartbeats");278 295 if (lastHeartbeats.ContainsKey(heartbeatData.SlaveId)) { 279 296 lastHeartbeats[heartbeatData.SlaveId] = DateTime.Now; … … 282 299 } 283 300 heartbeatLock.ExitWriteLock(); 284 285 Logger.Debug("BEGIN Processing Heartbeat Jobs"); 286 ProcessJobProcess(heartbeatData, response); 287 Logger.Debug("END Processed Heartbeat Jobs"); 288 289 //check if new Cal must be loaded 290 if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) { 291 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar)); 292 //slave.CalendarSyncStatus = CalendarState.Fetching; 293 Logger.Info("fetch or forcefetch sent"); 294 } 295 296 // check if slave has a free core for a new job 297 // if true, ask scheduler for a new job for this slave 298 Logger.Debug(" BEGIN Looking for Slave Jobs"); 299 if (slave.IsAllowedToCalculate && heartbeatData.FreeCores > 0 && scheduler.ExistsJobForSlave(heartbeatData)) { 300 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob)); 301 } else { 302 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage)); 303 } 304 Logger.Debug(" END Looked for Slave Jobs"); 305 306 DaoLocator.SlaveDao.Update(slave); 307 308 //tx.Commit(); 309 Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId); 310 return response; 301 } 302 303 private SlaveDto UpdateSlaveData(HeartBeatData heartbeatData) { 304 SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId); 305 if (slave == null) { 306 slave = new SlaveDto() { Id = heartbeatData.SlaveId }; 307 Login(slave); 308 slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId); 309 } 310 311 slave.NrOfFreeCores = heartbeatData.FreeCores; 312 slave.FreeMemory = heartbeatData.FreeMemory; 313 slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate; 314 315 slave.State = heartbeatData.JobProgress.Count > 0 ? SlaveState.Calculating : SlaveState.Idle; 316 return slave; 317 } 318 319 private List<MessageContainer> ProcessJobProgress(HeartBeatData heartbeatData) { 320 List<MessageContainer> actions = new List<MessageContainer>(); 321 322 // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave 323 IEnumerable<Guid> jobsToAbort = GetJobsNotInDatabase(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys); 324 foreach (Guid jobId in jobsToAbort) { 325 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 326 heartbeatData.JobProgress.Remove(jobId); 327 } 328 329 // process all the remaining jobProgresses 330 foreach (var jobProgress in heartbeatData.JobProgress) { 331 JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key); 332 if (curJob == null) { 333 // job does not exist in db 334 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key)); 335 Logger.Error("Job does not exist in DB: " + jobProgress.Key); 336 } else { 337 curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id); 338 if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) { 339 // assigned slave does not match heartbeat 340 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 341 Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob); 342 } else { 343 // save job execution time 344 curJob.ExecutionTime = jobProgress.Value; 345 346 if (curJob.State == JobState.Aborted) { 347 // a request to abort the job has been set 348 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 349 } else if (curJob.State == JobState.SnapshotRequested) { 350 // a request for a snapshot has been set 351 actions.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id)); 352 curJob.State = JobState.SnapshotSent; 353 } 354 } 355 } 356 DaoLocator.JobDao.Update(curJob); 357 } 358 359 var jobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId)); 360 foreach (JobDto currJob in jobsOfSlave) { 361 if (heartbeatData.JobProgress.ContainsKey(currJob.Id)) { 362 lock (newAssignedJobs) { 363 if (newAssignedJobs.ContainsKey(currJob.Id)) { 364 Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob); 365 newAssignedJobs.Remove(currJob.Id); 366 } 367 } 368 } else { 369 lock (newAssignedJobs) { 370 if (newAssignedJobs.ContainsKey(currJob.Id)) { 371 newAssignedJobs[currJob.Id]--; 372 Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave); 373 if (newAssignedJobs[currJob.Id] <= 0) { 374 Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave); 375 376 currJob.State = JobState.Offline; 377 DaoLocator.JobDao.Update(currJob); 378 379 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id)); 380 381 newAssignedJobs.Remove(currJob.Id); 382 } 383 } else { 384 Logger.Error("Job ID wasn't with the heartbeats: " + currJob); 385 currJob.State = JobState.Offline; 386 DaoLocator.JobDao.Update(currJob); 387 } 388 } // lock 389 } 390 } 391 return actions; 392 } 393 394 /// <summary> 395 /// Returns the jobIds of the jobs which are not assigned to this slave in the database 396 /// </summary> 397 private IEnumerable<Guid> GetJobsNotInDatabase(Guid slaveId, IEnumerable<Guid> jobIds) { 398 IEnumerable<Guid> activeJobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(slaveId)).Select(j => j.Id); 399 return jobIds.Except(activeJobsOfSlave).ToList(); 311 400 } 312 401 … … 333 422 /// (2) state Abort results in Finished. This should be: AbortRequested, Aborted. 334 423 /// </summary> 335 /// <param name="hbData"></param> 336 /// <param name="jobAdapter"></param> 337 /// <param name="slaveAdapter"></param> 424 /// <param name="heartbeatData"></param> 338 425 /// <param name="response"></param> 339 private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) { 340 Logger.Debug("Started for Slave " + hbData.SlaveId); 341 List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(hbData.SlaveId))); 342 if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) { 343 if (jobsOfSlave == null || jobsOfSlave.Count == 0) { 344 foreach (Guid jobId in hbData.JobProgress.Keys) { 345 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 346 } 347 348 Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all"); 349 return; 350 } 351 352 foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) { 353 JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key); 354 if (curJob == null) { 355 response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist; 356 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key)); 357 Logger.Error("Job does not exist in DB: " + jobProgress.Key); 358 return; 359 } 360 curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id); 361 if (curJob.Slave == null || curJob.Slave.Id != hbData.SlaveId) { 362 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 363 Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob); 364 } else if (curJob.State == JobState.Aborted) { 365 // a request to abort the job has been set 366 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 367 curJob.State = JobState.Finished; 368 } else { 369 // save job progress 370 curJob.Percentage = jobProgress.Value; 371 372 if (curJob.State == JobState.SnapshotRequested) { 373 // a request for a snapshot has been set 374 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id)); 375 curJob.State = JobState.SnapshotSent; 376 } 377 } 378 DaoLocator.JobDao.Update(curJob); 379 } 380 } 381 foreach (JobDto currJob in jobsOfSlave) { 382 bool found = false; 383 if (hbData.JobProgress != null) { 384 foreach (Guid jobId in hbData.JobProgress.Keys) { 385 if (jobId == currJob.Id) { 386 found = true; 387 break; 388 } 389 } 390 } 391 if (!found) { 392 lock (newAssignedJobs) { 393 if (newAssignedJobs.ContainsKey(currJob.Id)) { 394 newAssignedJobs[currJob.Id]--; 395 Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave); 396 if (newAssignedJobs[currJob.Id] <= 0) { 397 Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave); 398 399 currJob.State = JobState.Offline; 400 DaoLocator.JobDao.Update(currJob); 401 402 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id)); 403 404 newAssignedJobs.Remove(currJob.Id); 405 } 406 } else { 407 Logger.Error("Job ID wasn't with the heartbeats: " + currJob); 408 currJob.State = JobState.Offline; 409 DaoLocator.JobDao.Update(currJob); 410 } 411 } // lock 412 } else { 413 lock (newAssignedJobs) { 414 415 if (newAssignedJobs.ContainsKey(currJob.Id)) { 416 Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob); 417 newAssignedJobs.Remove(currJob.Id); 418 } 419 } 420 } 421 } 422 } 426 //private void ProcessJobProgress(HeartBeatData heartbeatData, ResponseHeartBeat response) { 427 // Logger.Debug("Started for Slave " + heartbeatData.SlaveId); 428 // List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId))); 429 // if (heartbeatData.JobProgress != null && heartbeatData.JobProgress.Count > 0) { 430 // if (jobsOfSlave == null || jobsOfSlave.Count == 0) { 431 // foreach (Guid jobId in heartbeatData.JobProgress.Keys) { 432 // response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 433 // } 434 435 // Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + ", advise him to abort all"); 436 // return; 437 // } 438 439 // foreach (KeyValuePair<Guid, TimeSpan> jobProgress in heartbeatData.JobProgress) { 440 // JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key); 441 // if (curJob == null) { 442 // response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist; 443 // response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key)); 444 // Logger.Error("Job does not exist in DB: " + jobProgress.Key); 445 // return; 446 // } 447 // curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id); 448 // if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) { 449 // response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 450 // Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + " Job: " + curJob); 451 // } else if (curJob.State == JobState.Aborted) { 452 // // a request to abort the job has been set 453 // response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 454 // curJob.State = JobState.Finished; 455 // } else { 456 // // save job progress 457 // curJob.ExecutionTime = jobProgress.Value; 458 459 // if (curJob.State == JobState.SnapshotRequested) { 460 // // a request for a snapshot has been set 461 // response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id)); 462 // curJob.State = JobState.SnapshotSent; 463 // } 464 // } 465 // DaoLocator.JobDao.Update(curJob); 466 // } 467 // } 468 469 // foreach (JobDto currJob in jobsOfSlave) { 470 // bool found = false; 471 // if (heartbeatData.JobProgress != null) { 472 // foreach (Guid jobId in heartbeatData.JobProgress.Keys) { 473 // if (jobId == currJob.Id) { 474 // found = true; 475 // break; 476 // } 477 // } 478 // } 479 480 // if (!found) { 481 // lock (newAssignedJobs) { 482 // if (newAssignedJobs.ContainsKey(currJob.Id)) { 483 // newAssignedJobs[currJob.Id]--; 484 // Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave); 485 // if (newAssignedJobs[currJob.Id] <= 0) { 486 // Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave); 487 488 // currJob.State = JobState.Offline; 489 // DaoLocator.JobDao.Update(currJob); 490 491 // response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id)); 492 493 // newAssignedJobs.Remove(currJob.Id); 494 // } 495 // } else { 496 // Logger.Error("Job ID wasn't with the heartbeats: " + currJob); 497 // currJob.State = JobState.Offline; 498 // DaoLocator.JobDao.Update(currJob); 499 // } 500 // } // lock 501 // } else { 502 // lock (newAssignedJobs) { 503 // if (newAssignedJobs.ContainsKey(currJob.Id)) { 504 // Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob); 505 // newAssignedJobs.Remove(currJob.Id); 506 // } 507 // } 508 // } 509 // } 510 //} 423 511 424 512 /// <summary> … … 466 554 //tx = session.BeginTransaction(); 467 555 468 ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.Id, new byte[] { }, result. Percentage, result.Exception, finished);556 ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.Id, new byte[] { }, result.ExecutionTime, result.Exception, finished); 469 557 470 558 if (response.StatusMessage == ResponseStatus.Ok) { … … 495 583 } 496 584 497 private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {585 private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) { 498 586 Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId); 499 587 … … 511 599 512 600 if (job != null && job.JobInfo == null) { 513 //response.Success = false;514 601 response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist; 515 602 response.JobId = jobId; … … 520 607 } 521 608 if (job.JobInfo.State == JobState.Aborted) { 522 //response.Success = false;523 609 response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted; 524 610 … … 529 615 } 530 616 if (job.JobInfo.Slave == null) { 531 //response.Success = false;532 617 response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated; 533 618 response.JobId = jobId; … … 562 647 } 563 648 if (job.JobInfo.State != JobState.Calculating && job.JobInfo.State != JobState.Pending) { 564 //response.Success = false;565 649 response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState; 566 650 response.JobId = jobId; … … 571 655 return response; 572 656 } 573 job.JobInfo. Percentage = percentage;657 job.JobInfo.ExecutionTime = executionTime; 574 658 575 659 if (!string.IsNullOrEmpty(exception)) { … … 600 684 /// these job results will be stored in the database 601 685 /// </summary> 602 /// <param name="slaveId"></param> 603 /// <param name="jobId"></param> 604 /// <param name="result"></param> 605 /// <param name="exception"></param> 606 /// <param name="finished"></param> 607 /// <returns></returns> 608 public ResponseResultReceived StoreFinishedJobResult(Guid slaveId, 609 Guid jobId, 610 byte[] result, 611 double percentage, 612 string exception) { 613 614 return ProcessJobResult(slaveId, jobId, result, percentage, exception, true); 615 } 616 617 public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, double percentage, string exception) { 618 return ProcessJobResult(slaveId, jobId, result, percentage, exception, false); 686 public ResponseResultReceived StoreFinishedJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) { 687 return ProcessJobResult(slaveId, jobId, result, executionTime, exception, true); 688 } 689 690 public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) { 691 return ProcessJobResult(slaveId, jobId, result, executionTime, exception, false); 619 692 } 620 693 … … 637 710 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 638 711 if (slave == null) { 639 //response.Success = false;640 712 response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered; 641 713 return response; … … 666 738 JobDto job = DaoLocator.JobDao.FindById(jobId); 667 739 if (job == null) { 668 //response.Success = false;669 740 response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist; 670 741 Logger.Error("Job doesn't exist (anymore)! " + jobId); … … 672 743 } 673 744 if (job.State == JobState.Finished) { 674 //response.Success = true;675 745 response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished; 676 746 Logger.Error("already finished! " + job); … … 702 772 response.List.Add(ConvertPluginDescriptorToDto(ipd)); 703 773 } else { 704 //response.Success = false;705 774 response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable; 706 775 return response; … … 722 791 return currCachedPlugin; 723 792 } 724 793 725 794 #endregion 726 795 }
Note: See TracChangeset
for help on using the changeset viewer.