Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
04/06/21 13:13:32 (3 years ago)
Author:
dpiringe
Message:

#3026

  • merged trunk into branch
Location:
branches/3026_IntegrationIntoSymSpace
Files:
14 edited
1 copied

Legend:

Unmodified
Added
Removed
  • branches/3026_IntegrationIntoSymSpace

  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive

  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/HeuristicLab.Services.Hive-3.3.csproj

    r16658 r17928  
    173173      <DesignTimeSharedInput>True</DesignTimeSharedInput>
    174174    </Compile>
     175    <Compile Include="Scheduler\TaskScheduler.cs" />
    175176    <Compile Include="ServiceContracts\IHiveService.cs" />
    176177    <Compile Include="ServiceFaults\PluginAlreadyExistsFault.cs" />
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/HiveJanitor.cs

    r17180 r17928  
    2828  public class HiveJanitor {
    2929    private bool stop;
    30     private AutoResetEvent cleanupWaitHandle;
    31     private AutoResetEvent generateStatisticsWaitHandle;
     30    private AutoResetEvent runWaitHandle;
    3231
    3332    private IPersistenceManager PersistenceManager {
     
    4443    public HiveJanitor() {
    4544      stop = false;
    46       cleanupWaitHandle = new AutoResetEvent(false);
    47       generateStatisticsWaitHandle = new AutoResetEvent(false);
     45      runWaitHandle = new AutoResetEvent(false);
    4846    }
    4947
    5048    public void StopJanitor() {
    5149      stop = true;
    52       cleanupWaitHandle.Set();
    53       generateStatisticsWaitHandle.Set();
     50      runWaitHandle.Set();
     51    }
     52
     53    public void Run() {
     54      while (!stop) {
     55        RunCleanup();
     56        RunGenerateStatistics();
     57        runWaitHandle.WaitOne(Properties.Settings.Default.GenerateStatisticsInterval);
     58      }
     59      runWaitHandle.Close();
    5460    }
    5561
    5662    public void RunCleanup() {
    5763      var pm = PersistenceManager;
    58       while (!stop) {
    59         try {
    60           LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: starting cleanup.");
    61           bool cleanup = false;
     64      try {
     65        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: starting cleanup.");
     66        bool cleanup = false;
    6267
    63           var lifecycleDao = pm.LifecycleDao;
    64           pm.UseTransaction(() => {
    65             var lifecycle = lifecycleDao.GetLastLifecycle();
    66             if (lifecycle == null
    67                 || DateTime.Now - lifecycle.LastCleanup > Properties.Settings.Default.CleanupInterval) {
    68               lifecycleDao.UpdateLifecycle();
    69               cleanup = true;
    70             }
    71             pm.SubmitChanges();
    72           }, true);
     68        var lifecycleDao = pm.LifecycleDao;
     69        pm.UseTransaction(() => {
     70          var lifecycle = lifecycleDao.GetLastLifecycle();
     71          if (lifecycle == null
     72              || DateTime.Now - lifecycle.LastCleanup > Properties.Settings.Default.CleanupInterval) {
     73            lifecycleDao.UpdateLifecycle();
     74            cleanup = true;
     75          }
     76          pm.SubmitChanges();
     77        }, true);
    7378
    74           if (cleanup) {
    75             EventManager.Cleanup();
    76           }
    77           LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: cleanup finished.");
     79        if (cleanup) {
     80          EventManager.Cleanup();
    7881        }
    79         catch (Exception e) {
    80           LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log(string.Format("HiveJanitor: The following exception occured: {0}", e.ToString()));
    81         }
    82         cleanupWaitHandle.WaitOne(Properties.Settings.Default.CleanupInterval);
     82        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: cleanup finished.");
     83      } catch (Exception e) {
     84        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log(string.Format("HiveJanitor: The following exception occured: {0}", e.ToString()));
    8385      }
    84       cleanupWaitHandle.Close();
    8586    }
    8687
    8788    public void RunGenerateStatistics() {
    88       while (!stop) {
    89         try {
    90           LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: starting generate statistics.");
    91           StatisticsGenerator.GenerateStatistics();
    92           LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: generate statistics finished.");
    93         }
    94         catch (Exception e) {
    95           LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log(string.Format("HiveJanitor: The following exception occured: {0}", e));
    96         }
    97 
    98         generateStatisticsWaitHandle.WaitOne(Properties.Settings.Default.GenerateStatisticsInterval);
     89      try {
     90        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: starting generate statistics.");
     91        StatisticsGenerator.GenerateStatistics();
     92        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: generate statistics finished.");
     93      } catch (Exception e) {
     94        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log(string.Format("HiveJanitor: The following exception occured: {0}", e));
    9995      }
    100 
    101       generateStatisticsWaitHandle.Close();
    10296    }
    10397  }
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/HiveService.cs

    r17180 r17928  
    209209    }
    210210
     211    private bool IsAuthorizedForTask(DA.Task task, Guid? slaveId) {
     212      var lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault(x => x.State == DA.TaskState.Transferring);
     213      return lastStateLog == null || slaveId == null || lastStateLog.SlaveId == slaveId;
     214    }
     215
    211216    public DT.Task UpdateTaskState(Guid taskId, DT.TaskState taskState, Guid? slaveId, Guid? userId, string exception) {
    212217      RoleVerifier.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
     
    217222        return pm.UseTransaction(() => {
    218223          var task = taskDao.GetById(taskId);
    219           UpdateTaskState(pm, task, taskState, slaveId, userId, exception);
    220           pm.SubmitChanges();
     224          if (IsAuthorizedForTask(task, slaveId)) {
     225            UpdateTaskState(pm, task, taskState, slaveId, userId, exception);
     226            pm.SubmitChanges();
     227          }
    221228          return task.ToDto();
    222229        });
     
    665672            bool oldIsAllowedToCalculate = slave.IsAllowedToCalculate;
    666673            Guid? oldParentResourceId = slave.ParentResourceId;
     674            bool? oldIsDisposable = slave.IsDisposable;
    667675            slaveInfo.CopyToEntity(slave);
    668676            slave.IsAllowedToCalculate = oldIsAllowedToCalculate;
    669677            slave.ParentResourceId = oldParentResourceId;
     678            slave.IsDisposable = oldIsDisposable;
    670679            slave.LastHeartbeat = DateTime.Now;
    671680            slave.SlaveState = DA.SlaveState.Idle;
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/HiveStatisticsGenerator.cs

    r17180 r17928  
    2222using System;
    2323using System.Collections.Generic;
    24 using System.Data.Linq;
    2524using System.Linq;
    2625using HeuristicLab.Services.Access.DataAccess;
     
    3635
    3736    public void GenerateStatistics() {
    38       using (var pm = new PersistenceManager()) {
    39 
    40         pm.UseTransaction(() => {
    41           UpdateDimProjectTable(pm);
    42           pm.SubmitChanges();
     37      Console.WriteLine("started generate statistics");
     38
     39      using (var pm = new PersistenceManager(true)) {
     40        var sw = new System.Diagnostics.Stopwatch();
     41
     42        sw.Start();
     43        pm.UseTransactionAndSubmit(() => { UpdateDimProjectTable(pm); });
     44        sw.Stop();
     45        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateDimProjectTable: {sw.Elapsed}");
     46        Console.WriteLine($"UpdateDimProjectTable: {sw.Elapsed}");
     47        sw.Reset();
     48
     49        pm.UseTransactionAndSubmit(() => {
     50          sw.Start();
     51          UpdateDimUserTable(pm);
     52          sw.Stop();
     53          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateDimUserTable: {sw.Elapsed}");
     54          Console.WriteLine($"UpdateDimUserTable: {sw.Elapsed}");
     55          sw.Reset();
     56
     57          sw.Start();
     58          UpdateDimJobTable(pm);
     59          sw.Stop();
     60          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateDimJobTable: {sw.Elapsed}");
     61          Console.WriteLine($"UpdateDimJobTable: {sw.Elapsed}");
     62          sw.Reset();
     63
     64          sw.Start();
     65          UpdateDimClientsTable(pm);
     66          sw.Stop();
     67          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateDimClientsTable: {sw.Elapsed}");
     68          Console.WriteLine($"UpdateDimClientsTable: {sw.Elapsed}");
     69          sw.Reset();
    4370        });
    4471
    45         pm.UseTransaction(() => {
    46           UpdateDimUserTable(pm);
    47          
    48           UpdateDimJobTable(pm);
    49           UpdateDimClientsTable(pm);
    50           pm.SubmitChanges();
    51         });
    52 
    53         DimTime time = null;
    54         pm.UseTransaction(() => {
    55           time = UpdateDimTimeTable(pm);
    56           pm.SubmitChanges();
    57         });
    58 
    59         if (time != null) {
    60           pm.UseTransaction(() => {
    61             UpdateFactClientInfoTable(time, pm);
    62             pm.SubmitChanges();
    63             UpdateFactProjectInfoTable(time, pm);
    64             pm.SubmitChanges();
    65           });
    66 
    67           pm.UseTransaction(() => {
    68             try {
    69               UpdateFactTaskTable(pm);
    70               UpdateExistingDimJobs(pm);
    71               FlagJobsForDeletion(pm);
    72               pm.SubmitChanges();
    73             }
    74             catch (DuplicateKeyException e) {
    75               var logger = LogFactory.GetLogger(typeof(HiveStatisticsGenerator).Namespace);
    76               logger.Log(string.Format(
    77                 @"Propable change from summertime to wintertime, resulting in overlapping times.
    78                           On wintertime to summertime change, slave timeouts and a fact gap will occur.
    79                           Exception Details: {0}", e));
    80             }
    81           });
    82         }
     72        pm.UseTransactionAndSubmit(() => {
     73          sw.Start();
     74          var time = UpdateDimTimeTable(pm);
     75          sw.Stop();
     76          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateDimTimeTable: {sw.Elapsed}");
     77          Console.WriteLine($"UpdateDimTimeTable: {sw.Elapsed}");
     78          sw.Reset();
     79
     80          sw.Start();
     81          UpdateFactClientInfoTable(time, pm);
     82          sw.Stop();
     83          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateFactClientInfoTable: {sw.Elapsed}");
     84          Console.WriteLine($"UpdateFactClientInfoTable: {sw.Elapsed}");
     85          sw.Reset();
     86
     87          sw.Start();
     88          UpdateFactProjectInfoTable(time, pm);
     89          sw.Stop();
     90          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateFactProjectInfoTable: {sw.Elapsed}");
     91          Console.WriteLine($"UpdateFactProjectInfoTable: {sw.Elapsed}");
     92          sw.Reset();
     93
     94
     95          sw.Start();
     96          UpdateFactTaskTable(pm);
     97          sw.Stop();
     98          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateFactTaskTable: {sw.Elapsed}");
     99          Console.WriteLine($"UpdateFactTaskTable: {sw.Elapsed}");
     100          sw.Reset();
     101
     102          sw.Start();
     103          UpdateExistingDimJobs(pm);
     104          sw.Stop();
     105          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"UpdateExistingDimJobs: {sw.Elapsed}");
     106          Console.WriteLine($"UpdateExistingDimJobs: {sw.Elapsed}");
     107          sw.Reset();
     108
     109          sw.Start();
     110          FlagJobsForDeletion(pm);
     111          sw.Stop();
     112          LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"FlagJobsForDeletion: {sw.Elapsed}");
     113          Console.WriteLine($"FlagJobsForDeletion: {sw.Elapsed}");
     114          sw.Reset();
     115        }, longRunning: true);
    83116      }
    84117    }
     
    207240
    208241    private void UpdateExistingDimJobs(PersistenceManager pm) {
    209       var dimProjectDao = pm.DimProjectDao;
    210       var jobDao = pm.JobDao;
    211242      var dimJobDao = pm.DimJobDao;
    212       var factTaskDao = pm.FactTaskDao;
    213       foreach (var dimJob in dimJobDao.GetNotCompletedJobs()) {
    214         var taskStates = factTaskDao.GetByJobId(dimJob.JobId)
    215             .GroupBy(x => x.TaskState)
    216             .Select(x => new {
    217               State = x.Key,
    218               Count = x.Count()
    219             }).ToList();
    220         int totalTasks = 0, completedTasks = 0;
    221         foreach (var state in taskStates) {
    222           totalTasks += state.Count;
    223           if (CompletedStates.Contains(state.State)) {
    224             completedTasks += state.Count;
    225           }
    226         }
    227         var job = jobDao.GetById(dimJob.JobId);
    228         if (totalTasks == completedTasks) {
    229           var completeDate = factTaskDao.GetLastCompletedTaskFromJob(dimJob.JobId);
    230           if (completeDate == null) {
    231             if (job == null) {
    232               completeDate = DateTime.Now;
    233             }
    234           }
    235           dimJob.DateCompleted = completeDate;
    236         }
    237         if(job != null) {
    238           dimJob.JobName = job.Name;
    239           dimJob.ProjectId = dimProjectDao.GetLastValidIdByProjectId(job.ProjectId);
    240         }
    241 
    242         dimJob.TotalTasks = totalTasks;
    243         dimJob.CompletedTasks = completedTasks;
    244       }
     243      dimJobDao.UpdateExistingDimJobs();
    245244    }
    246245
     
    248247      var jobDao = pm.JobDao;
    249248      var jobs = jobDao.GetJobsReadyForDeletion();
    250       foreach(var job in jobs) {
     249      foreach (var job in jobs) {
    251250        job.State = JobState.DeletionPending;
    252251      }
     
    257256      var resourceDao = pm.ResourceDao;
    258257
    259       var resources = resourceDao.GetAll().ToList();
    260       var dimClients = dimClientDao.GetAllOnlineClients().ToList();
     258      var resources = resourceDao.GetAll().ToList(); // all live now
     259      var dimClients = dimClientDao.GetAllOnlineClients().ToList(); // all in statistics which are online (i.e. not expired)
    261260
    262261      var onlineClients = dimClients.Where(x => resources.Select(y => y.ResourceId).Contains(x.ResourceId));
     
    265264
    266265      // set expiration time of removed resources
    267       foreach(var r in removedResources) {
     266      foreach (var r in removedResources) {
    268267        r.DateExpired = DateTime.Now;
    269268      }
     
    281280      // expire client if its parent has changed and create a new entry
    282281      // otherwise perform "normal" update
    283       foreach(var dimc in onlineClients) {
     282      foreach (var dimc in onlineClients) {
    284283        var r = resources.Where(x => x.ResourceId == dimc.ResourceId).SingleOrDefault();
    285         if(r != null) {
    286           if(dimc.ParentResourceId == null ? r.ParentResourceId != null : dimc.ParentResourceId != r.ParentResourceId) {
     284        if (r != null) {
     285          if (dimc.ParentResourceId == null ? r.ParentResourceId != null : dimc.ParentResourceId != r.ParentResourceId) {
    287286            var now = DateTime.Now;
    288287            dimc.DateExpired = now;
     
    416415        let uStats = projectUsageStats.Where(x => x.ProjectId == dimp.ProjectId).SingleOrDefault()
    417416        select new FactProjectInfo {
    418             ProjectId = dimp.Id,
    419             DimTime = newTime,
    420             NumTotalCores = aStats != null ? aStats.Cores : 0,
    421             TotalMemory = aStats != null ? aStats.Memory : 0,
    422             NumUsedCores = uStats != null ? uStats.Cores : 0,
    423             UsedMemory = uStats != null ? uStats.Memory : 0
    424           }
     417          ProjectId = dimp.Id,
     418          DimTime = newTime,
     419          NumTotalCores = aStats != null ? aStats.Cores : 0,
     420          TotalMemory = aStats != null ? aStats.Memory : 0,
     421          NumUsedCores = uStats != null ? uStats.Cores : 0,
     422          UsedMemory = uStats != null ? uStats.Memory : 0
     423        }
    425424        );
    426425    }
     
    431430      var dimClientDao = pm.DimClientDao;
    432431
    433       var factTaskIds = factTaskDao.GetAll().Select(x => x.TaskId);
    434       var notFinishedFactTasks = factTaskDao.GetNotFinishedTasks();
    435       //var notFinishedFactTasks = factTaskDao.GetNotFinishedTasks().Select(x => new {
    436       //  x.TaskId,
    437       //  x.LastClientId
    438       //});
    439 
    440       // query several properties for all new and not finished tasks
    441       // in order to use them later either...
    442       // (1) to update the fact task entry of not finished tasks
    443       // (2) to insert a new fact task entry for new tasks
     432      var preselectedNewAndNotFinishedTasks =
     433        (from task in taskDao.GetAll()
     434         from factTask in factTaskDao.GetAll().Where(f => task.TaskId == f.TaskId).DefaultIfEmpty()
     435         let stateLogs = task.StateLogs.OrderByDescending(x => x.DateTime).ToList()
     436         let lastSlaveId = stateLogs.First(x => x.SlaveId != null)
     437         where !task.IsParentTask && (factTask.TaskId == null || factTask.EndTime == null)
     438         select new { Task = task, StateLogs = stateLogs, LastSlaveId = lastSlaveId }).ToList();
     439
     440      Console.WriteLine("preselectedNewAndNotFinishedTasks.Count = {0}", preselectedNewAndNotFinishedTasks.Count);
     441
     442      // jkarder: maybe we can split this query into multiple ones to retrieve state logs and the last slave
     443
     444      var clients = dimClientDao.GetAllOnlineClients().ToList();
     445      Console.WriteLine("clients.Count = {0}", clients.Count);
     446      var notFinishedFactTasks = factTaskDao.GetNotFinishedTasks().ToList();
     447      Console.WriteLine("notFinishedFactTasks.Count = {0}", notFinishedFactTasks.Count);
     448
    444449      var newAndNotFinishedTasks =
    445         (from task in taskDao.GetAllChildTasks()
    446          let stateLogs = task.StateLogs.OrderByDescending(x => x.DateTime)
    447          let lastSlaveId = stateLogs.First(x => x.SlaveId != null).SlaveId
    448          where (!factTaskIds.Contains(task.TaskId)
    449                 || notFinishedFactTasks.Select(x => x.TaskId).Contains(task.TaskId))
     450        (from x in preselectedNewAndNotFinishedTasks
     451         let task = x.Task
     452         let stateLogs = x.StateLogs // jkarder: if multiple join results in multiple rows, statelogs of all equal tasks must be the same ...
     453         let lastSlaveId = x.LastSlaveId
    450454         join lastFactTask in notFinishedFactTasks on task.TaskId equals lastFactTask.TaskId into lastFactPerTask
    451455         from lastFact in lastFactPerTask.DefaultIfEmpty()
    452          join client in dimClientDao.GetAllOnlineClients() on lastSlaveId equals client.ResourceId into clientsPerSlaveId
    453          from client in clientsPerSlaveId.DefaultIfEmpty()
     456           // jkarder:
     457           // we can still fix this another way, if we only select that one row from dimclients that fits the given statelog/lastslave entry
     458           // dimclient has multiple entries for one and the same client, because we track changes in resource group hierarchies
     459           // -> left join from task/statelog to dimclient results in multiple rows because multiple rows in dimclient with the same resource id exist
     460           // -> further down the road we call singleordefault to single out tasks to get the data for the update
     461           // -> dimclient should only contain one valid row for a given time span that fits to the lastslaveid.datetime
     462           // -> client.datecreated <= lastslaveid.datetime <= client.dateexpired
     463           // it's awful ...
     464         from client in clients.Where(c => lastSlaveId != null && lastSlaveId.SlaveId == c.ResourceId && c.DateCreated <= lastSlaveId.DateTime && (c.DateExpired == null || lastSlaveId.DateTime <= c.DateExpired)).DefaultIfEmpty()
     465           // jkarder:
     466           //join client in clients on lastSlaveId.SlaveId equals client.ResourceId into clientsPerSlaveId
     467           //from client in clientsPerSlaveId.DefaultIfEmpty()
    454468         select new {
    455469           TaskId = task.TaskId,
     
    459473           MemoryRequired = task.MemoryNeeded,
    460474           State = task.State,
    461            StateLogs = stateLogs.OrderBy(x => x.DateTime),
     475           StateLogs = stateLogs.OrderBy(x => x.DateTime).ToList(),
    462476           LastClientId = client != null
    463                           ? client.Id : lastFact != null
    464                           ? lastFact.LastClientId : (Guid?)null,
     477                        ? client.Id : lastFact != null
     478                        ? lastFact.LastClientId : (Guid?)null,
    465479           NotFinishedTask = notFinishedFactTasks.Any(y => y.TaskId == task.TaskId)
    466480         }).ToList();
     481
     482      Console.WriteLine("newAndNotFinishedTasks.Count = {0}", newAndNotFinishedTasks.Count);
     483
    467484
    468485      // (1) update data of already existing facts
    469486      // i.e. for all in newAndNotFinishedTasks where NotFinishedTask = true
    470487      foreach (var notFinishedFactTask in notFinishedFactTasks) {
    471         var nfftUpdate = newAndNotFinishedTasks.Where(x => x.TaskId == notFinishedFactTask.TaskId).SingleOrDefault();
    472         if(nfftUpdate != null) {
     488        // jkarder: firstordefault should work too, because statelogs of multiple task rows that result from the earlier join have to be the same
     489        var nfftUpdate = newAndNotFinishedTasks.Where(x => x.TaskId == notFinishedFactTask.TaskId).FirstOrDefault();
     490        if (nfftUpdate != null) {
    473491          var taskData = CalculateFactTaskData(nfftUpdate.StateLogs);
    474492
     
    487505          notFinishedFactTask.Exception = taskData.Exception;
    488506          notFinishedFactTask.InitialWaitingTime = taskData.InitialWaitingTime;
     507        } else {
     508          //Console.WriteLine("could not update task {0}", notFinishedFactTask.TaskId);
    489509        }
    490510      }
     511
     512      Console.WriteLine("nfft update complete");
    491513
    492514      // (2) insert facts for new tasks
    493515      // i.e. for all in newAndNotFinishedTasks where NotFinishedTask = false
    494       factTaskDao.Save(
    495         from x in newAndNotFinishedTasks
    496         where !x.NotFinishedTask
    497         let taskData = CalculateFactTaskData(x.StateLogs)
    498         select new FactTask {
    499           TaskId = x.TaskId,
    500           JobId = x.JobId,
    501           StartTime = taskData.StartTime,
    502           EndTime = taskData.EndTime,
    503           LastClientId = x.LastClientId,
    504           Priority = x.Priority,
    505           CoresRequired = x.CoresRequired,
    506           MemoryRequired = x.MemoryRequired,
    507           NumCalculationRuns = taskData.CalculationRuns,
    508           NumRetries = taskData.Retries,
    509           WaitingTime = taskData.WaitingTime,
    510           CalculatingTime = taskData.CalculatingTime,
    511           TransferTime = taskData.TransferTime,
    512           TaskState = x.State,
    513           Exception = taskData.Exception,
    514           InitialWaitingTime = taskData.InitialWaitingTime
    515         });
    516 
    517 
    518       ////update data of already existing facts
    519       //foreach (var notFinishedTask in factTaskDao.GetNotFinishedTasks()) {
    520       //  var ntc = newTasks.Where(x => x.TaskId == notFinishedTask.TaskId);
    521       //  if (ntc.Any()) {
    522       //    var x = ntc.Single();
    523       //    var taskData = CalculateFactTaskData(x.StateLogs);
    524 
    525       //    notFinishedTask.StartTime = taskData.StartTime;
    526       //    notFinishedTask.EndTime = taskData.EndTime;
    527       //    notFinishedTask.LastClientId = x.LastClientId;
    528       //    notFinishedTask.Priority = x.Priority;
    529       //    notFinishedTask.CoresRequired = x.CoresRequired;
    530       //    notFinishedTask.MemoryRequired = x.MemoryRequired;
    531       //    notFinishedTask.NumCalculationRuns = taskData.CalculationRuns;
    532       //    notFinishedTask.NumRetries = taskData.Retries;
    533       //    notFinishedTask.WaitingTime = taskData.WaitingTime;
    534       //    notFinishedTask.CalculatingTime = taskData.CalculatingTime;
    535       //    notFinishedTask.TransferTime = taskData.TransferTime;
    536       //    notFinishedTask.TaskState = x.State;
    537       //    notFinishedTask.Exception = taskData.Exception;
    538       //    notFinishedTask.InitialWaitingTime = taskData.InitialWaitingTime;
    539       //  }
    540       //}
     516      var newFactTasks = (from x in newAndNotFinishedTasks
     517                          where !x.NotFinishedTask
     518                          let taskData = CalculateFactTaskData(x.StateLogs)
     519                          select new FactTask {
     520                            TaskId = x.TaskId,
     521                            JobId = x.JobId,
     522                            StartTime = taskData.StartTime,
     523                            EndTime = taskData.EndTime,
     524                            LastClientId = x.LastClientId,
     525                            Priority = x.Priority,
     526                            CoresRequired = x.CoresRequired,
     527                            MemoryRequired = x.MemoryRequired,
     528                            NumCalculationRuns = taskData.CalculationRuns,
     529                            NumRetries = taskData.Retries,
     530                            WaitingTime = taskData.WaitingTime,
     531                            CalculatingTime = taskData.CalculatingTime,
     532                            TransferTime = taskData.TransferTime,
     533                            TaskState = x.State,
     534                            Exception = taskData.Exception,
     535                            InitialWaitingTime = taskData.InitialWaitingTime
     536                          }).ToList();
     537      Console.WriteLine("newFactTasks.Count = {0}", newFactTasks.Count);
     538      factTaskDao.Save(newFactTasks);
     539      Console.WriteLine("save of new fact tasks completed");
    541540    }
    542541
     
    551550          return user != null ? user.UserName : UnknownUserName;
    552551        }
    553       }
    554       catch (Exception) {
     552      } catch (Exception) {
    555553        return UnknownUserName;
    556554      }
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/Interfaces/ITaskScheduler.cs

    r17180 r17928  
    2020#endregion
    2121
     22using System;
    2223using System.Collections.Generic;
    23 using HeuristicLab.Services.Hive.DataTransfer;
     24using HeuristicLab.Services.Hive.DataAccess;
    2425
    2526namespace HeuristicLab.Services.Hive {
    2627  public interface ITaskScheduler {
    27     IEnumerable<TaskInfoForScheduler> Schedule(IEnumerable<TaskInfoForScheduler> tasks, int count = 1);
     28    IEnumerable<Guid> Schedule(Slave slave, int count = 1);
    2829  }
    2930}
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs

    r17180 r17928  
    2828  public class EventManager : IEventManager {
    2929    private const string SlaveTimeout = "Slave timed out.";
     30    private static readonly TaskState[] CompletedStates = { TaskState.Finished, TaskState.Aborted, TaskState.Failed };
     31
    3032    private IPersistenceManager PersistenceManager {
    3133      get { return ServiceLocator.Instance.PersistenceManager; }
     
    3335
    3436    public void Cleanup() {
     37      Console.WriteLine("started cleanup");
    3538      var pm = PersistenceManager;
    3639
    37       pm.UseTransaction(() => {
    38         FinishJobDeletion(pm);
    39         pm.SubmitChanges();
    40       });
     40      // preemptively delete obsolete entities
     41      // speeds up job deletion
     42      BatchDelete((p, s) => p.StateLogDao.DeleteObsolete(s), 100, 100, true, pm, "DeleteObsoleteStateLogs");
     43      BatchDelete((p, s) => p.TaskDataDao.DeleteObsolete(s), 100, 20, true, pm, "DeleteObsoleteTaskData");
     44      BatchDelete((p, s) => p.TaskDao.DeleteObsolete(s), 100, 20, false, pm, "DeleteObsoleteTasks");
     45      BatchDelete((p, s) => p.JobDao.DeleteByState(JobState.DeletionPending, s), 100, 20, true, pm, "DeleteObsoleteJobs");
    4146
    42       pm.UseTransaction(() => {
    43         SetTimeoutSlavesOffline(pm);
    44         SetTimeoutTasksWaiting(pm);
    45         DeleteObsoleteSlaves(pm);
    46         pm.SubmitChanges();
    47       });
     47      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: SetTimeoutSlavesOffline");
     48      Console.WriteLine("5");
     49      pm.UseTransactionAndSubmit(() => { SetTimeoutSlavesOffline(pm); });
     50      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: SetTimeoutTasksWaiting");
     51      Console.WriteLine("6");
     52      pm.UseTransactionAndSubmit(() => { SetTimeoutTasksWaiting(pm); });
     53      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: DeleteObsoleteSlaves");
     54      Console.WriteLine("7");
     55      pm.UseTransactionAndSubmit(() => { DeleteObsoleteSlaves(pm); });
     56      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: AbortObsoleteTasks");
     57      Console.WriteLine("8");
     58      pm.UseTransactionAndSubmit(() => { AbortObsoleteTasks(pm); });
     59      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: FinishParentTasks");
     60      Console.WriteLine("9");
     61      pm.UseTransactionAndSubmit(() => { FinishParentTasks(pm); });
     62      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log("HiveJanitor: DONE");
     63      Console.WriteLine("10");
     64    }
    4865
    49       pm.UseTransaction(() => {
    50         FinishParentTasks(pm);
    51         pm.SubmitChanges();
    52       });
     66    private void BatchDelete(
     67      Func<IPersistenceManager, int, int> deletionFunc,
     68      int batchSize,
     69      int maxCalls,
     70      bool limitIsBatchSize,
     71      IPersistenceManager pm,
     72      string logMessage
     73    ) {
     74      int totalDeleted = 0;
     75      while (maxCalls > 0) {
     76        maxCalls--;
     77        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: {logMessage}");
     78        Console.WriteLine($"HiveJanitor: {logMessage}");
     79        var deleted = pm.UseTransactionAndSubmit(() => { return deletionFunc(pm, batchSize); });
     80        LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: {logMessage} DONE (deleted {deleted}, {maxCalls} calls left)");
     81        Console.WriteLine($"HiveJanitor: {logMessage} DONE (deleted {deleted}, {maxCalls} calls left)");
     82        totalDeleted += deleted;
     83        if (limitIsBatchSize && deleted < batchSize || deleted <= 0) return;
     84      }
     85      LogFactory.GetLogger(typeof(HiveJanitor).Namespace).Log($"HiveJanitor: Possible rows left to delete (total deleted: {totalDeleted}).");
     86      Console.WriteLine($"HiveJanitor: Possible rows left to delete (total deleted: {totalDeleted}).");
    5387    }
    5488
     
    136170      }
    137171    }
     172
     173    /// <summary>
     174    /// Aborts tasks whose jobs have already been marked for deletion
     175    /// </summary>
     176    /// <param name="pm"></param>
     177    private void AbortObsoleteTasks(IPersistenceManager pm) {
     178      var jobDao = pm.JobDao;
     179      var taskDao = pm.TaskDao;
     180
     181      var obsoleteTasks = (from jobId in jobDao.GetJobIdsByState(JobState.StatisticsPending)
     182                           join task in taskDao.GetAll() on jobId equals task.JobId
     183                           where !CompletedStates.Contains(task.State) && task.Command == null
     184                           select task).ToList();
     185
     186      foreach (var t in obsoleteTasks) {
     187        t.State = TaskState.Aborted;
     188      }
     189    }
    138190  }
    139191}
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r17180 r17928  
    8383            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
    8484            if (mutexAquired) {
    85               var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave)
    86                   .Select(x => new TaskInfoForScheduler {
    87                     TaskId = x.TaskId,
    88                     JobId = x.JobId,
    89                     Priority = x.Priority
    90                   })
    91                   .ToList()
    92               );
    93               var availableTasks = TaskScheduler.Schedule(waitingTasks).ToArray();
    94               if (availableTasks.Any()) {
    95                 var task = availableTasks.First();
    96                 AssignTask(pm, slave, task.TaskId);
    97                 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
     85              var scheduledTaskIds = TaskScheduler.Schedule(slave, 1).ToArray();
     86              foreach (var id in scheduledTaskIds) {
     87                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, id));
    9888              }
    9989            } else {
    100               LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     90              LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling could not be aquired. (HB from Slave {slave.ResourceId})");
    10191            }
    102           }
    103           catch (AbandonedMutexException) {
    104             LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
    105           }
    106           catch (Exception ex) {
    107             LogFactory.GetLogger(this.GetType().Namespace).Log(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));
    108           }
    109           finally {
     92          } catch (AbandonedMutexException) {
     93            LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager: The mutex used for scheduling has been abandoned. (HB from Slave {slave.ResourceId})");
     94          } catch (Exception ex) {
     95            LogFactory.GetLogger(this.GetType().Namespace).Log($"HeartbeatManager threw an exception in ProcessHeartbeat (HB from Slave {slave.ResourceId}): {ex}");
     96          } finally {
    11097            if (mutexAquired) mutex.ReleaseMutex();
    11198          }
     
    113100      }
    114101      return actions;
    115     }
    116 
    117     private void AssignTask(IPersistenceManager pm, DA.Slave slave, Guid taskId) {
    118       const DA.TaskState transferring = DA.TaskState.Transferring;
    119       DateTime now = DateTime.Now;
    120       var taskDao = pm.TaskDao;
    121       var stateLogDao = pm.StateLogDao;
    122       pm.UseTransaction(() => {
    123         var task = taskDao.GetById(taskId);
    124         stateLogDao.Save(new DA.StateLog {
    125           State = transferring,
    126           DateTime = now,
    127           TaskId = taskId,
    128           SlaveId = slave.ResourceId,
    129           UserId = null,
    130           Exception = null
    131         });
    132         task.State = transferring;
    133         task.LastHeartbeat = now;
    134         pm.SubmitChanges();
    135       });
    136102    }
    137103
     
    154120      var taskInfos = pm.UseTransaction(() =>
    155121        (from task in taskDao.GetAll()
    156           where taskIds.Contains(task.TaskId)
    157           let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault()
    158           select new {
    159             TaskId = task.TaskId,
    160             JobId = task.JobId,
    161             State = task.State,
    162             Command = task.Command,
    163             SlaveId = lastStateLog != null ? lastStateLog.SlaveId : default(Guid)
    164           }).ToList()
     122         where taskIds.Contains(task.TaskId)
     123         let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault(x => x.State == DA.TaskState.Transferring)
     124         select new {
     125           TaskId = task.TaskId,
     126           JobId = task.JobId,
     127           State = task.State,
     128           Command = task.Command,
     129           SlaveId = lastStateLog != null ? lastStateLog.SlaveId : Guid.Empty
     130         }).ToList()
    165131      );
    166132
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/MessageContainer.cs

    r17180 r17928  
    2424using HEAL.Attic;
    2525using HeuristicLab.Common;
    26 using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
    2726
    2827namespace HeuristicLab.Services.Hive {
     
    3130  /// the actual message itself and the TaskId, referred to by the message
    3231  /// </summary>
    33   [StorableClass]
     32  [StorableType("67DEE81F-81FA-4B47-B043-93DBC2028DB5")]
    3433  [Serializable]
    3534  [DataContract]
    3635  public class MessageContainer : IDeepCloneable {
    37 
     36    [StorableType("A907BDB0-99E3-4EE2-BA31-72FFD29F7B19")]
    3837    public enum MessageType {
    3938      // *** commands from hive server ***
     
    6261
    6362    [StorableConstructor]
    64     protected MessageContainer(bool deserializing) { }
     63    protected MessageContainer(StorableConstructorFlag _) { }
    6564    protected MessageContainer() { }
    6665    public MessageContainer(MessageType message) {
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/Properties/Settings.Designer.cs

    r12961 r17928  
    11//------------------------------------------------------------------------------
    22// <auto-generated>
    3 //     Dieser Code wurde von einem Tool generiert.
    4 //     Laufzeitversion:4.0.30319.42000
     3//     This code was generated by a tool.
     4//     Runtime Version:4.0.30319.42000
    55//
    6 //     Änderungen an dieser Datei können falsches Verhalten verursachen und gehen verloren, wenn
    7 //     der Code erneut generiert wird.
     6//     Changes to this file may cause incorrect behavior and will be lost if
     7//     the code is regenerated.
    88// </auto-generated>
    99//------------------------------------------------------------------------------
     
    1313   
    1414    [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()]
    15     [global::System.CodeDom.Compiler.GeneratedCodeAttribute("Microsoft.VisualStudio.Editors.SettingsDesigner.SettingsSingleFileGenerator", "14.0.0.0")]
     15    [global::System.CodeDom.Compiler.GeneratedCodeAttribute("Microsoft.VisualStudio.Editors.SettingsDesigner.SettingsSingleFileGenerator", "16.5.0.0")]
    1616    public sealed partial class Settings : global::System.Configuration.ApplicationSettingsBase {
    1717       
     
    8080        [global::System.Configuration.ApplicationScopedSettingAttribute()]
    8181        [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
    82         [global::System.Configuration.DefaultSettingValueAttribute("00:00:20")]
     82        [global::System.Configuration.DefaultSettingValueAttribute("00:01:10")]
    8383        public global::System.TimeSpan SchedulingPatience {
    8484            get {
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/Properties/Settings.settings

    r12961 r17928  
    2222    </Setting>
    2323    <Setting Name="SchedulingPatience" Type="System.TimeSpan" Scope="Application">
    24       <Value Profile="(Default)">00:00:20</Value>
     24      <Value Profile="(Default)">00:01:10</Value>
    2525    </Setting>
    2626    <Setting Name="ProfileServicePerformance" Type="System.Boolean" Scope="Application">
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/Scheduler/RoundRobinTaskScheduler.cs

    r17180 r17928  
    2323using System.Collections.Generic;
    2424using System.Linq;
    25 using HeuristicLab.Services.Hive.DataAccess.Interfaces;
    2625using DA = HeuristicLab.Services.Hive.DataAccess;
    2726
    2827namespace HeuristicLab.Services.Hive {
    29   public class RoundRobinTaskScheduler : ITaskScheduler {
    30     private IPersistenceManager PersistenceManager {
    31       get { return ServiceLocator.Instance.PersistenceManager; }
     28  public class RoundRobinTaskScheduler : TaskScheduler {
     29    private class TaskPriorityResult {
     30      public Guid TaskId { get; set; }
     31      public Guid OwnerUserId { get; set; }
    3232    }
    3333
    34     public IEnumerable<TaskInfoForScheduler> Schedule(IEnumerable<TaskInfoForScheduler> tasks, int count = 1) {
    35       if (!tasks.Any()) return Enumerable.Empty<TaskInfoForScheduler>();
     34    protected override IReadOnlyList<Guid> ScheduleInternal(DA.Slave slave, int count) {
     35      var pm = PersistenceManager;
    3636
    37       var pm = PersistenceManager;
    38       var userPriorityDao = pm.UserPriorityDao;
    39       var jobDao = pm.JobDao;
     37      var result = pm.DataContext.ExecuteQuery<TaskPriorityResult>(
     38        GetHighestPriorityWaitingTasksQuery, slave.ResourceId, count, slave.FreeCores, slave.FreeMemory).ToList();
    4039
    41       var userPriorities = pm.UseTransaction(() => userPriorityDao.GetAll()
    42         .OrderBy(x => x.DateEnqueued)
    43         .ToArray()
    44       );
     40      foreach (var row in result) {
     41        pm.DataContext.ExecuteCommand("UPDATE UserPriority SET DateEnqueued = SYSDATETIME() WHERE UserId = {0}", row.OwnerUserId);
     42      }
    4543
    46       var userIds = userPriorities.Select(x => x.UserId).ToList();
    47       var jobs = pm.UseTransaction(() => {
    48         return jobDao.GetAll()
    49           .Where(x => userIds.Contains(x.OwnerUserId))
    50           .Select(x => new {
    51             Id = x.JobId,
    52             DateCreated = x.DateCreated,
    53             OwnerUserId = x.OwnerUserId
    54           })
    55           .ToList();
    56       });
    57 
    58       var taskJobRelations = tasks.Join(jobs,
    59         task => task.JobId,
    60         job => job.Id,
    61         (task, job) => new { Task = task, JobInfo = job })
    62         .OrderByDescending(x => x.Task.Priority)
    63         .ToList();
    64 
    65       var scheduledTasks = new List<TaskInfoForScheduler>();
    66       int priorityIndex = 0;
    67 
    68       if (count == 0 || count > taskJobRelations.Count) count = taskJobRelations.Count;
    69 
    70       for (int i = 0; i < count; i++) {
    71         var defaultEntry = taskJobRelations.First(); // search first task which is not included yet
    72         var priorityEntries = taskJobRelations.Where(x => x.JobInfo.OwnerUserId == userPriorities[priorityIndex].UserId).ToArray(); // search for tasks with desired user priority
    73         while (!priorityEntries.Any() && priorityIndex < userPriorities.Length - 1) {
    74           priorityIndex++;
    75           priorityEntries = taskJobRelations.Where(x => x.JobInfo.OwnerUserId == userPriorities[priorityIndex].UserId).ToArray();
    76         }
    77         if (priorityEntries.Any()) { // tasks with desired user priority found
    78           var priorityEntry = priorityEntries.OrderByDescending(x => x.Task.Priority).ThenBy(x => x.JobInfo.DateCreated).First();
    79           if (defaultEntry.Task.Priority <= priorityEntry.Task.Priority) {
    80             taskJobRelations.Remove(priorityEntry);
    81             scheduledTasks.Add(priorityEntry.Task);
    82             UpdateUserPriority(pm, userPriorities[priorityIndex]);
    83             priorityIndex++;
    84           } else { // there are other tasks with higher priorities
    85             taskJobRelations.Remove(defaultEntry);
    86             scheduledTasks.Add(defaultEntry.Task);
    87           }
    88         } else {
    89           taskJobRelations.Remove(defaultEntry);
    90           scheduledTasks.Add(defaultEntry.Task);
    91         }
    92         if (priorityIndex >= (userPriorities.Length - 1)) priorityIndex = 0;
    93       }
    94       return scheduledTasks;
    95 
     44      return result.Select(x => x.TaskId).ToArray();
    9645    }
    9746
    98     private void UpdateUserPriority(IPersistenceManager pm, DA.UserPriority up) {
    99       pm.UseTransaction(() => {
    100         up.DateEnqueued = DateTime.Now;
    101         pm.SubmitChanges();
    102       });
    103     }
     47    #region Query Strings
     48    private string GetHighestPriorityWaitingTasksQuery = @"
     49WITH rbranch AS(
     50  SELECT ResourceId, ParentResourceId
     51  FROM [Resource]
     52  WHERE ResourceId = {0}
     53  UNION ALL
     54  SELECT r.ResourceId, r.ParentResourceId
     55  FROM [Resource] r
     56  JOIN rbranch rb ON rb.ParentResourceId = r.ResourceId
     57)
     58SELECT TOP ({1}) t.TaskId, j.OwnerUserId
     59FROM Task t
     60  JOIN Job j on t.JobId = j.JobId
     61  JOIN AssignedJobResource ajr on j.JobId = ajr.JobId
     62  JOIN rbranch on ajr.ResourceId = rbranch.ResourceId
     63  JOIN UserPriority u on j.OwnerUserId = u.UserId
     64WHERE NOT (t.IsParentTask = 1 AND t.FinishWhenChildJobsFinished = 1)
     65AND t.TaskState = 'Waiting'
     66AND t.CoresNeeded <= {2}
     67AND t.MemoryNeeded <= {3}
     68AND j.JobState = 'Online'
     69ORDER BY t.Priority DESC, u.DateEnqueued ASC, j.DateCreated ASC";
     70    #endregion
    10471  }
    10572}
  • branches/3026_IntegrationIntoSymSpace/HeuristicLab.Services.Hive/3.3/app.config

    r14748 r17928  
    2828      </setting>
    2929      <setting name="SchedulingPatience" serializeAs="String">
    30         <value>00:00:20</value>
     30        <value>00:01:10</value>
    3131      </setting>
    3232      <setting name="ProfileServicePerformance" serializeAs="String">
Note: See TracChangeset for help on using the changeset viewer.