Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
04/18/13 16:35:14 (12 years ago)
Author:
pfleck
Message:

#2030
Activated Delayed Loading for binary data.
Added HiveOperationContext to store HiveDataContext for whole ServiceOperation duration.
Added HiveDao methods to query database objects, not DTOs.
Changed HartbeatManager to use only database objects from new queries.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r9257 r9381  
    4747    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    4848      List<MessageContainer> actions = new List<MessageContainer>();
    49       Slave slave = null;
    50       slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });
    5149
    52       if (slave == null) {
    53         actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
    54       } else {
    55         if (heartbeat.HbInterval != slave.HbInterval) {
    56           actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
     50      DA.Slave slave = null;
     51      trans.UseTransaction(() => {
     52        slave = dao.GetSlaveDA(heartbeat.SlaveId);
     53
     54        if (slave == null) {
     55          actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
     56        } else {
     57          if (heartbeat.HbInterval != slave.HbInterval) {
     58            actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
     59          }
     60          if (ShutdownSlaveComputer(slave.ResourceId)) {
     61            actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
     62          }
     63
     64          // update slave data
     65          slave.FreeCores = heartbeat.FreeCores;
     66          slave.FreeMemory = heartbeat.FreeMemory;
     67          slave.CpuUtilization = heartbeat.CpuUtilization;
     68          slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId);
     69          slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? DA.SlaveState.Calculating : DA.SlaveState.Idle;
     70          slave.LastHeartbeat = DateTime.Now;
     71
     72          dao.UpdateSlaveDA(slave);
    5773        }
    58         if (ShutdownSlaveComputer(slave.Id)) {
    59           actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
    60         }
     74      });
    6175
    62         // update slave data
    63         slave.FreeCores = heartbeat.FreeCores;
    64         slave.FreeMemory = heartbeat.FreeMemory;
    65         slave.CpuUtilization = heartbeat.CpuUtilization;
    66         slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
    67         slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
    68         slave.LastHeartbeat = DateTime.Now;
     76      // update task data
     77      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));
    6978
    70         trans.UseTransaction(() => { dao.UpdateSlave(slave); });
     79      // assign new task
     80      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
     81        bool mutexAquired = false;
     82        var mutex = new Mutex(false, MutexName);
     83        try {
    7184
    72         // update task data
    73         actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));
    74 
    75         // assign new task
    76         if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    77           bool mutexAquired = false;
    78           var mutex = new Mutex(false, MutexName);
    79           try {
    80             mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
    81             if (!mutexAquired)
    82               DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
    83             else {
     85          mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
     86          if (!mutexAquired)
     87            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     88          else {
     89            trans.UseTransaction(() => {
    8490              IEnumerable<TaskInfoForScheduler> availableTasks = null;
    85               availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });
     91              availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave));
    8692              if (availableTasks.Any()) {
    8793                var task = availableTasks.First();
     
    8995                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
    9096              }
    91             }
     97            });
    9298          }
    93           catch (AbandonedMutexException) {
    94             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
    95           }
    96           catch (Exception ex) {
    97             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
    98           }
    99           finally {
    100             if (mutexAquired) mutex.ReleaseMutex();
    101           }
     99        }
     100        catch (AbandonedMutexException) {
     101          DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     102        }
     103        catch (Exception ex) {
     104          DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
     105        }
     106        finally {
     107          if (mutexAquired) mutex.ReleaseMutex();
    102108        }
    103109      }
     
    105111    }
    106112
    107     private void AssignJob(Slave slave, Guid taskId) {
    108       trans.UseTransaction(() => {
    109         var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);
     113    private void AssignJob(DA.Slave slave, Guid taskId) {
     114      var task = dao.UpdateTaskStateDA(taskId, DataAccess.TaskState.Transferring, slave.ResourceId, null, null);
    110115
    111         // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
    112         task.LastHeartbeat = DateTime.Now;
    113         dao.UpdateTask(task);
    114       });
     116      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
     117      task.LastHeartbeat = DateTime.Now;
     118      dao.UpdateTaskDA(task);
    115119    }
    116120
     
    128132        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
    129133      } else {
    130         // process the jobProgresses
    131         foreach (var jobProgress in heartbeat.JobProgress) {
    132           Task curTask = null;
    133           curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); });
    134           if (curTask == null) {
    135             // task does not exist in db
    136             actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
    137             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
    138           } else {
    139             if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
    140               // assigned slave does not match heartbeat
    141               actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
    142               DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
    143             } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
    144               // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
    145               actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
     134        trans.UseTransaction(() => {
     135          // process the jobProgresses
     136          foreach (var jobProgress in heartbeat.JobProgress) {
     137            var curTask = dao.GetTaskDA(jobProgress.Key);
     138            if (curTask == null) {
     139              // task does not exist in db
     140              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
     141              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
    146142            } else {
    147               // save task execution time
    148               curTask.ExecutionTime = jobProgress.Value;
    149               curTask.LastHeartbeat = DateTime.Now;
     143              var currentStateLog = curTask.StateLogs.LastOrDefault();
     144              if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) {
     145                // assigned slave does not match heartbeat
     146                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
     147                DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
     148              } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
     149                // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
     150                actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
     151              } else {
     152                // save task execution time
     153                curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
     154                curTask.LastHeartbeat = DateTime.Now;
    150155
    151               switch (curTask.Command) {
    152                 case Command.Stop:
    153                   actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id));
    154                   break;
    155                 case Command.Pause:
    156                   actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
    157                   break;
    158                 case Command.Abort:
    159                   actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
    160                   break;
     156                switch (curTask.Command) {
     157                  case DA.Command.Stop:
     158                    actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
     159                    break;
     160                  case DA.Command.Pause:
     161                    actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
     162                    break;
     163                  case DA.Command.Abort:
     164                    actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
     165                    break;
     166                }
     167                dao.UpdateTaskDA(curTask);
    161168              }
    162               trans.UseTransaction(() => { dao.UpdateTask(curTask); });
    163169            }
    164170          }
    165         }
     171        });
    166172      }
    167173      return actions;
    168174    }
    169175
    170     private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
    171       return trans.UseTransaction(() => {
    172         var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
    173         var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
    174         return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
    175       });
     176    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) {
     177      var assignedResourceIds = curTask.AssignedResources.Select(ar => ar.Resource.ResourceId);
     178      var slaveResourceIds = dao.GetParentResourcesDA(slaveId).Select(x => x.ResourceId);
     179      return assignedResourceIds.Any(r => slaveResourceIds.Contains(r));
    176180    }
    177181
    178182    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
     183      var parentResources = dao.GetParentResourcesDA(slaveId);
    179184      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
    180       return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });
     185      return parentResources.All(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() == 0);
    181186    }
    182187
    183188    private bool ShutdownSlaveComputer(Guid slaveId) {
    184       return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); });
     189      var parentResources = dao.GetParentResourcesDA(slaveId);
     190
     191      return parentResources.Any(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() != 0);
    185192    }
    186193  }
Note: See TracChangeset for help on using the changeset viewer.