Changeset 9385 for branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
- Timestamp:
- 04/19/13 17:16:49 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9381 r9385 74 74 }); 75 75 76 // update task data 77 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 76 if (slave != null) { 77 // update task data 78 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 78 79 79 // assign new task80 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {81 bool mutexAquired = false;82 var mutex = new Mutex(false, MutexName);83 try {80 // assign new task 81 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 82 bool mutexAquired = false; 83 var mutex = new Mutex(false, MutexName); 84 try { 84 85 85 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 86 if (!mutexAquired) 87 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 88 else { 89 trans.UseTransaction(() => { 90 IEnumerable<TaskInfoForScheduler> availableTasks = null; 91 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave)); 92 if (availableTasks.Any()) { 93 var task = availableTasks.First(); 94 AssignJob(slave, task.TaskId); 95 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 96 } 97 }); 86 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 87 if (!mutexAquired) 88 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 89 else { 90 trans.UseTransaction(() => { 91 IEnumerable<TaskInfoForScheduler> availableTasks = null; 92 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave).ToArray()); 93 if (availableTasks.Any()) { 94 var task = availableTasks.First(); 95 AssignJob(slave, task.TaskId); 96 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 97 } 98 }); 99 } 98 100 } 99 }100 catch (AbandonedMutexException) {101 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");102 }103 catch (Exception ex) {104 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());105 }106 finally {107 if (mutexAquired) mutex.ReleaseMutex();101 catch (AbandonedMutexException) { 102 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 103 } 104 catch (Exception ex) { 105 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 106 } 107 finally { 108 if (mutexAquired) mutex.ReleaseMutex(); 109 } 108 110 } 109 111 } … … 132 134 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 133 135 } else { 134 trans.UseTransaction(() => {135 // process the jobProgresses136 foreach (var jobProgress in heartbeat.JobProgress){136 // process the jobProgresses 137 foreach (var jobProgress in heartbeat.JobProgress) { 138 trans.UseTransaction(() => { 137 139 var curTask = dao.GetTaskDA(jobProgress.Key); 138 140 if (curTask == null) { … … 141 143 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 142 144 } else { 143 var currentStateLog = curTask.StateLogs.Last OrDefault();145 var currentStateLog = curTask.StateLogs.Last(); 144 146 if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) { 145 147 // assigned slave does not match heartbeat … … 168 170 } 169 171 } 170 } 171 } );172 }); 173 } 172 174 } 173 175 return actions; … … 175 177 176 178 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) { 177 var assignedResourceIds = curTask.AssignedResources.Select(ar => ar.Resource.ResourceId);178 var slaveResourceIds = dao.GetParentResources DA(slaveId).Select(x => x.ResourceId);179 var assignedResourceIds = dao.GetAssignedResourcesIdsDA(curTask.TaskId); 180 var slaveResourceIds = dao.GetParentResourcesIDsDA(slaveId).ToArray(); 179 181 return assignedResourceIds.Any(r => slaveResourceIds.Contains(r)); 180 182 } 181 183 182 184 private bool SlaveIsAllowedToCalculate(Guid slaveId) { 183 var parentResources = dao.GetParentResourcesDA(slaveId);185 var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(slaveId, DA.DowntimeType.Offline); 184 186 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also 185 return parentResources.All(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count()== 0);187 return downtimes.All(x => x == 0); 186 188 } 187 189 188 190 private bool ShutdownSlaveComputer(Guid slaveId) { 189 var parentResources = dao.GetParentResourcesDA(slaveId); 190 191 return parentResources.Any(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() != 0); 191 var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(slaveId, DA.DowntimeType.Shutdown); 192 return downtimes.Any(x => x != 0); 192 193 } 193 194 }
Note: See TracChangeset
for help on using the changeset viewer.