Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/29/20 13:28:25 (4 years ago)
Author:
jkarder
Message:

#3062: overhauled statistics generation and cleanup

  • switched to a single thread for database cleanup and statistics generation (executed sequentially)
  • switched to preemptive deletion of items that are in status DeletionPending (for jobs: statelogs, taskdata, tasks)
  • added code that aborts tasks whose jobs have already been marked for deletion
  • added method UseTransactionAndSubmit in addition to UseTransaction in PersistenceManager
  • updated DAO methods and introduced more bare metal sql
  • introduced DAO methods for batch deletion
  • fixed usage of enum values in DAO sql queries
  • deleted unnecessary triggers tr_JobDeleteCascade and tr_TaskDeleteCascade in Prepare Hive Database.sql
  • changed scheduling for less interference with janitor and other heartbeats
    • increased scheduling patience from 20 to 70 seconds (to wait longer to get the mutex for scheduling)
    • changed signature of ITaskScheduler.Schedule
    • added base class for TaskSchedulers and moved assignment of tasks to slaves into it
    • changed RoundRobinTaskScheduler to use bare metal sql
  • made MessageContainer a storable type (leftover)
  • updated HiveJanitorServiceInstaller.nsi
Location:
trunk/HeuristicLab.Services.Hive/3.3/Manager
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs

    r17180 r17574  
    2828  public class EventManager : IEventManager {
    2929    private const string SlaveTimeout = "Slave timed out.";
     30    private static readonly TaskState[] CompletedStates = { TaskState.Finished, TaskState.Aborted, TaskState.Failed };
     31
    3032    private IPersistenceManager PersistenceManager {
    3133      get { return ServiceLocator.Instance.PersistenceManager; }
     
    3335
    3436    public void Cleanup() {
     37      Console.WriteLine("started cleanup");
    3538      var pm = PersistenceManager;
    3639
    37       pm.UseTransaction(() => {
    38         FinishJobDeletion(pm);
    39         pm.SubmitChanges();
    40       });
     40      // preemptiv delete obsolete entities
     41      // speeds up job deletion
     42      BatchDelete((p, s) => p.StateLogDao.DeleteObsolete(s), 100, 100, true, pm, "DeleteObsoleteStateLogs");
     43      BatchDelete((p, s) => p.TaskDataDao.DeleteObsolete(s), 100, 20, true, pm, "DeleteObsoleteTaskData");
     44      BatchDelete((p, s) => p.TaskDao.DeleteObsolete(s), 100, 20, false, pm, "DeleteObsoleteTasks");
     45      BatchDelete((p, s) => p.JobDao.DeleteByState(JobState.DeletionPending, s), 100, 20, true, pm, "DeleteObsoleteJobs");
    4146
    42       pm.UseTransaction(() => {
    43         SetTimeoutSlavesOffline(pm);
    44         SetTimeoutTasksWaiting(pm);
    45         DeleteObsoleteSlaves(pm);
    46         pm.SubmitChanges();
    47       });
     47      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: SetTimeoutSlavesOffline");
     48      Console.WriteLine("5");
     49      pm.UseTransactionAndSubmit(() => { SetTimeoutSlavesOffline(pm); });
     50      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: SetTimeoutTasksWaiting");
     51      Console.WriteLine("6");
     52      pm.UseTransactionAndSubmit(() => { SetTimeoutTasksWaiting(pm); });
     53      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: DeleteObsoleteSlaves");
     54      Console.WriteLine("7");
     55      pm.UseTransactionAndSubmit(() => { DeleteObsoleteSlaves(pm); });
     56      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: AbortObsoleteTasks");
     57      Console.WriteLine("8");
     58      pm.UseTransactionAndSubmit(() => { AbortObsoleteTasks(pm); });
     59      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: FinishParentTasks");
     60      Console.WriteLine("9");
     61      pm.UseTransactionAndSubmit(() => { FinishParentTasks(pm); });
     62      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: DONE");
     63      Console.WriteLine("10");
     64    }
    4865
    49       pm.UseTransaction(() => {
    50         FinishParentTasks(pm);
    51         pm.SubmitChanges();
    52       });
     66    private void BatchDelete(
     67      Func<IPersistenceManager, int, int> deletionFunc,
     68      int batchSize,
     69      int maxCalls,
     70      bool limitIsBatchSize,
     71      IPersistenceManager pm,
     72      string logMessage
     73    ) {
     74      int totalDeleted = 0;
     75      while (maxCalls > 0) {
     76        maxCalls--;
     77        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: {logMessage}");
     78        Console.WriteLine($"HiveJanitor: {logMessage}");
     79        var deleted = pm.UseTransactionAndSubmit(() => { return deletionFunc(pm, batchSize); });
     80        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: {logMessage} DONE (deleted {deleted}, {maxCalls} calls left)");
     81        Console.WriteLine($"HiveJanitor: {logMessage} DONE (deleted {deleted}, {maxCalls} calls left)");
     82        totalDeleted += deleted;
     83        if (limitIsBatchSize && deleted < batchSize || deleted <= 0) return;
     84      }
     85      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: Possible rows left to delete (total deleted: {totalDeleted}).");
     86      Console.WriteLine($"HiveJanitor: Possible rows left to delete (total deleted: {totalDeleted}).");
    5387    }
    5488
     
    136170      }
    137171    }
     172
     173    /// <summary>
     174    /// Aborts tasks whose jobs have already been marked for deletion
     175    /// </summary>
     176    /// <param name="pm"></param>
     177    private void AbortObsoleteTasks(IPersistenceManager pm) {
     178      var jobDao = pm.JobDao;
     179      var taskDao = pm.TaskDao;
     180
     181      var obsoleteTasks = (from jobId in jobDao.GetJobIdsByState(JobState.StatisticsPending)
     182                           join task in taskDao.GetAll() on jobId equals task.JobId
     183                           where !CompletedStates.Contains(task.State) && task.Command == null
     184                           select task).ToList();
     185
     186      foreach (var t in obsoleteTasks) {
     187        t.State = TaskState.Aborted;
     188      }
     189    }
    138190  }
    139191}
  • trunk/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r17180 r17574  
    8383            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
    8484            if (mutexAquired) {
    85               var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave)
    86                   .Select(x => new TaskInfoForScheduler {
    87                     TaskId = x.TaskId,
    88                     JobId = x.JobId,
    89                     Priority = x.Priority
    90                   })
    91                   .ToList()
    92               );
    93               var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray();
    94               if (availableTasks.Any()) {
    95                 var task = availableTasks.First();
    96                 AssignTask(pm, slave, task.TaskId);
    97                 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
     85              var scheduledTaskIds = TaskScheduler.Schedule(slave, 1).ToArray();
     86              foreach (var id in scheduledTaskIds) {
     87                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, id));
    9888              }
    9989            } else {
    100               LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     90              LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling could not be aquired. (HB from Slave {slave.ResourceId})");
    10191            }
    102           }
    103           catch (AbandonedMutexException) {
    104             LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
    105           }
    106           catch (Exception ex) {
    107             LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));
    108           }
    109           finally {
     92          } catch (AbandonedMutexException) {
     93            LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling has been abandoned. (HB from Slave {slave.ResourceId})");
     94          } catch (Exception ex) {
     95            LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager threw an exception in ProcessHeartbeat (HB from Slave {slave.ResourceId}): {ex}");
     96          } finally {
    11097            if (mutexAquired) mutex.ReleaseMutex();
    11198          }
     
    113100      }
    114101      return actions;
    115     }
    116 
    117     private void AssignTask(IPersistenceManager pm, DA.Slave slave, Guid taskId) {
    118       const DA.TaskState transferring = DA.TaskState.Transferring;
    119       DateTime now = DateTime.Now;
    120       var taskDao = pm.TaskDao;
    121       var stateLogDao = pm.StateLogDao;
    122       pm.UseTransaction(() => {
    123         var task = taskDao.GetById(taskId);
    124         stateLogDao.Save(new DA.StateLog {
    125           State = transferring,
    126           DateTime = now,
    127           TaskId = taskId,
    128           SlaveId = slave.ResourceId,
    129           UserId = null,
    130           Exception = null
    131         });
    132         task.State = transferring;
    133         task.LastHeartbeat = now;
    134         pm.SubmitChanges();
    135       });
    136102    }
    137103
Note: See TracChangeset for help on using the changeset viewer.