Changeset 9385
- Timestamp:
- 04/19/13 17:16:49 (12 years ago)
- Location:
- branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs
r9381 r9385 22 22 using System; 23 23 using System.Collections.Generic; 24 using System.Data.Linq; 24 25 using System.Linq; 25 26 using System.Linq.Expressions; … … 45 46 public Task GetTaskDA(Guid id) { 46 47 var db = HiveOperationContext.Current.DataContext; 47 return db.Tasks.SingleOrDefault(x => x.TaskId == id); 48 } 48 return GetTaskByIdQuery(db, id).SingleOrDefault(); 49 } 50 51 private static Func<HiveDataContext, Guid, IQueryable<Task>> GetTaskByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid id) => 52 from t in db.Tasks 53 where t.TaskId == id 54 select t 55 ); 49 56 50 57 public IEnumerable<DT.Task> GetTasks(Expression<Func<Task, bool>> predicate) { … … 205 212 } 206 213 207 public I Enumerable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave) {214 public IQueryable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave) { 208 215 var db = HiveOperationContext.Current.DataContext; 216 209 217 var parentResources = GetParentResourcesDA(slave.ResourceId); 210 218 var resourceIds = parentResources.Select(x => x.ResourceId); … … 213 221 //we skip this step because it's wasted runtime 214 222 215 var query = from ar in db.AssignedResources 216 where resourceIds.Contains(ar.ResourceId) 217 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished) 218 && ar.Task.State == TaskState.Waiting 219 && ar.Task.CoresNeeded <= slave.FreeCores 220 && ar.Task.MemoryNeeded <= slave.FreeMemory 221 select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority }; 222 var waitingTasks = query.ToArray(); 223 return waitingTasks; 224 } 223 return from ar in db.AssignedResources 224 where resourceIds.Contains(ar.ResourceId) 225 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished) 226 && ar.Task.State == TaskState.Waiting 227 && ar.Task.CoresNeeded <= slave.FreeCores 228 && ar.Task.MemoryNeeded <= slave.FreeMemory 229 select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority }; 230 } 231 232 /*private static Func<HiveDataContext, Guid, Slave, IQueryable<TaskInfoForScheduler>> GetWaitingTasksQuery = CompiledQuery.Compile((HiveDataContext db, Guid id, Slave slave) => 233 from ar in db.AssignedResources 234 where ar.ResourceId == id 235 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished) 236 && ar.Task.State == TaskState.Waiting 237 && ar.Task.CoresNeeded <= slave.FreeCores 238 && ar.Task.MemoryNeeded <= slave.FreeMemory 239 select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority } 240 );*/ 225 241 226 242 public DT.Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception) { … … 265 281 }); 266 282 267 var task = db.Tasks.SingleOrDefault(x => x.TaskId ==taskId);283 var task = GetTaskDA(taskId); 268 284 task.State = taskState; 269 285 … … 567 583 public Slave GetSlaveDA(Guid id) { 568 584 var db = HiveOperationContext.Current.DataContext; 569 return db.Resources.OfType<Slave>().SingleOrDefault(x => x.ResourceId == id); 570 } 585 return GetSlaveByIdQuery(db, id).SingleOrDefault(); 586 } 587 588 private static Func<HiveDataContext, Guid, IQueryable<Slave>> GetSlaveByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid slaveId) => 589 from s in db.Resources.OfType<Slave>() 590 where s.ResourceId == slaveId 591 select s 592 ); 571 593 572 594 public IEnumerable<DT.Slave> GetSlaves(Expression<Func<Slave, bool>> predicate) { … … 714 736 } 715 737 738 public IQueryable<Guid> GetAssignedResourcesIdsDA(Guid taskId) { 739 var db = HiveOperationContext.Current.DataContext; 740 return GetAssignedResourcesIdQuery(db, taskId); 741 } 742 743 private static Func<HiveDataContext, Guid, IQueryable<Guid>> GetAssignedResourcesIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) => 744 from ar in db.AssignedResources 745 where ar.TaskId == taskId 746 select ar.ResourceId 747 ); 748 716 749 /// <summary> 717 750 /// Returns all parent resources of a resource (the given resource is also added) … … 727 760 public IEnumerable<Resource> GetParentResourcesDA(Guid resourceId) { 728 761 var db = HiveOperationContext.Current.DataContext; 729 var resources = new List<Resource>(); 730 CollectParentResources(resources, db.Resources.Where(r => r.ResourceId == resourceId).Single()); 731 return resources; 732 } 733 734 private void CollectParentResources(List<Resource> resources, Resource resource) { 762 var child = db.Resources.Single(r => r.ResourceId == resourceId); 763 764 yield return child; 765 while (child.ParentResource != null) { 766 child = child.ParentResource; 767 yield return child; 768 } 769 } 770 771 public IEnumerable<Guid> GetParentResourcesIDsDA(Guid resourceId) { 772 var db = HiveOperationContext.Current.DataContext; 773 var child = db.Resources.Single(r => r.ResourceId == resourceId); 774 775 yield return resourceId; 776 while (child.ParentResource != null) { 777 child = child.ParentResource; 778 yield return child.ResourceId; 779 } 780 } 781 782 public IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(Guid resourceId, DowntimeType type) { 783 var db = HiveOperationContext.Current.DataContext; 784 785 var ids = GetParentResourcesIDsDA(resourceId).ToArray(); 786 787 return from r in db.Resources 788 where ids.Contains(r.ResourceId) 789 select (from d in db.Downtimes 790 where d.ResourceId == r.ResourceId && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate) 791 select d).Count(); 792 } 793 794 /*private static Func<HiveDataContext, Guid, DowntimeType, int> GetNumberOfDowntimesAtCurrentTimeQuery = 795 CompiledQuery.Compile((HiveDataContext db, Guid ids, DowntimeType type) => 796 (from d in db.Downtimes 797 where d.ResourceId == ids && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate) 798 select d).Count() 799 );*/ 800 801 private static void CollectParentResources(ICollection<Resource> resources, Resource resource) { 735 802 if (resource == null) return; 736 803 resources.Add(resource); -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs
r9381 r9385 22 22 using System; 23 23 using System.Collections.Generic; 24 using System.Linq; 24 25 using System.Linq.Expressions; 25 26 using HeuristicLab.Services.Hive.DataAccess; … … 41 42 void DeleteTask(Guid id); 42 43 IEnumerable<TaskInfoForScheduler> GetWaitingTasks(DT.Slave slave); 43 I Enumerable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave);44 IQueryable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave); 44 45 IEnumerable<DT.Task> GetParentTasks(IEnumerable<Guid> resourceIds, int count, bool finished); 45 46 DT.Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception); … … 123 124 void AssignJobToResource(Guid taskId, IEnumerable<Guid> resourceIds); 124 125 IEnumerable<DT.Resource> GetAssignedResources(Guid jobId); 126 IQueryable<Guid> GetAssignedResourcesIdsDA(Guid taskId); 125 127 IEnumerable<DT.Resource> GetParentResources(Guid resourceId); 126 128 IEnumerable<Resource> GetParentResourcesDA(Guid resourceId); 129 IEnumerable<Guid> GetParentResourcesIDsDA(Guid resourceId); 130 IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(Guid resourceId, DowntimeType type); 127 131 IEnumerable<DT.Resource> GetChildResources(Guid resourceId); 128 132 IEnumerable<DT.Task> GetJobsByResourceId(Guid resourceId); -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9381 r9385 74 74 }); 75 75 76 // update task data 77 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 76 if (slave != null) { 77 // update task data 78 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 78 79 79 // assign new task80 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {81 bool mutexAquired = false;82 var mutex = new Mutex(false, MutexName);83 try {80 // assign new task 81 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 82 bool mutexAquired = false; 83 var mutex = new Mutex(false, MutexName); 84 try { 84 85 85 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 86 if (!mutexAquired) 87 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 88 else { 89 trans.UseTransaction(() => { 90 IEnumerable<TaskInfoForScheduler> availableTasks = null; 91 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave)); 92 if (availableTasks.Any()) { 93 var task = availableTasks.First(); 94 AssignJob(slave, task.TaskId); 95 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 96 } 97 }); 86 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 87 if (!mutexAquired) 88 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 89 else { 90 trans.UseTransaction(() => { 91 IEnumerable<TaskInfoForScheduler> availableTasks = null; 92 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave).ToArray()); 93 if (availableTasks.Any()) { 94 var task = availableTasks.First(); 95 AssignJob(slave, task.TaskId); 96 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 97 } 98 }); 99 } 98 100 } 99 }100 catch (AbandonedMutexException) {101 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");102 }103 catch (Exception ex) {104 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());105 }106 finally {107 if (mutexAquired) mutex.ReleaseMutex();101 catch (AbandonedMutexException) { 102 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 103 } 104 catch (Exception ex) { 105 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 106 } 107 finally { 108 if (mutexAquired) mutex.ReleaseMutex(); 109 } 108 110 } 109 111 } … … 132 134 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 133 135 } else { 134 trans.UseTransaction(() => {135 // process the jobProgresses136 foreach (var jobProgress in heartbeat.JobProgress){136 // process the jobProgresses 137 foreach (var jobProgress in heartbeat.JobProgress) { 138 trans.UseTransaction(() => { 137 139 var curTask = dao.GetTaskDA(jobProgress.Key); 138 140 if (curTask == null) { … … 141 143 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 142 144 } else { 143 var currentStateLog = curTask.StateLogs.Last OrDefault();145 var currentStateLog = curTask.StateLogs.Last(); 144 146 if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) { 145 147 // assigned slave does not match heartbeat … … 168 170 } 169 171 } 170 } 171 } );172 }); 173 } 172 174 } 173 175 return actions; … … 175 177 176 178 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) { 177 var assignedResourceIds = curTask.AssignedResources.Select(ar => ar.Resource.ResourceId);178 var slaveResourceIds = dao.GetParentResources DA(slaveId).Select(x => x.ResourceId);179 var assignedResourceIds = dao.GetAssignedResourcesIdsDA(curTask.TaskId); 180 var slaveResourceIds = dao.GetParentResourcesIDsDA(slaveId).ToArray(); 179 181 return assignedResourceIds.Any(r => slaveResourceIds.Contains(r)); 180 182 } 181 183 182 184 private bool SlaveIsAllowedToCalculate(Guid slaveId) { 183 var parentResources = dao.GetParentResourcesDA(slaveId);185 var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(slaveId, DA.DowntimeType.Offline); 184 186 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also 185 return parentResources.All(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count()== 0);187 return downtimes.All(x => x == 0); 186 188 } 187 189 188 190 private bool ShutdownSlaveComputer(Guid slaveId) { 189 var parentResources = dao.GetParentResourcesDA(slaveId); 190 191 return parentResources.Any(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() != 0); 191 var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(slaveId, DA.DowntimeType.Shutdown); 192 return downtimes.Any(x => x != 0); 192 193 } 193 194 }
Note: See TracChangeset
for help on using the changeset viewer.