Changeset 9381 for branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
- Timestamp:
- 04/18/13 16:35:14 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9257 r9381 47 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 48 48 List<MessageContainer> actions = new List<MessageContainer>(); 49 Slave slave = null;50 slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });51 49 52 if (slave == null) { 53 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 54 } else { 55 if (heartbeat.HbInterval != slave.HbInterval) { 56 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 50 DA.Slave slave = null; 51 trans.UseTransaction(() => { 52 slave = dao.GetSlaveDA(heartbeat.SlaveId); 53 54 if (slave == null) { 55 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 56 } else { 57 if (heartbeat.HbInterval != slave.HbInterval) { 58 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 59 } 60 if (ShutdownSlaveComputer(slave.ResourceId)) { 61 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 62 } 63 64 // update slave data 65 slave.FreeCores = heartbeat.FreeCores; 66 slave.FreeMemory = heartbeat.FreeMemory; 67 slave.CpuUtilization = heartbeat.CpuUtilization; 68 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId); 69 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? DA.SlaveState.Calculating : DA.SlaveState.Idle; 70 slave.LastHeartbeat = DateTime.Now; 71 72 dao.UpdateSlaveDA(slave); 57 73 } 58 if (ShutdownSlaveComputer(slave.Id)) { 59 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 60 } 74 }); 61 75 62 // update slave data 63 slave.FreeCores = heartbeat.FreeCores; 64 slave.FreeMemory = heartbeat.FreeMemory; 65 slave.CpuUtilization = heartbeat.CpuUtilization; 66 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id); 67 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 68 slave.LastHeartbeat = DateTime.Now; 76 // update task data 77 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 69 78 70 trans.UseTransaction(() => { dao.UpdateSlave(slave); }); 79 // assign new task 80 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 81 bool mutexAquired = false; 82 var mutex = new Mutex(false, MutexName); 83 try { 71 84 72 // update task data 73 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 74 75 // assign new task 76 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 77 bool mutexAquired = false; 78 var mutex = new Mutex(false, MutexName); 79 try { 80 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 81 if (!mutexAquired) 82 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 83 else { 85 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 86 if (!mutexAquired) 87 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 88 else { 89 trans.UseTransaction(() => { 84 90 IEnumerable<TaskInfoForScheduler> availableTasks = null; 85 availableTasks = t rans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });91 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave)); 86 92 if (availableTasks.Any()) { 87 93 var task = availableTasks.First(); … … 89 95 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 90 96 } 91 } 97 }); 92 98 } 93 catch (AbandonedMutexException) {94 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");95 }96 catch (Exception ex) {97 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());98 }99 finally {100 if (mutexAquired) mutex.ReleaseMutex();101 }99 } 100 catch (AbandonedMutexException) { 101 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 102 } 103 catch (Exception ex) { 104 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 105 } 106 finally { 107 if (mutexAquired) mutex.ReleaseMutex(); 102 108 } 103 109 } … … 105 111 } 106 112 107 private void AssignJob(Slave slave, Guid taskId) { 108 trans.UseTransaction(() => { 109 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 113 private void AssignJob(DA.Slave slave, Guid taskId) { 114 var task = dao.UpdateTaskStateDA(taskId, DataAccess.TaskState.Transferring, slave.ResourceId, null, null); 110 115 111 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 112 task.LastHeartbeat = DateTime.Now; 113 dao.UpdateTask(task); 114 }); 116 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 117 task.LastHeartbeat = DateTime.Now; 118 dao.UpdateTaskDA(task); 115 119 } 116 120 … … 128 132 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 129 133 } else { 130 // process the jobProgresses 131 foreach (var jobProgress in heartbeat.JobProgress) { 132 Task curTask = null; 133 curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); }); 134 if (curTask == null) { 135 // task does not exist in db 136 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 137 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 138 } else { 139 if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) { 140 // assigned slave does not match heartbeat 141 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id)); 142 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 143 } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) { 144 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 145 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id)); 134 trans.UseTransaction(() => { 135 // process the jobProgresses 136 foreach (var jobProgress in heartbeat.JobProgress) { 137 var curTask = dao.GetTaskDA(jobProgress.Key); 138 if (curTask == null) { 139 // task does not exist in db 140 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 141 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 146 142 } else { 147 // save task execution time 148 curTask.ExecutionTime = jobProgress.Value; 149 curTask.LastHeartbeat = DateTime.Now; 143 var currentStateLog = curTask.StateLogs.LastOrDefault(); 144 if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) { 145 // assigned slave does not match heartbeat 146 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 147 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 148 } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) { 149 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 150 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 151 } else { 152 // save task execution time 153 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 154 curTask.LastHeartbeat = DateTime.Now; 150 155 151 switch (curTask.Command) { 152 case Command.Stop: 153 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id)); 154 break; 155 case Command.Pause: 156 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id)); 157 break; 158 case Command.Abort: 159 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id)); 160 break; 156 switch (curTask.Command) { 157 case DA.Command.Stop: 158 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 159 break; 160 case DA.Command.Pause: 161 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 162 break; 163 case DA.Command.Abort: 164 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 165 break; 166 } 167 dao.UpdateTaskDA(curTask); 161 168 } 162 trans.UseTransaction(() => { dao.UpdateTask(curTask); });163 169 } 164 170 } 165 } 171 }); 166 172 } 167 173 return actions; 168 174 } 169 175 170 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) { 171 return trans.UseTransaction(() => { 172 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id); 173 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id); 174 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x)); 175 }); 176 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) { 177 var assignedResourceIds = curTask.AssignedResources.Select(ar => ar.Resource.ResourceId); 178 var slaveResourceIds = dao.GetParentResourcesDA(slaveId).Select(x => x.ResourceId); 179 return assignedResourceIds.Any(r => slaveResourceIds.Contains(r)); 176 180 } 177 181 178 182 private bool SlaveIsAllowedToCalculate(Guid slaveId) { 183 var parentResources = dao.GetParentResourcesDA(slaveId); 179 184 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also 180 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });185 return parentResources.All(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() == 0); 181 186 } 182 187 183 188 private bool ShutdownSlaveComputer(Guid slaveId) { 184 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); }); 189 var parentResources = dao.GetParentResourcesDA(slaveId); 190 191 return parentResources.Any(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() != 0); 185 192 } 186 193 }
Note: See TracChangeset
for help on using the changeset viewer.