Changeset 8707


Ignore:
Timestamp:
09/28/12 10:49:02 (7 years ago)
Author:
jkarder
Message:

#1712: added user queue used to schedule tasks

Location:
branches/HiveTaskScheduler
Files:
13 edited

Legend:

Unmodified
Added
Removed
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive.DataAccess/3.3/HiveDataContext.dbml

    r8687 r8707  
    206206    <Type Name="UserPriority">
    207207      <Column Name="UserId" Type="System.Guid" DbType="UniqueIdentifier NOT NULL" IsPrimaryKey="true" CanBeNull="false" />
     208      <Column Name="DateEnqueued" Type="System.DateTime" DbType="DateTime" CanBeNull="false" />
    208209    </Type>
    209210  </Table>
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive.DataAccess/3.3/HiveDataContext.dbml.layout

    r8687 r8707  
    241241      </nodes>
    242242    </associationConnector>
    243     <classShape Id="f9e8867f-fd15-4a72-8ca4-4f02cd3f141f" absoluteBounds="4.125, 5.5, 2, 1.0016910807291666">
     243    <classShape Id="f9e8867f-fd15-4a72-8ca4-4f02cd3f141f" absoluteBounds="4.125, 5.5, 2, 1.1939925130208327">
    244244      <DataClassMoniker Name="/HiveDataContext/UserPriority" />
    245245      <nestedChildShapes>
    246         <elementListCompartment Id="ee41f516-7d9c-4a1d-a1b8-bbe00a6ffea8" absoluteBounds="4.14, 5.96, 1.9700000000000002, 0.44169108072916663" name="DataPropertiesCompartment" titleTextColor="Black" itemTextColor="Black" />
     246        <elementListCompartment Id="ee41f516-7d9c-4a1d-a1b8-bbe00a6ffea8" absoluteBounds="4.14, 5.96, 1.9700000000000002, 0.63399251302083326" name="DataPropertiesCompartment" titleTextColor="Black" itemTextColor="Black" />
    247247      </nestedChildShapes>
    248248    </classShape>
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive.DataAccess/3.3/HiveDataContext.designer.cs

    r8687 r8707  
    33// <auto-generated>
    44//     This code was generated by a tool.
    5 //     Runtime Version:4.0.30319.269
     5//     Runtime Version:4.0.30319.17929
    66//
    77//     Changes to this file may cause incorrect behavior and will be lost if
     
    44334433    private System.Guid _UserId;
    44344434   
     4435    private System.DateTime _DateEnqueued;
     4436   
    44354437    #region Extensibility Method Definitions
    44364438    partial void OnLoaded();
     
    44394441    partial void OnUserIdChanging(System.Guid value);
    44404442    partial void OnUserIdChanged();
     4443    partial void OnDateEnqueuedChanging(System.DateTime value);
     4444    partial void OnDateEnqueuedChanged();
    44414445    #endregion
    44424446   
     
    44664470    }
    44674471   
     4472    [global::System.Data.Linq.Mapping.ColumnAttribute(Storage="_DateEnqueued", DbType="DateTime")]
     4473    public System.DateTime DateEnqueued
     4474    {
     4475      get
     4476      {
     4477        return this._DateEnqueued;
     4478      }
     4479      set
     4480      {
     4481        if ((this._DateEnqueued != value))
     4482        {
     4483          this.OnDateEnqueuedChanging(value);
     4484          this.SendPropertyChanging();
     4485          this._DateEnqueued = value;
     4486          this.SendPropertyChanged("DateEnqueued");
     4487          this.OnDateEnqueuedChanged();
     4488        }
     4489      }
     4490    }
     4491   
    44684492    public event PropertyChangingEventHandler PropertyChanging;
    44694493   
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive.DataAccess/3.3/SQL Scripts/Initialize Hive Database.sql

    r8687 r8707  
    123123CREATE TABLE [UserPriority](
    124124  [UserId] UniqueIdentifier NOT NULL,
     125  [DateEnqueued] DateTime NOT NULL,
    125126  CONSTRAINT [PK_UserPriority] PRIMARY KEY ([UserId])
    126127  )
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Convert.cs

    r8687 r8707  
    531531    public static DT.UserPriority ToDto(DB.UserPriority source) {
    532532      if (source == null) return null;
    533       return new DT.UserPriority() { UserId = source.UserId };
     533      return new DT.UserPriority() { Id = source.UserId, DateEnqueued = source.DateEnqueued };
    534534    }
    535535    public static DB.UserPriority ToEntity(DT.UserPriority source) {
     
    540540    public static void ToEntity(DT.UserPriority source, DB.UserPriority target) {
    541541      if ((source != null) && (target != null)) {
    542         target.UserId = source.UserId;
     542        target.UserId = source.Id;
     543        target.DateEnqueued = source.DateEnqueued;
    543544      }
    544545    }
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/DataTransfer/UserPriority.cs

    r8687 r8707  
    2828  public class UserPriority : HiveItem {
    2929    [DataMember]
    30     public Guid UserId { get; set; }
     30    public DateTime DateEnqueued { get; set; }
    3131  }
    3232}
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/HiveDao.cs

    r8687 r8707  
    259259        var entity = DT.Convert.ToEntity(dto);
    260260        db.Jobs.InsertOnSubmit(entity);
     261        if (!db.Jobs.Any(x => x.OwnerUserId == dto.OwnerUserId))
     262          EnqueueUserPriority(new DT.UserPriority { Id = dto.OwnerUserId, DateEnqueued = dto.DateCreated });
    261263        db.SubmitChanges();
    262264        return entity.JobId;
     
    863865      }
    864866    }
     867
     868    public void EnqueueUserPriority(DT.UserPriority dto) {
     869      using (var db = CreateContext()) {
     870        var entity = db.UserPriorities.FirstOrDefault(x => x.UserId == dto.Id);
     871        if (entity == null) db.UserPriorities.InsertOnSubmit(DT.Convert.ToEntity(dto));
     872        else DT.Convert.ToEntity(dto, entity);
     873        db.SubmitChanges();
     874      }
     875    }
    865876    #endregion
    866877
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs

    r8687 r8707  
    154154    #region UserPriority Methods
    155155    IEnumerable<DT.UserPriority> GetUserPriorities(Expression<Func<UserPriority, bool>> predicate);
     156    void EnqueueUserPriority(DT.UserPriority userPriority);
    156157    #endregion
    157158  }
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r8687 r8707  
    2323using System.Collections.Generic;
    2424using System.Linq;
     25using System.Threading;
    2526using HeuristicLab.Services.Hive.DataTransfer;
    2627using DA = HeuristicLab.Services.Hive.DataAccess;
     
    2829namespace HeuristicLab.Services.Hive {
    2930  public class HeartbeatManager {
     31    private const string MutexName = "HiveTaskSchedulingMutex";
     32
    3033    private IHiveDao dao {
    3134      get { return ServiceLocator.Instance.HiveDao; }
     
    6366        // assign new task
    6467        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    65           var availableJobs = taskScheduler.Schedule(dao.GetWaitingTasks(slave));
    66           if (availableJobs.Any()) {
    67             var job = availableJobs.First();
    68             if (AssignJob(slave, job))
    69               actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
     68          bool mutexAquired = false;
     69          var mutex = new Mutex(false, MutexName);
     70          try {
     71            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
     72            if (!mutexAquired)
     73              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     74            else {
     75              var availableJobs = taskScheduler.Schedule(dao.GetWaitingTasks(slave));
     76              if (availableJobs.Any()) {
     77                var job = availableJobs.First();
     78                if (AssignJob(slave, job))
     79                  actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
     80              }
     81            }
     82          }
     83          catch (AbandonedMutexException) {
     84            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     85          }
     86          finally {
     87            if (mutexAquired) mutex.ReleaseMutex();
    7088          }
    7189        }
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Properties/Settings.Designer.cs

    r7857 r8707  
    22// <auto-generated>
    33//     This code was generated by a tool.
    4 //     Runtime Version:4.0.30319.269
     4//     Runtime Version:4.0.30319.17929
    55//
    66//     Changes to this file may cause incorrect behavior and will be lost if
     
    7777            }
    7878        }
     79       
     80        [global::System.Configuration.ApplicationScopedSettingAttribute()]
     81        [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
     82        [global::System.Configuration.DefaultSettingValueAttribute("00:00:20")]
     83        public global::System.TimeSpan SchedulingPatience {
     84            get {
     85                return ((global::System.TimeSpan)(this["SchedulingPatience"]));
     86            }
     87        }
    7988    }
    8089}
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Properties/Settings.settings

    r7857 r8707  
    2121      <Value Profile="(Default)">3.00:00:00</Value>
    2222    </Setting>
     23    <Setting Name="SchedulingPatience" Type="System.TimeSpan" Scope="Application">
     24      <Value Profile="(Default)">00:00:20</Value>
     25    </Setting>
    2326  </Settings>
    2427</SettingsFile>
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/Scheduler/RoundRobinTaskScheduler.cs

    r8687 r8707  
    2020#endregion
    2121
     22using System;
    2223using System.Collections.Generic;
    2324using System.Linq;
     
    3132
    3233    public IEnumerable<Task> Schedule(IEnumerable<Task> tasks, int count = 1) {
    33       var userPriorities = dao.GetUserPriorities(x => true).ToArray();
     34      if (!tasks.Any()) return Enumerable.Empty<Task>();
     35
     36      var userPriorities = dao.GetUserPriorities(x => true).OrderBy(x => x.DateEnqueued).ToArray();
    3437
    3538      var jobs = new List<Job>();
    3639      foreach (var userPriority in userPriorities) {
    3740        var closureUserPriority = userPriority;
    38         jobs.AddRange(dao.GetJobs(x => x.OwnerUserId == closureUserPriority.UserId));
     41        jobs.AddRange(dao.GetJobs(x => x.OwnerUserId == closureUserPriority.Id));
    3942      }
    4043
     
    4346        job => job.Id,
    4447        (task, job) => new { Task = task, Job = job })
    45         .ToDictionary(k => k, v => false);
     48        .OrderByDescending(x => x.Task.Priority)
     49        .ToList();
    4650
    4751      var scheduledTasks = new List<Task>();
    4852      int priorityIndex = 0;
     53
    4954      if (count == 0 || count > taskJobRelations.Count) count = taskJobRelations.Count;
     55
    5056      for (int i = 0; i < count; i++) {
    51         var defaultEntry = taskJobRelations.First(x => !x.Value); // search first task which is not included yet
    52         var priorityEntries = taskJobRelations.Where(x => !x.Value && x.Key.Job.OwnerUserId == userPriorities[priorityIndex].UserId).ToArray(); // search for tasks with desired user priority
    53         while (!priorityEntries.Any() && priorityIndex < priorityEntries.Length)
    54           priorityEntries = taskJobRelations.Where(x => !x.Value && x.Key.Job.OwnerUserId == userPriorities[++priorityIndex].UserId).ToArray();
     57        var defaultEntry = taskJobRelations.First(); // search first task which is not included yet
     58        var priorityEntries = taskJobRelations.Where(x => x.Job.OwnerUserId == userPriorities[priorityIndex].Id).ToArray(); // search for tasks with desired user priority
     59        while (!priorityEntries.Any() && ++priorityIndex < userPriorities.Length)
     60          priorityEntries = taskJobRelations.Where(x => x.Job.OwnerUserId == userPriorities[priorityIndex].Id).ToArray();
    5561        if (priorityEntries.Any()) { // tasks with desired user priority found
    56           var priorityEntry = priorityEntries.OrderByDescending(x => x.Key.Task.Priority).ThenBy(x => x.Key.Task.DateCreated).First();
    57           if (defaultEntry.Key.Task.Priority <= priorityEntry.Key.Task.Priority) {
    58             taskJobRelations[priorityEntry.Key] = true;
    59             scheduledTasks.Add(priorityEntry.Key.Task);
     62          var priorityEntry = priorityEntries.OrderByDescending(x => x.Task.Priority).ThenBy(x => x.Job.DateCreated).First();
     63          if (defaultEntry.Task.Priority <= priorityEntry.Task.Priority) {
     64            taskJobRelations.Remove(priorityEntry);
     65            scheduledTasks.Add(priorityEntry.Task);
    6066            priorityIndex++;
    61           } else { // there are other tasks with higher priority
    62             taskJobRelations[defaultEntry.Key] = true;
    63             scheduledTasks.Add(defaultEntry.Key.Task);
     67          } else { // there are other tasks with higher priorities
     68            taskJobRelations.Remove(defaultEntry);
     69            scheduledTasks.Add(defaultEntry.Task);
    6470          }
    6571        } else {
    66           taskJobRelations[defaultEntry.Key] = true;
    67           scheduledTasks.Add(defaultEntry.Key.Task);
     72          taskJobRelations.Remove(defaultEntry);
     73          scheduledTasks.Add(defaultEntry.Task);
    6874        }
    6975      }
    7076
    71       // dequeue and enqueue priorities [0, priorityIndex), i.e. {4,3,6,2,5,1} and priorityIndex = 3 -> {2,5,1,4,3,6}
    72       // i don't know how to solve this yet ... (race conditions, locking?)
     77      // requeue user priorities
     78      if (priorityIndex < userPriorities.Length)
     79        for (int i = 0; i < priorityIndex; i++) {
     80          userPriorities[i].DateEnqueued = DateTime.Now;
     81          dao.EnqueueUserPriority(userPriorities[i]);
     82        }
    7383
    7484      return scheduledTasks;
  • branches/HiveTaskScheduler/HeuristicLab.Services.Hive/3.3/app.config

    r7857 r8707  
    2626                <value>3.00:00:00</value>
    2727            </setting>
     28            <setting name="SchedulingPatience" serializeAs="String">
     29                <value>00:00:20</value>
     30            </setting>
    2831        </HeuristicLab.Services.Hive.Properties.Settings>
    2932    </applicationSettings>
Note: See TracChangeset for help on using the changeset viewer.