Changeset 5093 for branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core
- Timestamp: 12/13/10 14:13:15 (13 years ago)
- Location: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3
- Files: 5 edited
Legend:
- Unmodified
- Added
- Removed
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/CreateHiveDatabaseApplication.cs
r4424 r5093 25 25 26 26 namespace HeuristicLab.Hive.Server.Core { 27 [Application("Create Hive Database", "Creates new empty Hive Database.", true)]27 [Application("Create Hive-3.3 Database", "Creates new empty Hive Database.", true)] 28 28 class CreateHiveDatabaseApplication : ApplicationBase { 29 29 … … 38 38 } 39 39 } 40 41 [Application("Test something")] 42 class TestApplication : ApplicationBase { 43 44 public override void Run() { 45 IContextFactory contextFactory = ServiceLocator.GetContextFactory(); 46 using (contextFactory.GetContext(false)) { 47 var job = DaoLocator.JobDao.FindWithLimitations(Contracts.BusinessObjects.JobState.Offline, 0, 1); 48 49 var jobs = DaoLocator.JobDao.FindAll(); 50 51 } 52 } 53 } 40 54 } -
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/DefaultScheduler.cs
r4424 r5093 47 47 /// Critical section /// 48 48 JobDto jobToCalculate = null; 49 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE})) {49 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted })) { 50 50 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 51 51 -
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/JobManager.cs
r4710 r5093 50 50 Logger.Info("Searching for dead Jobs"); 51 51 52 List<JobDto> allJobs = new List<JobDto>(DaoLocator.JobDao.FindAll());52 IEnumerable<JobDto> allJobs = DaoLocator.JobDao.FindAll(); 53 53 foreach (JobDto curJob in allJobs) { 54 54 if (curJob.State != JobState.Calculating && … … 69 69 70 70 private void lifecycleManager_Stopped(object sender, EventArgs e) { 71 Logger.Info("St artupEvent Fired, Checking DB for consistency");71 Logger.Info("Stopped Event Fired, Checking DB for consistency"); 72 72 CheckForDeadJobs(); 73 Logger.Info("St artupEvent Done");73 Logger.Info("Stopped Event Done"); 74 74 } 75 75 … … 276 276 /// </summary> 277 277 public Response AbortJob(Guid jobId) { 278 Logger.Debug("JobManager.AbortJob: " + jobId); 278 279 Response response = new Response(); 279 280 -
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/LifecycleManager.cs
r5000 r5093 35 35 private bool shutdownRequested; 36 36 private PluginManager pm; 37 38 private Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>(); 39 public Dictionary<Guid, DateTime> LastHeartbeats { 40 get { return lastHeartbeats; } 41 } 42 43 /// <summary> 44 /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent 45 /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it. 46 /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat. 47 /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again. 48 /// </summary> 49 private Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>(); 50 public Dictionary<Guid, int> NewAssignedJobs { 51 get { return newAssignedJobs; } 52 } 53 54 /// <summary> 55 /// When a slave reconnects and he has finished results waiting it calls IsJobStillNeeded. If the finished 56 /// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending. 57 /// Now the job be in pending state until it is received from the reconnected slave or the TimeToLive value of this dictionary has reached zero. 58 /// </summary> 59 private Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>(); 60 public Dictionary<Guid, int> PendingJobs { 61 get { return pendingJobs; } 62 } 37 63 38 64 private TimeSpan interval = new TimeSpan(0, 0, 10); -
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs
r5000 r5093 36 36 using HeuristicLab.Hive.Tracing; 37 37 using HeuristicLab.PluginInfrastructure.Manager; 38 using System.Web; 39 using System.Web.SessionState; 38 40 39 41 namespace HeuristicLab.Hive.Server.Core { … … 42 44 /// </summary> 43 45 public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator { 44 private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();45 46 /// <summary>47 /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent48 /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it.49 /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.50 /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again.51 /// </summary>52 private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();53 54 /// <summary>55 /// When a slave reconnects and he has finished results waiting it calls IsJobStillNeeded. If the finished56 /// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending.57 /// Now the job be in pending state until it is received from the reconnected slave or the TimeToLive value of this dictionary has reached zero.58 /// </summary>59 private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();60 46 private static int PENDING_TIMEOUT = 100; 61 47 … … 74 60 /// </summary> 75 61 public SlaveCommunicator() { 62 Logger.Debug("ServiceCommunicator instantiated"); 76 63 lifecycleManager = ServiceLocator.GetLifecycleManager(); 77 64 jobManager = ServiceLocator.GetJobManager() as IInternalJobManager; … … 97 84 SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId); 98 85 99 if (!l astHeartbeats.ContainsKey(slave.Id)) {100 Logger.Info("No previous hearbeats are available for " + slave. 
Id + "although it is in state " + slave.State);86 if (!lifecycleManager.LastHeartbeats.ContainsKey(slave.Id)) { 87 Logger.Info("No previous hearbeats are available for " + slave.Name + "(" + slave.Id + "), although it is in state " + slave.State); 101 88 102 89 // add a heartbeat NOW and give the slave time to say something for HEARTBEAT_MAX_DIF 103 90 // otherwise alls the slaves jobs would be aborted, which is not desirable if the server has just been restarted 104 91 heartbeatLock.EnterWriteLock(); 105 l astHeartbeats.Add(slave.Id, DateTime.Now);92 lifecycleManager.LastHeartbeats.Add(slave.Id, DateTime.Now); 106 93 heartbeatLock.ExitWriteLock(); 107 94 } else { 108 DateTime lastHeartbeatOfSlave = l astHeartbeats[slave.Id];95 DateTime lastHeartbeatOfSlave = lifecycleManager.LastHeartbeats[slave.Id]; 109 96 110 97 TimeSpan diff = DateTime.Now.Subtract(lastHeartbeatOfSlave); … … 112 99 if (diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) { 113 100 // if slave calculated jobs, the job must be reset 114 Logger.Info("Slave timed out and is on RESET ");101 Logger.Info("Slave timed out and is on RESET (no message for " + diff.TotalSeconds + " seconds.)"); 115 102 foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) { 116 103 DaoLocator.JobDao.SetJobOffline(job); 117 lock ( newAssignedJobs) {118 if ( newAssignedJobs.ContainsKey(job.Id))119 newAssignedJobs.Remove(job.Id);104 lock (lifecycleManager.NewAssignedJobs) { 105 if (lifecycleManager.NewAssignedJobs.ContainsKey(job.Id)) 106 lifecycleManager.NewAssignedJobs.Remove(job.Id); 120 107 } 121 108 } … … 127 114 Logger.Debug("removing it from the heartbeats list"); 128 115 heartbeatLock.EnterWriteLock(); 129 l astHeartbeats.Remove(slave.Id);116 lifecycleManager.LastHeartbeats.Remove(slave.Id); 130 117 heartbeatLock.ExitWriteLock(); 131 118 } … … 148 135 149 136 foreach (JobDto currJob in pendingJobsInDb) { 150 lock ( pendingJobs) {151 if ( pendingJobs.ContainsKey(currJob.Id)) {152 if ( 
pendingJobs[currJob.Id] <= 0) {137 lock (lifecycleManager.PendingJobs) { 138 if (lifecycleManager.PendingJobs.ContainsKey(currJob.Id)) { 139 if (lifecycleManager.PendingJobs[currJob.Id] <= 0) { 153 140 currJob.State = JobState.Offline; 154 141 DaoLocator.JobDao.Update(currJob); 155 142 } else { 156 pendingJobs[currJob.Id]--;143 lifecycleManager.PendingJobs[currJob.Id]--; 157 144 } 158 145 } … … 173 160 174 161 heartbeatLock.EnterWriteLock(); 175 if (l astHeartbeats.ContainsKey(slave.Id)) {176 l astHeartbeats[slave.Id] = DateTime.Now;162 if (lifecycleManager.LastHeartbeats.ContainsKey(slave.Id)) { 163 lifecycleManager.LastHeartbeats[slave.Id] = DateTime.Now; 177 164 } else { 178 l astHeartbeats.Add(slave.Id, DateTime.Now);165 lifecycleManager.LastHeartbeats.Add(slave.Id, DateTime.Now); 179 166 } 180 167 heartbeatLock.ExitWriteLock(); … … 247 234 using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) { 248 235 SlaveDto slave = UpdateSlaveData(heartbeatData); 236 DaoLocator.SlaveDao.Update(slave); 237 249 238 SaveTimestamp(heartbeatData); 250 239 251 240 //ProcessJobProgress(heartbeatData, response); 252 241 response.ActionRequest = ProcessJobProgress(heartbeatData); 253 242 254 243 //check if new Cal must be loaded 255 244 if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) { … … 270 259 } 271 260 272 DaoLocator.SlaveDao.Update(slave);273 261 scope.Complete(); 274 262 } … … 286 274 } 287 275 288 private staticvoid SaveTimestamp(HeartBeatData heartbeatData) {276 private void SaveTimestamp(HeartBeatData heartbeatData) { 289 277 heartbeatLock.EnterWriteLock(); 290 if (l astHeartbeats.ContainsKey(heartbeatData.SlaveId)) {291 l astHeartbeats[heartbeatData.SlaveId] = DateTime.Now;278 if (lifecycleManager.LastHeartbeats.ContainsKey(heartbeatData.SlaveId)) { 279 
lifecycleManager.LastHeartbeats[heartbeatData.SlaveId] = DateTime.Now; 292 280 } else { 293 l astHeartbeats.Add(heartbeatData.SlaveId, DateTime.Now);281 lifecycleManager.LastHeartbeats.Add(heartbeatData.SlaveId, DateTime.Now); 294 282 } 295 283 heartbeatLock.ExitWriteLock(); … … 316 304 317 305 // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave 318 IEnumerable<Guid> jobsToAbort = GetJobsNotInDatabase(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys); 319 foreach (Guid jobId in jobsToAbort) { 320 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 321 heartbeatData.JobProgress.Remove(jobId); 322 } 306 //IEnumerable<Guid> jobsToAbort = GetJobsNotCalculatedByThisSlave(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys); 307 //foreach (Guid jobId in jobsToAbort) { 308 // Logger.Error("Job shall not be caculated by this slave or does not exist in DB: " + jobId); 309 // actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId)); 310 // heartbeatData.JobProgress.Remove(jobId); 311 //} 323 312 324 313 // process all the remaining jobProgresses … … 327 316 if (curJob == null) { 328 317 // job does not exist in db 318 Logger.Error("Job does not exist in DB: " + jobProgress.Key); 329 319 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key)); 330 Logger.Error("Job does not exist in DB: " + jobProgress.Key);331 320 } else { 332 321 curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id); 322 Guid id = curJob.Slave != null ? 
curJob.Slave.Id : Guid.Empty; 333 323 if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) { 334 324 // assigned slave does not match heartbeat 325 Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob.ToString()); 335 326 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 336 Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob);337 327 } else { 338 328 // save job execution time … … 341 331 if (curJob.State == JobState.Aborted) { 342 332 // a request to abort the job has been set 333 Logger.Error("Job is in state aborted, send AbortJob: " + curJob.Id); 343 334 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); 344 335 } else if (curJob.State == JobState.SnapshotRequested) { … … 355 346 foreach (JobDto currJob in jobsOfSlave) { 356 347 if (heartbeatData.JobProgress.ContainsKey(currJob.Id)) { 357 lock ( newAssignedJobs) {358 if ( newAssignedJobs.ContainsKey(currJob.Id)) {348 lock (lifecycleManager.NewAssignedJobs) { 349 if (lifecycleManager.NewAssignedJobs.ContainsKey(currJob.Id)) { 359 350 Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob); 360 newAssignedJobs.Remove(currJob.Id);351 lifecycleManager.NewAssignedJobs.Remove(currJob.Id); 361 352 } 362 353 } 363 354 } else { 364 lock ( newAssignedJobs) {365 if ( newAssignedJobs.ContainsKey(currJob.Id)) {366 newAssignedJobs[currJob.Id]--;367 Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". 
User that sucks: " + currJob.Slave);368 if ( newAssignedJobs[currJob.Id] <= 0) {355 lock (lifecycleManager.NewAssignedJobs) { 356 if (lifecycleManager.NewAssignedJobs.ContainsKey(currJob.Id)) { 357 lifecycleManager.NewAssignedJobs[currJob.Id]--; 358 Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + lifecycleManager.NewAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave); 359 if (lifecycleManager.NewAssignedJobs[currJob.Id] <= 0) { 369 360 Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave); 370 361 … … 374 365 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id)); 375 366 376 newAssignedJobs.Remove(currJob.Id);367 lifecycleManager.NewAssignedJobs.Remove(currJob.Id); 377 368 } 378 369 } else { … … 388 379 389 380 /// <summary> 390 /// Returns the jobIds of the jobs which are not assigned to this slave in the database 391 /// </summary> 392 private IEnumerable<Guid> GetJobsNot InDatabase(Guid slaveId, IEnumerable<Guid> jobIds) {381 /// Returns the jobIds of the jobs which are not assigned to this slave in the database (they either are not stored in DB or they are assigned 382 /// </summary> 383 private IEnumerable<Guid> GetJobsNotCalculatedByThisSlave(Guid slaveId, IEnumerable<Guid> jobIds) { 393 384 IEnumerable<Guid> activeJobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(slaveId)).Select(j => j.Id); 394 385 return jobIds.Except(activeJobsOfSlave).ToList(); … … 520 511 521 512 Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId); 522 lock ( newAssignedJobs) {523 if (! 
newAssignedJobs.ContainsKey(job2Calculate.Id))524 newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);513 lock (lifecycleManager.NewAssignedJobs) { 514 if (!lifecycleManager.NewAssignedJobs.ContainsKey(job2Calculate.Id)) 515 lifecycleManager.NewAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE); 525 516 } 526 517 } else { … … 701 692 702 693 heartbeatLock.EnterWriteLock(); 703 if (l astHeartbeats.ContainsKey(slaveId))704 l astHeartbeats.Remove(slaveId);694 if (lifecycleManager.LastHeartbeats.ContainsKey(slaveId)) 695 lifecycleManager.LastHeartbeats.Remove(slaveId); 705 696 heartbeatLock.ExitWriteLock(); 706 697 … … 745 736 } 746 737 job.State = JobState.Pending; 747 lock ( pendingJobs) {748 pendingJobs.Add(job.Id, PENDING_TIMEOUT);738 lock (lifecycleManager.PendingJobs) { 739 lifecycleManager.PendingJobs.Add(job.Id, PENDING_TIMEOUT); 749 740 } 750 741 … … 765 756 if (ipd != null) { 766 757 response.List.Add(ConvertPluginDescriptorToDto(ipd)); 767 } else {768 response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;769 return response;770 758 } 771 759 }
Note: See TracChangeset for help on using the changeset viewer.