Changeset 9665 for trunk/sources/HeuristicLab.Services.Hive/3.3/Manager
- Timestamp:
- 06/28/13 12:05:53 (11 years ago)
- Location:
- trunk/sources/HeuristicLab.Services.Hive
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Services.Hive
- Property svn:mergeinfo changed
/branches/HivePerformance/sources/HeuristicLab.Services.Hive (added) merged: 9369,9381,9385,9391,9393,9397,9399,9434,9444,9469,9485,9539,9634-9636
- Property svn:mergeinfo changed
-
trunk/sources/HeuristicLab.Services.Hive/3.3
- Property svn:ignore
-
old new 2 2 bin 3 3 obj 4 *.user
-
- Property svn:ignore
-
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs
r9456 r9665 78 78 //we have to find another way to deal with this. 79 79 //until then the next line is commented out... 80 //stats.UserStatistics = d ao.GetUserStatistics();80 //stats.UserStatistics = dtoDao.GetUserStatistics(); 81 81 dao.AddStatistics(stats); 82 82 } -
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9456 r9665 24 24 using System.Linq; 25 25 using System.Threading; 26 using HeuristicLab.Services.Hive.Data Transfer;27 using DA = HeuristicLab.Services.Hive.DataAccess;26 using HeuristicLab.Services.Hive.DataAccess; 27 using Heartbeat = HeuristicLab.Services.Hive.DataTransfer.Heartbeat; 28 28 29 29 namespace HeuristicLab.Services.Hive { … … 31 31 private const string MutexName = "HiveTaskSchedulingMutex"; 32 32 33 private I HiveDao dao {34 get { return ServiceLocator.Instance. HiveDao; }33 private IOptimizedHiveDao dao { 34 get { return ServiceLocator.Instance.OptimizedHiveDao; } 35 35 } 36 36 private ITaskScheduler taskScheduler { … … 47 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 48 48 List<MessageContainer> actions = new List<MessageContainer>(); 49 49 50 Slave slave = null; 50 slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); }); 51 51 trans.UseTransaction(() => { 52 slave = dao.GetSlaveById(heartbeat.SlaveId); 53 }); 52 54 if (slave == null) { 53 55 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); … … 56 58 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 57 59 } 58 if ( ShutdownSlaveComputer(slave.Id)) {60 if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) { 59 61 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 60 62 } … … 64 66 slave.FreeMemory = heartbeat.FreeMemory; 65 67 slave.CpuUtilization = heartbeat.CpuUtilization; 66 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);68 slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId); 67 69 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 68 70 slave.LastHeartbeat = DateTime.Now; 69 71 70 trans.UseTransaction(() => { dao.UpdateSlave(slave); }); 72 trans.UseTransaction(() => { 73 dao.UpdateSlave(slave); 74 }); 71 75 72 76 // update task data … … 78 82 var mutex = new Mutex(false, MutexName); 79 83 try { 84 80 85 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 81 86 if (!mutexAquired) 82 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");87 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 83 88 else { 84 IEnumerable<TaskInfoForScheduler> availableTasks = null; 85 availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); }); 86 if (availableTasks.Any()) { 87 var task = availableTasks.First(); 88 AssignJob(slave, task.TaskId); 89 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 90 } 89 trans.UseTransaction(() => { 90 IEnumerable<TaskInfoForScheduler> availableTasks = null; 91 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave).ToArray()); 92 if (availableTasks.Any()) { 93 var task = availableTasks.First(); 94 AssignTask(slave, task.TaskId); 95 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 96 } 97 }); 91 98 } 92 99 } 93 100 catch (AbandonedMutexException) { 94 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");101 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 95 102 } 96 103 catch (Exception ex) { 97 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());104 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 98 105 } 99 106 finally { … … 105 112 } 106 113 107 private void AssignJob(Slave slave, Guid taskId) { 108 trans.UseTransaction(() => { 109 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 114 private void AssignTask(Slave slave, Guid taskId) { 115 var task = dao.UpdateTaskState(taskId, TaskState.Transferring, slave.ResourceId, null, null); 110 116 111 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 112 task.LastHeartbeat = DateTime.Now; 113 dao.UpdateTask(task); 114 }); 117 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 118 task.LastHeartbeat = DateTime.Now; 119 dao.UpdateTask(task); 115 120 } 116 121 … … 130 135 // process the jobProgresses 131 136 foreach (var jobProgress in heartbeat.JobProgress) { 132 Task curTask = null; 133 curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); }); 137 Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null; 138 trans.UseTransaction(() => { 139 taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key); 140 }); 141 var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null; 134 142 if (curTask == null) { 135 143 // task does not exist in db 136 144 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 137 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);145 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 138 146 } else { 139 if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) { 147 var slaveId = taskWithLastStateLogSlaveId.Item2; 148 if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) { 140 149 // assigned slave does not match heartbeat 141 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask. Id));142 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);143 } else if (! TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {150 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 151 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 152 } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) { 144 153 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 145 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask. Id));154 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 146 155 } else { 147 156 // save task execution time 148 curTask.ExecutionTime = jobProgress.Value;157 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 149 158 curTask.LastHeartbeat = DateTime.Now; 150 159 151 160 switch (curTask.Command) { 152 161 case Command.Stop: 153 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask. Id));162 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 154 163 break; 155 164 case Command.Pause: 156 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask. Id));165 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 157 166 break; 158 167 case Command.Abort: 159 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask. Id));168 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 160 169 break; 161 170 } 162 trans.UseTransaction(() => { dao.UpdateTask(curTask); }); 171 trans.UseTransaction(() => { 172 dao.UpdateTask(curTask); 173 }); 163 174 } 164 175 } … … 167 178 return actions; 168 179 } 169 170 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {171 return trans.UseTransaction(() => {172 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);173 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);174 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));175 });176 }177 178 private bool SlaveIsAllowedToCalculate(Guid slaveId) {179 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also180 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });181 }182 183 private bool ShutdownSlaveComputer(Guid slaveId) {184 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); });185 }186 180 } 187 181 }
Note: See TracChangeset
for help on using the changeset viewer.