Changeset 15630 for branches/HiveProjectManagement/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
- Timestamp:
- 01/18/18 15:08:25 (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HiveProjectManagement/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r15411 r15630 141 141 /// </summary> 142 142 private IEnumerable<MessageContainer> UpdateTasks(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) { 143 var taskDao = pm.TaskDao; 144 var jobDao = pm.JobDao; 145 var assignedJobResourceDao = pm.AssignedJobResourceDao; 146 var actions = new List<MessageContainer>(); 147 if (heartbeat.JobProgress == null || !heartbeat.JobProgress.Any()) 148 return actions; 149 150 var jobIdsWithStatisticsPending = jobDao.GetJobIdsByState(DA.JobState.StatisticsPending).ToList(); 151 152 // select all tasks and statelogs with one query 153 var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList(); 154 var taskInfos = pm.UseTransaction(() => 155 (from task in taskDao.GetAll() 156 where taskIds.Contains(task.TaskId) 157 let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault() 158 select new { 159 TaskId = task.TaskId, 160 JobId = task.JobId, 161 State = task.State, 162 Command = task.Command, 163 SlaveId = lastStateLog != null ? lastStateLog.SlaveId : default(Guid) 164 }).ToList() 165 ); 166 167 // process the jobProgresses 168 foreach (var jobProgress in heartbeat.JobProgress) { 169 var progress = jobProgress; 170 var curTask = taskInfos.SingleOrDefault(x => x.TaskId == progress.Key); 171 if (curTask == null) { 172 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key)); 173 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 174 } else if (jobIdsWithStatisticsPending.Contains(curTask.JobId)) { 175 // parenting job of current task has been requested for deletion (indicated by job state "Statistics Pending") 176 // update task execution time 177 pm.UseTransaction(() => { 178 taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds); 179 }); 180 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 181 LogFactory.GetLogger(this.GetType().Namespace).Log("Abort task " + curTask.TaskId + " on slave " + heartbeat.SlaveId + ". The parenting job " + curTask.JobId + " was requested to be deleted."); 182 } else if (curTask.SlaveId == Guid.Empty || curTask.SlaveId != heartbeat.SlaveId) { 183 // assigned slave does not match heartbeat 184 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 185 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId); 186 } else if (!isAllowedToCalculate) { 187 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 188 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not allowed to calculate any tasks tue to a downtime. The task is paused."); 189 } else if (assignedJobResourceDao.CheckJobGrantedForResource(curTask.JobId, heartbeat.SlaveId)) { 190 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 191 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 192 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not granted to calculate task: " + curTask.TaskId); 193 } else { 194 // update task execution time 195 pm.UseTransaction(() => { 196 taskDao.UpdateExecutionTime(curTask.TaskId, progress.Value.TotalMilliseconds); 197 }); 198 switch (curTask.Command) { 199 case DA.Command.Stop: 200 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 201 break; 202 case DA.Command.Pause: 203 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 204 break; 205 case DA.Command.Abort: 206 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 207 break; 208 } 209 } 210 211 } 212 return actions; 213 } 214 215 /// <summary> 216 /// Update the progress of each task 217 /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave 218 /// </summary> 219 private IEnumerable<MessageContainer> UpdateTasks_Old(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) { 143 220 var taskDao = pm.TaskDao; 144 221 var assignedResourceDao = pm.AssignedTaskResourceDao;
Note: See TracChangeset
for help on using the changeset viewer.