Changeset 17825 for branches/3040_VectorBasedGP/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
- Timestamp:
- 01/27/21 14:10:56 (4 years ago)
- Location:
- branches/3040_VectorBasedGP
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3040_VectorBasedGP
- Property svn:mergeinfo changed
-
branches/3040_VectorBasedGP/HeuristicLab.Services.Hive
- Property svn:mergeinfo changed
/trunk/HeuristicLab.Services.Hive merged: 17574-17575
- Property svn:mergeinfo changed
-
branches/3040_VectorBasedGP/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r17180 r17825 83 83 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 84 84 if (mutexAquired) { 85 var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave) 86 .Select(x => new TaskInfoForScheduler { 87 TaskId = x.TaskId, 88 JobId = x.JobId, 89 Priority = x.Priority 90 }) 91 .ToList() 92 ); 93 var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray(); 94 if (availableTasks.Any()) { 95 var task = availableTasks.First(); 96 AssignTask(pm, slave, task.TaskId); 97 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 85 var scheduledTaskIds = TaskScheduler.Schedule(slave, 1).ToArray(); 86 foreach (var id in scheduledTaskIds) { 87 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, id)); 98 88 } 99 89 } else { 100 LogFactory.GetLogger(this.GetType().Namespace).Log( "HeartbeatManager: The mutex used for scheduling could not be aquired.");90 LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling could not be aquired. (HB from Slave {slave.ResourceId})"); 101 91 } 102 } 103 catch (AbandonedMutexException) { 104 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 105 } 106 catch (Exception ex) { 107 LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex)); 108 } 109 finally { 92 } catch (AbandonedMutexException) { 93 LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling has been abandoned. (HB from Slave {slave.ResourceId})"); 94 } catch (Exception ex) { 95 LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager threw an exception in ProcessHeartbeat (HB from Slave {slave.ResourceId}): {ex}"); 96 } finally { 110 97 if (mutexAquired) mutex.ReleaseMutex(); 111 98 } … … 113 100 } 114 101 return actions; 115 }116 117 private void AssignTask(IPersistenceManager pm, DA.Slave slave, Guid taskId) {118 const DA.TaskState transferring = DA.TaskState.Transferring;119 DateTime now = DateTime.Now;120 var taskDao = pm.TaskDao;121 var stateLogDao = pm.StateLogDao;122 pm.UseTransaction(() => {123 var task = taskDao.GetById(taskId);124 stateLogDao.Save(new DA.StateLog {125 State = transferring,126 DateTime = now,127 TaskId = taskId,128 SlaveId = slave.ResourceId,129 UserId = null,130 Exception = null131 });132 task.State = transferring;133 task.LastHeartbeat = now;134 pm.SubmitChanges();135 });136 102 } 137 103 … … 154 120 var taskInfos = pm.UseTransaction(() => 155 121 (from task in taskDao.GetAll() 156 157 let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault()158 159 160 161 162 163 SlaveId = lastStateLog != null ? lastStateLog.SlaveId : default(Guid)164 122 where taskIds.Contains(task.TaskId) 123 let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault(x => x.State == DA.TaskState.Transferring) 124 select new { 125 TaskId = task.TaskId, 126 JobId = task.JobId, 127 State = task.State, 128 Command = task.Command, 129 SlaveId = lastStateLog != null ? lastStateLog.SlaveId : Guid.Empty 130 }).ToList() 165 131 ); 166 132
Note: See TracChangeset
for help on using the changeset viewer.