- Timestamp:
- 07/17/15 10:11:56 (9 years ago)
- Location:
- branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r12691 r12773 125 125 /// </summary> 126 126 private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) { 127 List<MessageContainer> actions = new List<MessageContainer>(); 127 using (new PerformanceLogger("Old_UpdateTasks")) { 128 List<MessageContainer> actions = new List<MessageContainer>(); 128 129 129 if (heartbeat.JobProgress == null)130 return actions;130 if (heartbeat.JobProgress == null) 131 return actions; 131 132 132 if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) { 133 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 134 } else { 135 // process the jobProgresses 136 foreach (var jobProgress in heartbeat.JobProgress) { 137 Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null; 138 trans.UseTransaction(() => { 139 taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key); 140 }); 141 var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null; 142 if (curTask == null) { 143 // task does not exist in db 144 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 145 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 146 } else { 147 var slaveId = taskWithLastStateLogSlaveId.Item2; 148 if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) { 149 // assigned slave does not match heartbeat 150 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 151 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 152 } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) { 153 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 154 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 133 if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) { 134 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 135 } else { 136 // process the jobProgresses 137 foreach (var jobProgress in heartbeat.JobProgress) { 138 Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null; 139 trans.UseTransaction(() => { 140 taskWithLastStateLogSlaveId = 141 dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key); 142 }); 143 var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null; 144 if (curTask == null) { 145 // task does not exist in db 146 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 147 LogFactory.GetLogger(this.GetType().Namespace) 148 .Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 155 149 } else { 156 // save task execution time 157 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 158 curTask.LastHeartbeat = DateTime.Now; 150 var slaveId = taskWithLastStateLogSlaveId.Item2; 151 if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) { 152 // assigned slave does not match heartbeat 153 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 154 LogFactory.GetLogger(this.GetType().Namespace) 155 .Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 156 } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) { 157 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 158 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 159 } else { 160 // save task execution time 161 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 162 curTask.LastHeartbeat = DateTime.Now; 159 163 160 switch (curTask.Command) { 161 case Command.Stop: 162 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 163 break; 164 case Command.Pause: 165 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 166 break; 167 case Command.Abort: 168 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 169 break; 164 switch (curTask.Command) { 165 case Command.Stop: 166 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 167 break; 168 case Command.Pause: 169 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 170 break; 171 case Command.Abort: 172 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 173 break; 174 } 175 trans.UseTransaction(() => { 176 dao.UpdateTask(curTask); 177 }); 170 178 } 171 trans.UseTransaction(() => {172 dao.UpdateTask(curTask);173 });174 179 } 175 180 } 176 181 } 182 return actions; 177 183 } 178 return actions;179 184 } 180 185 } -
branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/NewHeartbeatManager.cs
r12691 r12773 50 50 var slaveDao = pm.SlaveDao; 51 51 var taskDao = pm.TaskDao; 52 53 52 var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId)); 54 53 if (slave == null) { … … 85 84 if (mutexAquired) { 86 85 var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave) 87 .Select(x => new TaskInfoForScheduler {88 TaskId = x.TaskId,89 JobId = x.JobId,90 Priority = x.Priority91 })92 .ToList()86 .Select(x => new TaskInfoForScheduler { 87 TaskId = x.TaskId, 88 JobId = x.JobId, 89 Priority = x.Priority 90 }) 91 .ToList() 93 92 ); 94 93 var availableTasks = TaskScheduler.Schedule(waitingTasks); … … 146 145 var assignedResourceDao = pm.AssignedResourceDao; 147 146 var actions = new List<MessageContainer>(); 148 if (heartbeat.JobProgress == null )147 if (heartbeat.JobProgress == null || !heartbeat.JobProgress.Any()) 149 148 return actions; 150 149 … … 154 153 // select all tasks and statelogs with one query 155 154 var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList(); 156 var taskInfos = 155 var taskInfos = pm.UseTransaction(() => 157 156 (from task in taskDao.GetAll() 158 157 where taskIds.Contains(task.TaskId) … … 162 161 Command = task.Command, 163 162 SlaveId = lastStateLog != null ? lastStateLog.SlaveId : default(Guid) 164 }) 165 .ToList();163 }).ToList() 164 ); 166 165 167 166 // process the jobProgresses
Note: See TracChangeset
for help on using the changeset viewer.