Changeset 16141 for branches/2817-BinPackingSpeedup/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
- Timestamp:
- 09/14/18 11:47:37 (6 years ago)
- Location:
- branches/2817-BinPackingSpeedup
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/2817-BinPackingSpeedup
- Property svn:mergeinfo changed
-
branches/2817-BinPackingSpeedup/HeuristicLab.Services.Hive
- Property svn:mergeinfo changed
-
branches/2817-BinPackingSpeedup/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r16140 r16141 142 142 private IEnumerable<MessageContainer> UpdateTasks(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) { 143 143 var taskDao = pm.TaskDao; 144 var assignedResourceDao = pm.AssignedResourceDao; 144 var jobDao = pm.JobDao; 145 var assignedJobResourceDao = pm.AssignedJobResourceDao; 145 146 var actions = new List<MessageContainer>(); 146 147 if (heartbeat.JobProgress == null || !heartbeat.JobProgress.Any()) 147 148 return actions; 148 149 149 if (!isAllowedToCalculate && heartbeat.JobProgress.Count != 0) { 150 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 151 } else { 152 // select all tasks and statelogs with one query 153 var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList(); 154 var taskInfos = pm.UseTransaction(() => 155 (from task in taskDao.GetAll() 156 where taskIds.Contains(task.TaskId) 157 let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault() 158 select new { 159 TaskId = task.TaskId, 160 Command = task.Command, 161 SlaveId = lastStateLog != null ? 
lastStateLog.SlaveId : default(Guid) 162 }).ToList() 163 ); 164 165 // process the jobProgresses 166 foreach (var jobProgress in heartbeat.JobProgress) { 167 var progress = jobProgress; 168 var curTask = taskInfos.SingleOrDefault(x => x.TaskId == progress.Key); 169 if (curTask == null) { 170 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key)); 171 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 172 } else { 173 var slaveId = curTask.SlaveId; 174 if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) { 175 // assigned slave does not match heartbeat 150 var jobIdsWithStatisticsPending = jobDao.GetJobIdsByState(DA.JobState.StatisticsPending).ToList(); 151 152 // select all tasks and statelogs with one query 153 var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList(); 154 var taskInfos = pm.UseTransaction(() => 155 (from task in taskDao.GetAll() 156 where taskIds.Contains(task.TaskId) 157 let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault() 158 select new { 159 TaskId = task.TaskId, 160 JobId = task.JobId, 161 State = task.State, 162 Command = task.Command, 163 SlaveId = lastStateLog != null ? 
lastStateLog.SlaveId : default(Guid) 164 }).ToList() 165 ); 166 167 // process the jobProgresses 168 foreach (var jobProgress in heartbeat.JobProgress) { 169 var progress = jobProgress; 170 var curTask = taskInfos.SingleOrDefault(x => x.TaskId == progress.Key); 171 if (curTask == null) { 172 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key)); 173 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 174 } else if (jobIdsWithStatisticsPending.Contains(curTask.JobId)) { 175 // parenting job of current task has been requested for deletion (indicated by job state "Statistics Pending") 176 // update task execution time 177 pm.UseTransaction(() => { 178 taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds); 179 }); 180 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 181 LogFactory.GetLogger(this.GetType().Namespace).Log("Abort task " + curTask.TaskId + " on slave " + heartbeat.SlaveId + ". The parenting job " + curTask.JobId + " was requested to be deleted."); 182 } else if (curTask.SlaveId == Guid.Empty || curTask.SlaveId != heartbeat.SlaveId) { 183 // assigned slave does not match heartbeat 184 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 185 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId); 186 } else if (!isAllowedToCalculate) { 187 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 188 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not allowed to calculate any tasks tue to a downtime. 
The task is paused."); 189 } else if (!assignedJobResourceDao.CheckJobGrantedForResource(curTask.JobId, heartbeat.SlaveId)) { 190 // slaveId (and parent resourceGroupIds) are not among the assigned resources ids for task-parenting job 191 // this might happen when (a) job-resource assignment has been changed (b) slave is moved to different group 192 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 193 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not granted to calculate task: " + curTask.TaskId + " of job: " + curTask.JobId); 194 } else { 195 // update task execution time 196 pm.UseTransaction(() => { 197 taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds); 198 }); 199 switch (curTask.Command) { 200 case DA.Command.Stop: 201 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 202 break; 203 case DA.Command.Pause: 204 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 205 break; 206 case DA.Command.Abort: 176 207 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 177 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId); 178 } else if (!assignedResourceDao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) { 179 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 180 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 181 } else { 182 // update task execution time 183 pm.UseTransaction(() => { 184 taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds); 185 }); 186 switch (curTask.Command) { 187 case DA.Command.Stop: 188 actions.Add(new 
MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 189 break; 190 case DA.Command.Pause: 191 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 192 break; 193 case DA.Command.Abort: 194 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 195 break; 196 } 197 } 198 } 199 } 200 } 208 break; 209 } 210 } 211 212 } 201 213 return actions; 202 214 }
Note: See TracChangeset for help on using the changeset viewer.