Changeset 9397
- Timestamp:
- 04/25/13 19:52:24 (12 years ago)
- Location:
- branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs
r9393 r9397 31 31 32 32 #region Task Methods 33 public Task GetTaskById(Guid id) {34 return GetTaskByIdQuery(Db, id).SingleOrDefault();35 } 36 37 private static Func<HiveDataContext, Guid, IQueryable<Task>> GetTaskByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid id) =>33 public Task GetTaskById(Guid taskId) { 34 return GetTaskByIdQuery(Db, taskId).SingleOrDefault(); 35 } 36 37 private static Func<HiveDataContext, Guid, IQueryable<Task>> GetTaskByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) => 38 38 from t in db.Tasks 39 where t.TaskId == id39 where t.TaskId == taskId 40 40 select t 41 41 ); 42 42 43 public void UpdateTaskAndPlugins(Task task) { /* 44 foreach (var pluginId in task.RequiredPlugins.Select(p => p.PluginId)) { 45 if (db.RequiredPlugins.Count(p => p.PluginId == pluginId) == 0) { 46 db.RequiredPlugins.InsertOnSubmit(new RequiredPlugin() { TaskId = task.TaskId, PluginId = pluginId }); 47 } 48 } 49 db.SubmitChanges(); */ 50 } 51 52 public void UpdateTask(Task task) { 53 Db.SubmitChanges(); 54 } 55 56 public IQueryable<TaskInfoForScheduler> GetWaitingTasks(Slave slave) { 57 var parentResources = GetParentResources(slave.ResourceId); 58 var resourceIds = parentResources.Select(x => x.ResourceId); 43 public Tuple<Task, Guid?> GetTaskByIdAndLastStateLogSlaveId(Guid taskId) { 44 return GetTaskByIdAndLastStateLogSlaveIdQuery(Db, taskId).SingleOrDefault(); 45 } 46 47 private static Func<HiveDataContext, Guid, IQueryable<Tuple<Task, Guid?>>> GetTaskByIdAndLastStateLogSlaveIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) => 48 from t in db.Tasks 49 join sl in db.StateLogs on t.TaskId equals sl.TaskId 50 where t.TaskId == taskId 51 orderby sl.DateTime descending 52 group sl by t into logs 53 select new Tuple<Task, Guid?>(logs.Key, logs.First().SlaveId) 54 ); 55 56 private const string GetWaitingTasksQueryString = @" 57 WITH pr AS ( 58 SELECT ResourceId, ParentResourceId 59 FROM [Resource] 60 WHERE ResourceId = {0} 61 
UNION ALL 62 SELECT r.ResourceId, r.ParentResourceId 63 FROM [Resource] r JOIN pr ON r.ResourceId = pr.ParentResourceId 64 ) 65 SELECT DISTINCT t.TaskId, t.JobId, t.Priority 66 FROM pr JOIN AssignedResources ar ON ar.ResourceId = pr.ResourceId 67 JOIN Task t ON t.TaskId = ar.TaskId 68 WHERE NOT (t.IsParentTask = 1 AND t.FinishWhenChildJobsFinished = 1) 69 AND t.TaskState = {1} 70 AND t.CoresNeeded <= {2} 71 AND t.MemoryNeeded <= {3} 72 "; 73 74 public IEnumerable<TaskInfoForScheduler> GetWaitingTasks(Slave slave) { 59 75 //Originally we checked here if there are parent tasks which should be calculated (with GetParentTasks(resourceIds, count, false);). 60 76 //Because there is at the moment no case where this makes sense (there don't exist parent tasks which need to be calculated), 61 77 //we skip this step because it's wasted runtime 62 63 return from ar in Db.AssignedResources 64 where resourceIds.Contains(ar.ResourceId) 65 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished) 66 && ar.Task.State == TaskState.Waiting 67 && ar.Task.CoresNeeded <= slave.FreeCores 68 && ar.Task.MemoryNeeded <= slave.FreeMemory 69 select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority }; 70 } 71 72 /*private static Func<HiveDataContext, Guid, Slave, IQueryable<TaskInfoForScheduler>> GetWaitingTasksQuery = CompiledQuery.Compile((HiveDataContext db, Guid id, Slave slave) => 73 from ar in db.AssignedResources 74 where ar.ResourceId == id 75 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished) 76 && ar.Task.State == TaskState.Waiting 77 && ar.Task.CoresNeeded <= slave.FreeCores 78 && ar.Task.MemoryNeeded <= slave.FreeMemory 79 select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority } 80 );*/ 78 return Db.ExecuteQuery<TaskInfoForScheduler>(GetWaitingTasksQueryString, slave.ResourceId, Enum.GetName(typeof(TaskState), TaskState.Waiting), slave.FreeCores, 
slave.FreeMemory); 79 } 80 81 public IQueryable<DT.LightweightTask> GetLightweightTasks(Guid jobId) { 82 return GetLightweightTasksQuery(Db, jobId); 83 } 84 85 private static Func<HiveDataContext, Guid, IQueryable<DT.LightweightTask>> GetLightweightTasksQuery = CompiledQuery.Compile((HiveDataContext db, Guid jobId) => 86 from task in db.Tasks 87 where task.JobId == jobId 88 select new DT.LightweightTask { 89 Id = task.TaskId, 90 ExecutionTime = TimeSpan.FromMilliseconds(task.ExecutionTimeMs), 91 ParentTaskId = task.ParentTaskId, 92 StateLog = task.StateLogs.Select(sl => ConvertStateLog(sl)).ToList(), 93 State = ConvertTaskState(task.State), 94 Command = ConvertCommand(task.Command), 95 LastTaskDataUpdate = task.JobData.LastUpdate 96 } 97 ); 98 99 static Func<StateLog, DT.StateLog> ConvertStateLog = sl => DT.Convert.ToDto(sl); 100 static Func<TaskState, DT.TaskState> ConvertTaskState = ts => DT.Convert.ToDto(ts); 101 static Func<Command?, DT.Command?> ConvertCommand = c => DT.Convert.ToDto(c); 102 103 public void UpdateTask(Task task) { 104 Db.SubmitChanges(); 105 } 81 106 82 107 public Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? 
userId, string exception) { … … 97 122 return task; 98 123 } 124 125 private const string TaskIsAllowedToBeCalculatedBySlaveQueryString = @" 126 WITH pr AS ( 127 SELECT ResourceId, ParentResourceId 128 FROM [Resource] 129 WHERE ResourceId = {0} 130 UNION ALL 131 SELECT r.ResourceId, r.ParentResourceId 132 FROM [Resource] r JOIN pr ON r.ResourceId = pr.ParentResourceId 133 ) 134 SELECT COUNT(ar.TaskId) 135 FROM pr JOIN AssignedResources ar ON pr.ResourceId = ar.ResourceId 136 WHERE ar.TaskId = {1} 137 "; 138 139 public bool TaskIsAllowedToBeCalculatedBySlave(Guid taskId, Guid slaveId) { 140 return Db.ExecuteQuery<int>(TaskIsAllowedToBeCalculatedBySlaveQueryString, slaveId, taskId).First() > 0; 141 } 99 142 #endregion 100 143 … … 155 198 Db.SubmitChanges(); 156 199 } 200 201 private const string DowntimeQueryString = @" 202 WITH pr AS ( 203 SELECT ResourceId, ParentResourceId 204 FROM [Resource] 205 WHERE ResourceId = {0} 206 UNION ALL 207 SELECT r.ResourceId, r.ParentResourceId 208 FROM [Resource] r JOIN pr ON r.ResourceId = pr.ParentResourceId 209 ) 210 SELECT COUNT(dt.DowntimeId) 211 FROM pr JOIN [Downtime] dt ON pr.ResourceId = dt.ResourceId 212 WHERE {1} BETWEEN dt.StartDate AND dt.EndDate 213 AND dt.DowntimeType = {2} 214 "; 215 216 public bool SlaveHasToShutdownComputer(Guid slaveId) { 217 return Db.ExecuteQuery<int>(DowntimeQueryString, slaveId, DateTime.Now, DowntimeType.Shutdown).First() > 0; 218 } 219 220 public bool SlaveIsAllowedToCalculate(Guid slaveId) { 221 return Db.ExecuteQuery<int>(DowntimeQueryString, slaveId, DateTime.Now, DowntimeType.Offline).First() == 0; 222 } 157 223 #endregion 158 224 … … 162 228 163 229 #region Resource Methods 164 public IQueryable<Guid> GetAssignedResourcesIds(Guid taskId) {165 return GetAssignedResourcesIdQuery(Db, taskId);166 }167 168 private static Func<HiveDataContext, Guid, IQueryable<Guid>> GetAssignedResourcesIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) =>169 from ar in db.AssignedResources170 
where ar.TaskId == taskId171 select ar.ResourceId172 );173 174 public IEnumerable<Resource> GetParentResources(Guid resourceId) {175 var child = Db.Resources.Single(r => r.ResourceId == resourceId);176 177 yield return child;178 while (child.ParentResource != null) {179 child = child.ParentResource;180 yield return child;181 }182 }183 184 public IEnumerable<Guid> GetParentResourcesIDs(Guid resourceId) {185 var child = Db.Resources.Single(r => r.ResourceId == resourceId);186 187 yield return resourceId;188 while (child.ParentResource != null) {189 child = child.ParentResource;190 yield return child.ResourceId;191 }192 }193 194 private static void CollectParentResources(ICollection<Resource> resources, Resource resource) {195 if (resource == null) return;196 resources.Add(resource);197 CollectParentResources(resources, resource.ParentResource);198 }199 230 #endregion 200 231 … … 212 243 213 244 #region Downtime Methods 214 public IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTime(Guid resourceId, DowntimeType type) {215 var ids = GetParentResourcesIDs(resourceId).ToArray();216 217 return from r in Db.Resources218 where ids.Contains(r.ResourceId)219 select (from d in Db.Downtimes220 where d.ResourceId == r.ResourceId && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)221 select d).Count();222 }223 224 /*private static Func<HiveDataContext, Guid, DowntimeType, int> GetNumberOfDowntimesAtCurrentTimeQuery =225 CompiledQuery.Compile((HiveDataContext db, Guid ids, DowntimeType type) =>226 (from d in db.Downtimes227 where d.ResourceId == ids && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)228 select d).Count()229 );*/230 231 245 #endregion 232 246 -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveService.cs
r9391 r9397 132 132 133 133 return trans.UseTransaction(() => { 134 return dtoDao.GetLightweightTasks(task => task.JobId == jobId).ToArray(); 134 return dao.GetLightweightTasks(jobId).ToArray(); 135 135 }, false, true); 136 136 } … … 217 217 } 218 218 219 //dtoDao.UpdateTaskAndPluginsDA(task); no idea why this is needed 219 //dao.UpdateTaskAndPlugins(task); no idea why this is needed 220 220 return DT.Convert.ToDto(task); 221 221 }); -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs
r9393 r9397 29 29 public interface IHiveDao { 30 30 #region Task Methods 31 Task GetTaskById(Guid id); 31 Task GetTaskById(Guid task); 32 Tuple<Task, Guid?> GetTaskByIdAndLastStateLogSlaveId(Guid taskId); 32 33 33 IQueryable<TaskInfoForScheduler> GetWaitingTasks(Slave slave); 34 IEnumerable<TaskInfoForScheduler> GetWaitingTasks(Slave slave); 35 IQueryable<DT.LightweightTask> GetLightweightTasks(Guid jobId); 34 36 35 37 void UpdateTask(Task task); … … 37 39 Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception); 38 40 41 bool TaskIsAllowedToBeCalculatedBySlave(Guid taskId, Guid slaveId); 39 42 #endregion 40 43 … … 68 71 69 72 void UpdateSlave(Slave slave); 73 74 bool SlaveHasToShutdownComputer(Guid slaveId); 75 bool SlaveIsAllowedToCalculate(Guid slaveId); 70 76 #endregion 71 77 … … 75 81 76 82 #region Resource Methods 77 IQueryable<Guid> GetAssignedResourcesIds(Guid taskId); 78 IEnumerable<Resource> GetParentResources(Guid resourceId); 79 IEnumerable<Guid> GetParentResourcesIDs(Guid resourceId); 83 80 84 #endregion 81 85 … … 93 97 94 98 #region Downtime Methods 95 IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTime(Guid resourceId, DowntimeType type);96 99 #endregion 97 100 -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9391 r9397 51 51 trans.UseTransaction(() => { 52 52 slave = dao.GetSlaveById(heartbeat.SlaveId); 53 }); 54 if (slave == null) { 55 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 56 } else { 57 if (heartbeat.HbInterval != slave.HbInterval) { 58 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 59 } 60 if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) { 61 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 62 } 53 63 54 if (slave == null) { 55 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 56 } else { 57 if (heartbeat.HbInterval != slave.HbInterval) { 58 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 59 } 60 if (ShutdownSlaveComputer(slave.ResourceId)) { 61 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 62 } 64 // update slave data 65 slave.FreeCores = heartbeat.FreeCores; 66 slave.FreeMemory = heartbeat.FreeMemory; 67 slave.CpuUtilization = heartbeat.CpuUtilization; 68 slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId); 69 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 70 slave.LastHeartbeat = DateTime.Now; 63 71 64 // update slave data 65 slave.FreeCores = heartbeat.FreeCores; 66 slave.FreeMemory = heartbeat.FreeMemory; 67 slave.CpuUtilization = heartbeat.CpuUtilization; 68 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId); 69 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? 
SlaveState.Calculating : SlaveState.Idle; 70 slave.LastHeartbeat = DateTime.Now; 71 72 trans.UseTransaction(() => { 72 73 dao.UpdateSlave(slave); 73 } 74 } );74 }); 75 } 75 76 76 77 if (slave != null) { … … 136 137 // process the jobProgresses 137 138 foreach (var jobProgress in heartbeat.JobProgress) { 139 Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null; 138 140 trans.UseTransaction(() => { 139 var curTask = dao.GetTaskById(jobProgress.Key); 140 if (curTask == null) { 141 // task does not exist in db 142 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 143 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 141 taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key); 142 }); 143 var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null; 144 if (curTask == null) { 145 // task does not exist in db 146 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 147 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 148 } else { 149 var slaveId = taskWithLastStateLogSlaveId.Item2; 150 if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) { 151 // assigned slave does not match heartbeat 152 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 153 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 154 } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) { 155 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 156 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 144 
157 } else { 145 var currentStateLog = curTask.StateLogs.Last(); 146 if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) { 147 // assigned slave does not match heartbeat 148 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 149 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 150 } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) { 151 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 152 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 153 } else { 154 // save task execution time 155 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 156 curTask.LastHeartbeat = DateTime.Now; 158 // save task execution time 159 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 160 curTask.LastHeartbeat = DateTime.Now; 157 161 158 switch (curTask.Command) { 159 case Command.Stop: 160 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 161 break; 162 case Command.Pause: 163 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 164 break; 165 case Command.Abort: 166 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 167 break; 168 } 162 switch (curTask.Command) { 163 case Command.Stop: 164 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 165 break; 166 case Command.Pause: 167 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 168 break; 169 case Command.Abort: 170 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 171 break; 172 } 173 trans.UseTransaction(() => { 169 174 dao.UpdateTask(curTask); 170 } 
175 }); 171 176 } 172 } );177 } 173 178 } 174 179 } 175 180 return actions; 176 181 } 177 178 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {179 var assignedResourceIds = dao.GetAssignedResourcesIds(curTask.TaskId);180 var slaveResourceIds = dao.GetParentResourcesIDs(slaveId).ToArray();181 return assignedResourceIds.Any(r => slaveResourceIds.Contains(r));182 }183 184 private bool SlaveIsAllowedToCalculate(Guid slaveId) {185 var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTime(slaveId, DowntimeType.Offline);186 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also187 return downtimes.All(x => x == 0);188 }189 190 private bool ShutdownSlaveComputer(Guid slaveId) {191 var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTime(slaveId, DowntimeType.Shutdown);192 return downtimes.Any(x => x != 0);193 }194 182 } 195 183 }
Note: See TracChangeset
for help on using the changeset viewer.