Changeset 17574 for trunk/HeuristicLab.Services.Hive/3.3/Manager
- Timestamp: 05/29/20 13:28:25 (4 years ago)
- Location: trunk/HeuristicLab.Services.Hive/3.3/Manager
- Files: 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs
r17180 r17574 28 28 public class EventManager : IEventManager { 29 29 private const string SlaveTimeout = "Slave timed out."; 30 private static readonly TaskState[] CompletedStates = { TaskState.Finished, TaskState.Aborted, TaskState.Failed }; 31 30 32 private IPersistenceManager PersistenceManager { 31 33 get { return ServiceLocator.Instance.PersistenceManager; } … … 33 35 34 36 public void Cleanup() { 37 Console.WriteLine("started cleanup"); 35 38 var pm = PersistenceManager; 36 39 37 pm.UseTransaction(() => { 38 FinishJobDeletion(pm); 39 pm.SubmitChanges(); 40 }); 40 // preemptiv delete obsolete entities 41 // speeds up job deletion 42 BatchDelete((p, s) => p.StateLogDao.DeleteObsolete(s), 100, 100, true, pm, "DeleteObsoleteStateLogs"); 43 BatchDelete((p, s) => p.TaskDataDao.DeleteObsolete(s), 100, 20, true, pm, "DeleteObsoleteTaskData"); 44 BatchDelete((p, s) => p.TaskDao.DeleteObsolete(s), 100, 20, false, pm, "DeleteObsoleteTasks"); 45 BatchDelete((p, s) => p.JobDao.DeleteByState(JobState.DeletionPending, s), 100, 20, true, pm, "DeleteObsoleteJobs"); 41 46 42 pm.UseTransaction(() => { 43 SetTimeoutSlavesOffline(pm); 44 SetTimeoutTasksWaiting(pm); 45 DeleteObsoleteSlaves(pm); 46 pm.SubmitChanges(); 47 }); 47 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: SetTimeoutSlavesOffline"); 48 Console.WriteLine("5"); 49 pm.UseTransactionAndSubmit(() => { SetTimeoutSlavesOffline(pm); }); 50 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: SetTimeoutTasksWaiting"); 51 Console.WriteLine("6"); 52 pm.UseTransactionAndSubmit(() => { SetTimeoutTasksWaiting(pm); }); 53 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: DeleteObsoleteSlaves"); 54 Console.WriteLine("7"); 55 pm.UseTransactionAndSubmit(() => { DeleteObsoleteSlaves(pm); }); 56 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: AbortObsoleteTasks"); 57 Console.WriteLine("8"); 58 pm.UseTransactionAndSubmit(() => { AbortObsoleteTasks(pm); 
}); 59 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: FinishParentTasks"); 60 Console.WriteLine("9"); 61 pm.UseTransactionAndSubmit(() => { FinishParentTasks(pm); }); 62 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: DONE"); 63 Console.WriteLine("10"); 64 } 48 65 49 pm.UseTransaction(() => { 50 FinishParentTasks(pm); 51 pm.SubmitChanges(); 52 }); 66 private void BatchDelete( 67 Func<IPersistenceManager, int, int> deletionFunc, 68 int batchSize, 69 int maxCalls, 70 bool limitIsBatchSize, 71 IPersistenceManager pm, 72 string logMessage 73 ) { 74 int totalDeleted = 0; 75 while (maxCalls > 0) { 76 maxCalls--; 77 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: {logMessage}"); 78 Console.WriteLine($"HiveJanitor: {logMessage}"); 79 var deleted = pm.UseTransactionAndSubmit(() => { return deletionFunc(pm, batchSize); }); 80 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: {logMessage} DONE (deleted {deleted}, {maxCalls} calls left)"); 81 Console.WriteLine($"HiveJanitor: {logMessage} DONE (deleted {deleted}, {maxCalls} calls left)"); 82 totalDeleted += deleted; 83 if (limitIsBatchSize && deleted < batchSize || deleted <= 0) return; 84 } 85 LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: Possible rows left to delete (total deleted: {totalDeleted})."); 86 Console.WriteLine($"HiveJanitor: Possible rows left to delete (total deleted: {totalDeleted})."); 53 87 } 54 88 … … 136 170 } 137 171 } 172 173 /// <summary> 174 /// Aborts tasks whose jobs have already been marked for deletion 175 /// </summary> 176 /// <param name="pm"></param> 177 private void AbortObsoleteTasks(IPersistenceManager pm) { 178 var jobDao = pm.JobDao; 179 var taskDao = pm.TaskDao; 180 181 var obsoleteTasks = (from jobId in jobDao.GetJobIdsByState(JobState.StatisticsPending) 182 join task in taskDao.GetAll() on jobId equals task.JobId 183 where !CompletedStates.Contains(task.State) && task.Command 
== null 184 select task).ToList(); 185 186 foreach (var t in obsoleteTasks) { 187 t.State = TaskState.Aborted; 188 } 189 } 138 190 } 139 191 } -
trunk/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r17180 r17574 83 83 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 84 84 if (mutexAquired) { 85 var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave) 86 .Select(x => new TaskInfoForScheduler { 87 TaskId = x.TaskId, 88 JobId = x.JobId, 89 Priority = x.Priority 90 }) 91 .ToList() 92 ); 93 var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray(); 94 if (availableTasks.Any()) { 95 var task = availableTasks.First(); 96 AssignTask(pm, slave, task.TaskId); 97 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 85 var scheduledTaskIds = TaskScheduler.Schedule(slave, 1).ToArray(); 86 foreach (var id in scheduledTaskIds) { 87 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, id)); 98 88 } 99 89 } else { 100 LogFactory.GetLogger(this.GetType().Namespace).Log( "HeartbeatManager: The mutex used for scheduling could not be aquired.");90 LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling could not be aquired. (HB from Slave {slave.ResourceId})"); 101 91 } 102 } 103 catch (AbandonedMutexException) { 104 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 105 } 106 catch (Exception ex) { 107 LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex)); 108 } 109 finally { 92 } catch (AbandonedMutexException) { 93 LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling has been abandoned. 
(HB from Slave {slave.ResourceId})"); 94 } catch (Exception ex) { 95 LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager threw an exception in ProcessHeartbeat (HB from Slave {slave.ResourceId}): {ex}"); 96 } finally { 110 97 if (mutexAquired) mutex.ReleaseMutex(); 111 98 } … … 113 100 } 114 101 return actions; 115 }116 117 private void AssignTask(IPersistenceManager pm, DA.Slave slave, Guid taskId) {118 const DA.TaskState transferring = DA.TaskState.Transferring;119 DateTime now = DateTime.Now;120 var taskDao = pm.TaskDao;121 var stateLogDao = pm.StateLogDao;122 pm.UseTransaction(() => {123 var task = taskDao.GetById(taskId);124 stateLogDao.Save(new DA.StateLog {125 State = transferring,126 DateTime = now,127 TaskId = taskId,128 SlaveId = slave.ResourceId,129 UserId = null,130 Exception = null131 });132 task.State = transferring;133 task.LastHeartbeat = now;134 pm.SubmitChanges();135 });136 102 } 137 103
Note: See TracChangeset for help on using the changeset viewer.