Changeset 12858 for branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/NewHeartbeatManager.cs
- Timestamp:
- 08/13/15 15:22:51 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/NewHeartbeatManager.cs
r12773 r12858 47 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 48 48 List<MessageContainer> actions = new List<MessageContainer>(); 49 using (var pm = PersistenceManager) { 50 var slaveDao = pm.SlaveDao; 51 var taskDao = pm.TaskDao; 52 var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId)); 53 if (slave == null) { 54 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 55 } else { 56 if (heartbeat.HbInterval != slave.HbInterval) { 57 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 58 } 59 if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) { 60 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 61 } 62 // update slave data 63 slave.FreeCores = heartbeat.FreeCores; 64 slave.FreeMemory = heartbeat.FreeMemory; 65 slave.CpuUtilization = heartbeat.CpuUtilization; 66 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) 67 ? DA.SlaveState.Calculating 68 : DA.SlaveState.Idle; 69 slave.LastHeartbeat = DateTime.Now; 70 pm.UseTransaction(() => { 71 slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId); 72 pm.SubmitChanges(); 73 }); 74 75 // update task data 76 actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate)); 77 78 // assign new task 79 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 80 bool mutexAquired = false; 81 var mutex = new Mutex(false, MutexName); 82 try { 83 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 84 if (mutexAquired) { 85 var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave) 86 .Select(x => new TaskInfoForScheduler { 87 TaskId = x.TaskId, 88 JobId = x.JobId, 89 Priority = x.Priority 90 }) 91 .ToList() 92 ); 93 var availableTasks = TaskScheduler.Schedule(waitingTasks); 94 if (availableTasks.Any()) { 95 var task = availableTasks.First(); 96 AssignTask(pm, slave, task.TaskId); 97 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 98 } 99 } else { 100 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 49 var pm = PersistenceManager; 50 var slaveDao = pm.SlaveDao; 51 var taskDao = pm.TaskDao; 52 var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId)); 53 if (slave == null) { 54 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 55 } else { 56 if (heartbeat.HbInterval != slave.HbInterval) { 57 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 58 } 59 if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) { 60 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 61 } 62 // update slave data 63 slave.FreeCores = heartbeat.FreeCores; 64 slave.FreeMemory = heartbeat.FreeMemory; 65 slave.CpuUtilization = heartbeat.CpuUtilization; 66 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) 67 ? DA.SlaveState.Calculating 68 : DA.SlaveState.Idle; 69 slave.LastHeartbeat = DateTime.Now; 70 pm.UseTransaction(() => { 71 slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId); 72 pm.SubmitChanges(); 73 }); 74 75 // update task data 76 actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate)); 77 78 // assign new task 79 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 80 bool mutexAquired = false; 81 var mutex = new Mutex(false, MutexName); 82 try { 83 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 84 if (mutexAquired) { 85 var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave) 86 .Select(x => new TaskInfoForScheduler { 87 TaskId = x.TaskId, 88 JobId = x.JobId, 89 Priority = x.Priority 90 }) 91 .ToList() 92 ); 93 var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray(); 94 if (availableTasks.Any()) { 95 var task = availableTasks.First(); 96 AssignTask(pm, slave, task.TaskId); 97 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 101 98 } 99 } else { 100 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 102 101 } 103 catch (AbandonedMutexException) {104 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");105 }106 catch (Exception ex) {107 LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));108 }109 finally {110 if (mutexAquired) mutex.ReleaseMutex();111 }102 } 103 catch (AbandonedMutexException) { 104 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 105 } 106 catch (Exception ex) { 107 LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex)); 108 } 109 finally { 110 if (mutexAquired) mutex.ReleaseMutex(); 112 111 } 113 112 }
Note: See TracChangeset
for help on using the changeset viewer.