Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
08/13/15 15:22:51 (9 years ago)
Author:
ascheibe
Message:

#2388

  • prevent disposing of hive data context
  • more cleanups
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/NewHeartbeatManager.cs

    r12773 r12858  
    4747    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    4848      List<MessageContainer> actions = new List<MessageContainer>();
    49       using (var pm = PersistenceManager) {
    50         var slaveDao = pm.SlaveDao;
    51         var taskDao = pm.TaskDao;
    52         var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId));
    53         if (slave == null) {
    54           actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
    55         } else {
    56           if (heartbeat.HbInterval != slave.HbInterval) {
    57             actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
    58           }
    59           if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) {
    60             actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
    61           }
    62           // update slave data 
    63           slave.FreeCores = heartbeat.FreeCores;
    64           slave.FreeMemory = heartbeat.FreeMemory;
    65           slave.CpuUtilization = heartbeat.CpuUtilization;
    66           slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
    67             ? DA.SlaveState.Calculating
    68             : DA.SlaveState.Idle;
    69           slave.LastHeartbeat = DateTime.Now;
    70           pm.UseTransaction(() => {
    71             slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId);
    72             pm.SubmitChanges();
    73           });
    74 
    75           // update task data
    76           actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate));
    77 
    78           // assign new task
    79           if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    80             bool mutexAquired = false;
    81             var mutex = new Mutex(false, MutexName);
    82             try {
    83               mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
    84               if (mutexAquired) {
    85                 var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave)
    86                     .Select(x => new TaskInfoForScheduler {
    87                       TaskId = x.TaskId,
    88                       JobId = x.JobId,
    89                       Priority = x.Priority
    90                     })
    91                     .ToList()
    92                 );
    93                 var availableTasks = TaskScheduler.Schedule(waitingTasks);
    94                 if (availableTasks.Any()) {
    95                   var task = availableTasks.First();
    96                   AssignTask(pm, slave, task.TaskId);
    97                   actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
    98                 }
    99               } else {
    100                 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     49      var pm = PersistenceManager;
     50      var slaveDao = pm.SlaveDao;
     51      var taskDao = pm.TaskDao;
     52      var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId));
     53      if (slave == null) {
     54        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
     55      } else {
     56        if (heartbeat.HbInterval != slave.HbInterval) {
     57          actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
     58        }
     59        if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) {
     60          actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
     61        }
     62        // update slave data 
     63        slave.FreeCores = heartbeat.FreeCores;
     64        slave.FreeMemory = heartbeat.FreeMemory;
     65        slave.CpuUtilization = heartbeat.CpuUtilization;
     66        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
     67          ? DA.SlaveState.Calculating
     68          : DA.SlaveState.Idle;
     69        slave.LastHeartbeat = DateTime.Now;
     70        pm.UseTransaction(() => {
     71          slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId);
     72          pm.SubmitChanges();
     73        });
     74
     75        // update task data
     76        actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate));
     77
     78        // assign new task
     79        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
     80          bool mutexAquired = false;
     81          var mutex = new Mutex(false, MutexName);
     82          try {
     83            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
     84            if (mutexAquired) {
     85              var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave)
     86                  .Select(x => new TaskInfoForScheduler {
     87                    TaskId = x.TaskId,
     88                    JobId = x.JobId,
     89                    Priority = x.Priority
     90                  })
     91                  .ToList()
     92              );
     93              var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray();
     94              if (availableTasks.Any()) {
     95                var task = availableTasks.First();
     96                AssignTask(pm, slave, task.TaskId);
     97                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
    10198              }
     99            } else {
     100              LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
    102101            }
    103             catch (AbandonedMutexException) {
    104               LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
    105             }
    106             catch (Exception ex) {
    107               LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));
    108             }
    109             finally {
    110               if (mutexAquired) mutex.ReleaseMutex();
    111             }
     102          }
     103          catch (AbandonedMutexException) {
     104            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     105          }
     106          catch (Exception ex) {
     107            LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));
     108          }
     109          finally {
     110            if (mutexAquired) mutex.ReleaseMutex();
    112111          }
    113112        }
Note: See TracChangeset for help on using the changeset viewer.