Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/28/13 12:05:53 (11 years ago)
Author:
ascheibe
Message:

#2030 merged hive performance branch back into trunk

Location:
trunk/sources/HeuristicLab.Services.Hive
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Services.Hive

  • trunk/sources/HeuristicLab.Services.Hive/3.3

    • Property svn:ignore
      •  

        old new  
        22bin
        33obj
         4*.user
  • trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r9456 r9665  
    2424using System.Linq;
    2525using System.Threading;
    26 using HeuristicLab.Services.Hive.DataTransfer;
    27 using DA = HeuristicLab.Services.Hive.DataAccess;
     26using HeuristicLab.Services.Hive.DataAccess;
     27using Heartbeat = HeuristicLab.Services.Hive.DataTransfer.Heartbeat;
    2828
    2929namespace HeuristicLab.Services.Hive {
     
    3131    private const string MutexName = "HiveTaskSchedulingMutex";
    3232
    33     private IHiveDao dao {
    34       get { return ServiceLocator.Instance.HiveDao; }
     33    private IOptimizedHiveDao dao {
     34      get { return ServiceLocator.Instance.OptimizedHiveDao; }
    3535    }
    3636    private ITaskScheduler taskScheduler {
     
    4747    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    4848      List<MessageContainer> actions = new List<MessageContainer>();
     49
    4950      Slave slave = null;
    50       slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });
    51 
     51      trans.UseTransaction(() => {
     52        slave = dao.GetSlaveById(heartbeat.SlaveId);
     53      });
    5254      if (slave == null) {
    5355        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
     
    5658          actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
    5759        }
    58         if (ShutdownSlaveComputer(slave.Id)) {
     60        if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) {
    5961          actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
    6062        }
     
    6466        slave.FreeMemory = heartbeat.FreeMemory;
    6567        slave.CpuUtilization = heartbeat.CpuUtilization;
    66         slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
     68        slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId);
    6769        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
    6870        slave.LastHeartbeat = DateTime.Now;
    6971
    70         trans.UseTransaction(() => { dao.UpdateSlave(slave); });
     72        trans.UseTransaction(() => {
     73          dao.UpdateSlave(slave);
     74        });
    7175
    7276        // update task data
     
    7882          var mutex = new Mutex(false, MutexName);
    7983          try {
     84
    8085            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
    8186            if (!mutexAquired)
    82               DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     87              LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
    8388            else {
    84               IEnumerable<TaskInfoForScheduler> availableTasks = null;
    85               availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });
    86               if (availableTasks.Any()) {
    87                 var task = availableTasks.First();
    88                 AssignJob(slave, task.TaskId);
    89                 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
    90               }
     89              trans.UseTransaction(() => {
     90                IEnumerable<TaskInfoForScheduler> availableTasks = null;
     91                availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave).ToArray());
     92                if (availableTasks.Any()) {
     93                  var task = availableTasks.First();
     94                  AssignTask(slave, task.TaskId);
     95                  actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
     96                }
     97              });
    9198            }
    9299          }
    93100          catch (AbandonedMutexException) {
    94             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     101            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
    95102          }
    96103          catch (Exception ex) {
    97             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
     104            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
    98105          }
    99106          finally {
     
    105112    }
    106113
    107     private void AssignJob(Slave slave, Guid taskId) {
    108       trans.UseTransaction(() => {
    109         var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);
     114    private void AssignTask(Slave slave, Guid taskId) {
     115      var task = dao.UpdateTaskState(taskId, TaskState.Transferring, slave.ResourceId, null, null);
    110116
    111         // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
    112         task.LastHeartbeat = DateTime.Now;
    113         dao.UpdateTask(task);
    114       });
     117      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
     118      task.LastHeartbeat = DateTime.Now;
     119      dao.UpdateTask(task);
    115120    }
    116121
     
    130135        // process the jobProgresses
    131136        foreach (var jobProgress in heartbeat.JobProgress) {
    132           Task curTask = null;
    133           curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); });
     137          Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null;
     138          trans.UseTransaction(() => {
     139            taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key);
     140          });
     141          var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null;
    134142          if (curTask == null) {
    135143            // task does not exist in db
    136144            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
    137             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
     145            LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
    138146          } else {
    139             if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
     147            var slaveId = taskWithLastStateLogSlaveId.Item2;
     148            if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) {
    140149              // assigned slave does not match heartbeat
    141               actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
    142               DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
    143             } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
     150              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
     151              LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
     152            } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) {
    144153              // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
    145               actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
     154              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
    146155            } else {
    147156              // save task execution time
    148               curTask.ExecutionTime = jobProgress.Value;
     157              curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
    149158              curTask.LastHeartbeat = DateTime.Now;
    150159
    151160              switch (curTask.Command) {
    152161                case Command.Stop:
    153                   actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id));
     162                  actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
    154163                  break;
    155164                case Command.Pause:
    156                   actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
     165                  actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
    157166                  break;
    158167                case Command.Abort:
    159                   actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
     168                  actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
    160169                  break;
    161170              }
    162               trans.UseTransaction(() => { dao.UpdateTask(curTask); });
     171              trans.UseTransaction(() => {
     172                dao.UpdateTask(curTask);
     173              });
    163174            }
    164175          }
     
    167178      return actions;
    168179    }
    169 
    170     private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
    171       return trans.UseTransaction(() => {
    172         var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
    173         var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
    174         return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
    175       });
    176     }
    177 
    178     private bool SlaveIsAllowedToCalculate(Guid slaveId) {
    179       // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
    180       return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });
    181     }
    182 
    183     private bool ShutdownSlaveComputer(Guid slaveId) {
    184       return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); });
    185     }
    186180  }
    187181}
Note: See TracChangeset for help on using the changeset viewer.