Changeset 17574 for trunk/HeuristicLab.Services.Hive/3.3/Scheduler
- Timestamp:
- 05/29/20 13:28:25 (5 years ago)
- Location:
- trunk/HeuristicLab.Services.Hive/3.3/Scheduler
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified trunk/HeuristicLab.Services.Hive/3.3/Scheduler/RoundRobinTaskScheduler.cs ΒΆ
r17180 r17574 23 23 using System.Collections.Generic; 24 24 using System.Linq; 25 using HeuristicLab.Services.Hive.DataAccess.Interfaces;26 25 using DA = HeuristicLab.Services.Hive.DataAccess; 27 26 28 27 namespace HeuristicLab.Services.Hive { 29 public class RoundRobinTaskScheduler : ITaskScheduler { 30 private IPersistenceManager PersistenceManager { 31 get { return ServiceLocator.Instance.PersistenceManager; } 28 public class RoundRobinTaskScheduler : TaskScheduler { 29 private class TaskPriorityResult { 30 public Guid TaskId { get; set; } 31 public Guid OwnerUserId { get; set; } 32 32 } 33 33 34 p ublic IEnumerable<TaskInfoForScheduler> Schedule(IEnumerable<TaskInfoForScheduler> tasks, int count = 1) {35 if (!tasks.Any()) return Enumerable.Empty<TaskInfoForScheduler>();34 protected override IReadOnlyList<Guid> ScheduleInternal(DA.Slave slave, int count) { 35 var pm = PersistenceManager; 36 36 37 var pm = PersistenceManager; 38 var userPriorityDao = pm.UserPriorityDao; 39 var jobDao = pm.JobDao; 37 var result = pm.DataContext.ExecuteQuery<TaskPriorityResult>( 38 GetHighestPriorityWaitingTasksQuery, slave.ResourceId, count, slave.FreeCores, slave.FreeMemory).ToList(); 40 39 41 var userPriorities = pm.UseTransaction(() => userPriorityDao.GetAll() 42 .OrderBy(x => x.DateEnqueued) 43 .ToArray() 44 ); 40 foreach (var row in result) { 41 pm.DataContext.ExecuteCommand("UPDATE UserPriority SET DateEnqueued = SYSDATETIME() WHERE UserId = {0}", row.OwnerUserId); 42 } 45 43 46 var userIds = userPriorities.Select(x => x.UserId).ToList(); 47 var jobs = pm.UseTransaction(() => { 48 return jobDao.GetAll() 49 .Where(x => userIds.Contains(x.OwnerUserId)) 50 .Select(x => new { 51 Id = x.JobId, 52 DateCreated = x.DateCreated, 53 OwnerUserId = x.OwnerUserId 54 }) 55 .ToList(); 56 }); 57 58 var taskJobRelations = tasks.Join(jobs, 59 task => task.JobId, 60 job => job.Id, 61 (task, job) => new { Task = task, JobInfo = job }) 62 .OrderByDescending(x => x.Task.Priority) 63 .ToList(); 64 65 var scheduledTasks = new List<TaskInfoForScheduler>(); 66 int priorityIndex = 0; 67 68 if (count == 0 || count > taskJobRelations.Count) count = taskJobRelations.Count; 69 70 for (int i = 0; i < count; i++) { 71 var defaultEntry = taskJobRelations.First(); // search first task which is not included yet 72 var priorityEntries = taskJobRelations.Where(x => x.JobInfo.OwnerUserId == userPriorities[priorityIndex].UserId).ToArray(); // search for tasks with desired user priority 73 while (!priorityEntries.Any() && priorityIndex < userPriorities.Length - 1) { 74 priorityIndex++; 75 priorityEntries = taskJobRelations.Where(x => x.JobInfo.OwnerUserId == userPriorities[priorityIndex].UserId).ToArray(); 76 } 77 if (priorityEntries.Any()) { // tasks with desired user priority found 78 var priorityEntry = priorityEntries.OrderByDescending(x => x.Task.Priority).ThenBy(x => x.JobInfo.DateCreated).First(); 79 if (defaultEntry.Task.Priority <= priorityEntry.Task.Priority) { 80 taskJobRelations.Remove(priorityEntry); 81 scheduledTasks.Add(priorityEntry.Task); 82 UpdateUserPriority(pm, userPriorities[priorityIndex]); 83 priorityIndex++; 84 } else { // there are other tasks with higher priorities 85 taskJobRelations.Remove(defaultEntry); 86 scheduledTasks.Add(defaultEntry.Task); 87 } 88 } else { 89 taskJobRelations.Remove(defaultEntry); 90 scheduledTasks.Add(defaultEntry.Task); 91 } 92 if (priorityIndex >= (userPriorities.Length - 1)) priorityIndex = 0; 93 } 94 return scheduledTasks; 95 44 return result.Select(x => x.TaskId).ToArray(); 96 45 } 97 46 98 private void UpdateUserPriority(IPersistenceManager pm, DA.UserPriority up) { 99 pm.UseTransaction(() => { 100 up.DateEnqueued = DateTime.Now; 101 pm.SubmitChanges(); 102 }); 103 } 47 #region Query Strings 48 private string GetHighestPriorityWaitingTasksQuery = @" 49 WITH rbranch AS( 50 SELECT ResourceId, ParentResourceId 51 FROM [Resource] 52 WHERE ResourceId = {0} 53 UNION ALL 54 SELECT r.ResourceId, r.ParentResourceId 55 FROM [Resource] r 56 JOIN rbranch rb ON rb.ParentResourceId = r.ResourceId 57 ) 58 SELECT TOP ({1}) t.TaskId, j.OwnerUserId 59 FROM Task t 60 JOIN Job j on t.JobId = j.JobId 61 JOIN AssignedJobResource ajr on j.JobId = ajr.JobId 62 JOIN rbranch on ajr.ResourceId = rbranch.ResourceId 63 JOIN UserPriority u on j.OwnerUserId = u.UserId 64 WHERE NOT (t.IsParentTask = 1 AND t.FinishWhenChildJobsFinished = 1) 65 AND t.TaskState = 'Waiting' 66 AND t.CoresNeeded <= {2} 67 AND t.MemoryNeeded <= {3} 68 AND j.JobState = 'Online' 69 ORDER BY t.Priority DESC, u.DateEnqueued ASC, j.DateCreated ASC"; 70 #endregion 104 71 } 105 72 }
Note: See TracChangeset
for help on using the changeset viewer.