Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
09/28/12 10:49:02 (12 years ago)
Author:
jkarder
Message:

#1712: added user queue used to schedule tasks

Location:
branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Convert.cs

    r8687 r8707  
    531531    public static DT.UserPriority ToDto(DB.UserPriority source) {
    532532      if (source == null) return null;
    533       return new DT.UserPriority() { UserId = source.UserId };
     533      return new DT.UserPriority() { Id = source.UserId, DateEnqueued = source.DateEnqueued };
    534534    }
    535535    public static DB.UserPriority ToEntity(DT.UserPriority source) {
     
    540540    public static void ToEntity(DT.UserPriority source, DB.UserPriority target) {
    541541      if ((source != null) && (target != null)) {
    542         target.UserId = source.UserId;
     542        target.UserId = source.Id;
     543        target.DateEnqueued = source.DateEnqueued;
    543544      }
    544545    }
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/DataTransfer/UserPriority.cs

    r8687 r8707  
    2828  public class UserPriority : HiveItem {
    2929    [DataMember]
    30     public Guid UserId { get; set; }
     30    public DateTime DateEnqueued { get; set; }
    3131  }
    3232}
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/HiveDao.cs

    r8687 r8707  
    259259        var entity = DT.Convert.ToEntity(dto);
    260260        db.Jobs.InsertOnSubmit(entity);
     261        if (!db.Jobs.Any(x => x.OwnerUserId == dto.OwnerUserId))
     262          EnqueueUserPriority(new DT.UserPriority { Id = dto.OwnerUserId, DateEnqueued = dto.DateCreated });
    261263        db.SubmitChanges();
    262264        return entity.JobId;
     
    863865      }
    864866    }
     867
     868    public void EnqueueUserPriority(DT.UserPriority dto) {
     869      using (var db = CreateContext()) {
     870        var entity = db.UserPriorities.FirstOrDefault(x => x.UserId == dto.Id);
     871        if (entity == null) db.UserPriorities.InsertOnSubmit(DT.Convert.ToEntity(dto));
     872        else DT.Convert.ToEntity(dto, entity);
     873        db.SubmitChanges();
     874      }
     875    }
    865876    #endregion
    866877
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs

    r8687 r8707  
    154154    #region UserPriority Methods
    155155    IEnumerable<DT.UserPriority> GetUserPriorities(Expression<Func<UserPriority, bool>> predicate);
     156    void EnqueueUserPriority(DT.UserPriority userPriority);
    156157    #endregion
    157158  }
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r8687 r8707  
    2323using System.Collections.Generic;
    2424using System.Linq;
     25using System.Threading;
    2526using HeuristicLab.Services.Hive.DataTransfer;
    2627using DA = HeuristicLab.Services.Hive.DataAccess;
     
    2829namespace HeuristicLab.Services.Hive {
    2930  public class HeartbeatManager {
     31    private const string MutexName = "HiveTaskSchedulingMutex";
     32
    3033    private IHiveDao dao {
    3134      get { return ServiceLocator.Instance.HiveDao; }
     
    6366        // assign new task
    6467        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    65           var availableJobs = taskScheduler.Schedule(dao.GetWaitingTasks(slave));
    66           if (availableJobs.Any()) {
    67             var job = availableJobs.First();
    68             if (AssignJob(slave, job))
    69               actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
     68          bool mutexAquired = false;
     69          var mutex = new Mutex(false, MutexName);
     70          try {
     71            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
     72            if (!mutexAquired)
     73              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     74            else {
     75              var availableJobs = taskScheduler.Schedule(dao.GetWaitingTasks(slave));
     76              if (availableJobs.Any()) {
     77                var job = availableJobs.First();
     78                if (AssignJob(slave, job))
     79                  actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
     80              }
     81            }
     82          }
     83          catch (AbandonedMutexException) {
     84            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     85          }
     86          finally {
     87            if (mutexAquired) mutex.ReleaseMutex();
    7088          }
    7189        }
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Properties/Settings.Designer.cs

    r7857 r8707  
    22// <auto-generated>
    33//     This code was generated by a tool.
    4 //     Runtime Version:4.0.30319.269
     4//     Runtime Version:4.0.30319.17929
    55//
    66//     Changes to this file may cause incorrect behavior and will be lost if
     
    7777            }
    7878        }
     79       
     80        [global::System.Configuration.ApplicationScopedSettingAttribute()]
     81        [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
     82        [global::System.Configuration.DefaultSettingValueAttribute("00:00:20")]
     83        public global::System.TimeSpan SchedulingPatience {
     84            get {
     85                return ((global::System.TimeSpan)(this["SchedulingPatience"]));
     86            }
     87        }
    7988    }
    8089}
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Properties/Settings.settings

    r7857 r8707  
    2121      <Value Profile="(Default)">3.00:00:00</Value>
    2222    </Setting>
     23    <Setting Name="SchedulingPatience" Type="System.TimeSpan" Scope="Application">
     24      <Value Profile="(Default)">00:00:20</Value>
     25    </Setting>
    2326  </Settings>
    2427</SettingsFile>
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Scheduler/RoundRobinTaskScheduler.cs

    r8687 r8707  
    2020#endregion
    2121
     22using System;
    2223using System.Collections.Generic;
    2324using System.Linq;
     
    3132
    3233    public IEnumerable<Task> Schedule(IEnumerable<Task> tasks, int count = 1) {
    33       var userPriorities = dao.GetUserPriorities(x => true).ToArray();
     34      if (!tasks.Any()) return Enumerable.Empty<Task>();
     35
     36      var userPriorities = dao.GetUserPriorities(x => true).OrderBy(x => x.DateEnqueued).ToArray();
    3437
    3538      var jobs = new List<Job>();
    3639      foreach (var userPriority in userPriorities) {
    3740        var closureUserPriority = userPriority;
    38         jobs.AddRange(dao.GetJobs(x => x.OwnerUserId == closureUserPriority.UserId));
     41        jobs.AddRange(dao.GetJobs(x => x.OwnerUserId == closureUserPriority.Id));
    3942      }
    4043
     
    4346        job => job.Id,
    4447        (task, job) => new { Task = task, Job = job })
    45         .ToDictionary(k => k, v => false);
     48        .OrderByDescending(x => x.Task.Priority)
     49        .ToList();
    4650
    4751      var scheduledTasks = new List<Task>();
    4852      int priorityIndex = 0;
     53
    4954      if (count == 0 || count > taskJobRelations.Count) count = taskJobRelations.Count;
     55
    5056      for (int i = 0; i < count; i++) {
    51         var defaultEntry = taskJobRelations.First(x => !x.Value); // search first task which is not included yet
    52         var priorityEntries = taskJobRelations.Where(x => !x.Value && x.Key.Job.OwnerUserId == userPriorities[priorityIndex].UserId).ToArray(); // search for tasks with desired user priority
    53         while (!priorityEntries.Any() && priorityIndex < priorityEntries.Length)
    54           priorityEntries = taskJobRelations.Where(x => !x.Value && x.Key.Job.OwnerUserId == userPriorities[++priorityIndex].UserId).ToArray();
     57        var defaultEntry = taskJobRelations.First(); // search first task which is not included yet
     58        var priorityEntries = taskJobRelations.Where(x => x.Job.OwnerUserId == userPriorities[priorityIndex].Id).ToArray(); // search for tasks with desired user priority
     59        while (!priorityEntries.Any() && ++priorityIndex < userPriorities.Length)
     60          priorityEntries = taskJobRelations.Where(x => x.Job.OwnerUserId == userPriorities[priorityIndex].Id).ToArray();
    5561        if (priorityEntries.Any()) { // tasks with desired user priority found
    56           var priorityEntry = priorityEntries.OrderByDescending(x => x.Key.Task.Priority).ThenBy(x => x.Key.Task.DateCreated).First();
    57           if (defaultEntry.Key.Task.Priority <= priorityEntry.Key.Task.Priority) {
    58             taskJobRelations[priorityEntry.Key] = true;
    59             scheduledTasks.Add(priorityEntry.Key.Task);
     62          var priorityEntry = priorityEntries.OrderByDescending(x => x.Task.Priority).ThenBy(x => x.Job.DateCreated).First();
     63          if (defaultEntry.Task.Priority <= priorityEntry.Task.Priority) {
     64            taskJobRelations.Remove(priorityEntry);
     65            scheduledTasks.Add(priorityEntry.Task);
    6066            priorityIndex++;
    61           } else { // there are other tasks with higher priority
    62             taskJobRelations[defaultEntry.Key] = true;
    63             scheduledTasks.Add(defaultEntry.Key.Task);
     67          } else { // there are other tasks with higher priorities
     68            taskJobRelations.Remove(defaultEntry);
     69            scheduledTasks.Add(defaultEntry.Task);
    6470          }
    6571        } else {
    66           taskJobRelations[defaultEntry.Key] = true;
    67           scheduledTasks.Add(defaultEntry.Key.Task);
     72          taskJobRelations.Remove(defaultEntry);
     73          scheduledTasks.Add(defaultEntry.Task);
    6874        }
    6975      }
    7076
    71       // dequeue and enqueue priorities [0, priorityIndex), i.e. {4,3,6,2,5,1} and priorityIndex = 3 -> {2,5,1,4,3,6}
    72       // i don't know how to solve this yet ... (race conditions, locking?)
     77      // requeue user priorities
     78      if (priorityIndex < userPriorities.Length)
     79        for (int i = 0; i < priorityIndex; i++) {
     80          userPriorities[i].DateEnqueued = DateTime.Now;
     81          dao.EnqueueUserPriority(userPriorities[i]);
     82        }
    7383
    7484      return scheduledTasks;
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/app.config

    r7857 r8707  
    2626                <value>3.00:00:00</value>
    2727            </setting>
     28            <setting name="SchedulingPatience" serializeAs="String">
     29                <value>00:00:20</value>
     30            </setting>
    2831        </HeuristicLab.Services.Hive.Properties.Settings>
    2932    </applicationSettings>
Note: See TracChangeset for help on using the changeset viewer.