Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
01/07/13 22:00:04 (12 years ago)
Author:
ascheibe
Message:

#1712 reintegrated Hive Scheduler branch and made further performance improvements

Location:
trunk/sources/HeuristicLab.Services.Hive
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Services.Hive

  • trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r8957 r9123  
    2323using System.Collections.Generic;
    2424using System.Linq;
     25using System.Threading;
    2526using HeuristicLab.Services.Hive.DataTransfer;
    2627using DA = HeuristicLab.Services.Hive.DataAccess;
     
    2829namespace HeuristicLab.Services.Hive {
    2930  public class HeartbeatManager {
     31    private const string MutexName = "HiveTaskSchedulingMutex";
     32
    3033    private IHiveDao dao {
    3134      get { return ServiceLocator.Instance.HiveDao; }
    3235    }
    33     private IAuthorizationManager auth {
    34       get { return ServiceLocator.Instance.AuthorizationManager; }
     36    private ITaskScheduler taskScheduler {
     37      get { return ServiceLocator.Instance.TaskScheduler; }
    3538    }
    3639
     
    6669        // assign new task
    6770        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    68           var availableJobs = dao.GetWaitingTasks(slave, 1);
    69           if (availableJobs.Count() > 0) {
    70             var job = availableJobs.First();
    71             if (AssignJob(slave, job))
    72               actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id));
     71          bool mutexAquired = false;
     72          var mutex = new Mutex(false, MutexName);
     73          try {
     74            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
     75            if (!mutexAquired)
     76              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     77            else {
     78              var availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave));
     79              if (availableTasks.Any()) {
     80                var task = availableTasks.First();
     81                AssignJob(slave, task.TaskId);
     82                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
     83              }
     84            }
     85          }
     86          catch (AbandonedMutexException) {
     87            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     88          }
     89          catch (Exception ex) {
     90            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
     91          }
     92          finally {
     93            if (mutexAquired) mutex.ReleaseMutex();
    7394          }
    7495        }
     
    7798    }
    7899
    79     // returns true if assignment was successful
    80     private bool AssignJob(Slave slave, Task task) {
    81       // load task again and check if it is still available (this is an attempt to reduce the race condition which causes multiple heartbeats to get the same task assigned)
    82       if (dao.GetTask(task.Id).State != TaskState.Waiting) return false;
    83 
    84       task = dao.UpdateTaskState(task.Id, DataAccess.TaskState.Transferring, slave.Id, null, null);
     100    private void AssignJob(Slave slave, Guid taskId) {
     101      var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);
    85102
    86103      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
    87104      task.LastHeartbeat = DateTime.Now;
    88105      dao.UpdateTask(task);
    89       return true;
    90106    }
    91107
Note: See TracChangeset for help on using the changeset viewer.