Changeset 9385


Ignore:
Timestamp:
04/19/13 17:16:49 (7 years ago)
Author:
pfleck
Message:

#2030
Replaced lazy loading with specialized queries.
Compiled queries used for Heardbeat queries.
Changed result types to IQueryable<T> for later query modification.

Location:
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs

    r9381 r9385  
    2222using System;
    2323using System.Collections.Generic;
     24using System.Data.Linq;
    2425using System.Linq;
    2526using System.Linq.Expressions;
     
    4546    public Task GetTaskDA(Guid id) {
    4647      var db = HiveOperationContext.Current.DataContext;
    47       return db.Tasks.SingleOrDefault(x => x.TaskId == id);
    48     }
     48      return GetTaskByIdQuery(db, id).SingleOrDefault();
     49    }
     50
     51    private static Func<HiveDataContext, Guid, IQueryable<Task>> GetTaskByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid id) =>
     52      from t in db.Tasks
     53      where t.TaskId == id
     54      select t
     55    );
    4956
    5057    public IEnumerable<DT.Task> GetTasks(Expression<Func<Task, bool>> predicate) {
     
    205212    }
    206213
    207     public IEnumerable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave) {
     214    public IQueryable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave) {
    208215      var db = HiveOperationContext.Current.DataContext;
     216
    209217      var parentResources = GetParentResourcesDA(slave.ResourceId);
    210218      var resourceIds = parentResources.Select(x => x.ResourceId);
     
    213221      //we skip this step because it's wasted runtime
    214222
    215       var query = from ar in db.AssignedResources
    216                   where resourceIds.Contains(ar.ResourceId)
    217                      && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished)
    218                      && ar.Task.State == TaskState.Waiting
    219                      && ar.Task.CoresNeeded <= slave.FreeCores
    220                      && ar.Task.MemoryNeeded <= slave.FreeMemory
    221                   select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority };
    222       var waitingTasks = query.ToArray();
    223       return waitingTasks;
    224     }
     223      return from ar in db.AssignedResources
     224             where resourceIds.Contains(ar.ResourceId)
     225                 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished)
     226                 && ar.Task.State == TaskState.Waiting
     227                 && ar.Task.CoresNeeded <= slave.FreeCores
     228                 && ar.Task.MemoryNeeded <= slave.FreeMemory
     229             select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority };
     230    }
     231
     232    /*private static Func<HiveDataContext, Guid, Slave, IQueryable<TaskInfoForScheduler>> GetWaitingTasksQuery = CompiledQuery.Compile((HiveDataContext db, Guid id, Slave slave) =>
     233      from ar in db.AssignedResources
     234      where ar.ResourceId == id
     235          && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished)
     236          && ar.Task.State == TaskState.Waiting
     237          && ar.Task.CoresNeeded <= slave.FreeCores
     238          && ar.Task.MemoryNeeded <= slave.FreeMemory
     239      select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority }
     240    );*/
    225241
    226242    public DT.Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception) {
     
    265281      });
    266282
    267       var task = db.Tasks.SingleOrDefault(x => x.TaskId == taskId);
     283      var task = GetTaskDA(taskId);
    268284      task.State = taskState;
    269285
     
    567583    public Slave GetSlaveDA(Guid id) {
    568584      var db = HiveOperationContext.Current.DataContext;
    569       return db.Resources.OfType<Slave>().SingleOrDefault(x => x.ResourceId == id);
    570     }
     585      return GetSlaveByIdQuery(db, id).SingleOrDefault();
     586    }
     587
     588    private static Func<HiveDataContext, Guid, IQueryable<Slave>> GetSlaveByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid slaveId) =>
     589      from s in db.Resources.OfType<Slave>()
     590      where s.ResourceId == slaveId
     591      select s
     592    );
    571593
    572594    public IEnumerable<DT.Slave> GetSlaves(Expression<Func<Slave, bool>> predicate) {
     
    714736    }
    715737
     738    public IQueryable<Guid> GetAssignedResourcesIdsDA(Guid taskId) {
     739      var db = HiveOperationContext.Current.DataContext;
     740      return GetAssignedResourcesIdQuery(db, taskId);
     741    }
     742
     743    private static Func<HiveDataContext, Guid, IQueryable<Guid>> GetAssignedResourcesIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) =>
     744     from ar in db.AssignedResources
     745     where ar.TaskId == taskId
     746     select ar.ResourceId
     747   );
     748
    716749    /// <summary>
    717750    /// Returns all parent resources of a resource (the given resource is also added)
     
    727760    public IEnumerable<Resource> GetParentResourcesDA(Guid resourceId) {
    728761      var db = HiveOperationContext.Current.DataContext;
    729       var resources = new List<Resource>();
    730       CollectParentResources(resources, db.Resources.Where(r => r.ResourceId == resourceId).Single());
    731       return resources;
    732     }
    733 
    734     private void CollectParentResources(List<Resource> resources, Resource resource) {
     762      var child = db.Resources.Single(r => r.ResourceId == resourceId);
     763
     764      yield return child;
     765      while (child.ParentResource != null) {
     766        child = child.ParentResource;
     767        yield return child;
     768      }
     769    }
     770
     771    public IEnumerable<Guid> GetParentResourcesIDsDA(Guid resourceId) {
     772      var db = HiveOperationContext.Current.DataContext;
     773      var child = db.Resources.Single(r => r.ResourceId == resourceId);
     774
     775      yield return resourceId;
     776      while (child.ParentResource != null) {
     777        child = child.ParentResource;
     778        yield return child.ResourceId;
     779      }
     780    }
     781
     782    public IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(Guid resourceId, DowntimeType type) {
     783      var db = HiveOperationContext.Current.DataContext;
     784
     785      var ids = GetParentResourcesIDsDA(resourceId).ToArray();
     786
     787      return from r in db.Resources
     788             where ids.Contains(r.ResourceId)
     789             select (from d in db.Downtimes
     790                     where d.ResourceId == r.ResourceId && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)
     791                     select d).Count();
     792    }
     793
     794    /*private static Func<HiveDataContext, Guid, DowntimeType, int> GetNumberOfDowntimesAtCurrentTimeQuery =
     795      CompiledQuery.Compile((HiveDataContext db, Guid ids, DowntimeType type) =>
     796        (from d in db.Downtimes
     797         where d.ResourceId == ids && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)
     798         select d).Count()
     799    );*/
     800
     801    private static void CollectParentResources(ICollection<Resource> resources, Resource resource) {
    735802      if (resource == null) return;
    736803      resources.Add(resource);
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs

    r9381 r9385  
    2222using System;
    2323using System.Collections.Generic;
     24using System.Linq;
    2425using System.Linq.Expressions;
    2526using HeuristicLab.Services.Hive.DataAccess;
     
    4142    void DeleteTask(Guid id);
    4243    IEnumerable<TaskInfoForScheduler> GetWaitingTasks(DT.Slave slave);
    43     IEnumerable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave);
     44    IQueryable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave);
    4445    IEnumerable<DT.Task> GetParentTasks(IEnumerable<Guid> resourceIds, int count, bool finished);
    4546    DT.Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception);
     
    123124    void AssignJobToResource(Guid taskId, IEnumerable<Guid> resourceIds);
    124125    IEnumerable<DT.Resource> GetAssignedResources(Guid jobId);
     126    IQueryable<Guid> GetAssignedResourcesIdsDA(Guid taskId);
    125127    IEnumerable<DT.Resource> GetParentResources(Guid resourceId);
    126128    IEnumerable<Resource> GetParentResourcesDA(Guid resourceId);
     129    IEnumerable<Guid> GetParentResourcesIDsDA(Guid resourceId);
     130    IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(Guid resourceId, DowntimeType type);
    127131    IEnumerable<DT.Resource> GetChildResources(Guid resourceId);
    128132    IEnumerable<DT.Task> GetJobsByResourceId(Guid resourceId);
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r9381 r9385  
    7474      });
    7575
    76       // update task data
    77       actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));
     76      if (slave != null) {
     77        // update task data
     78        actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));
    7879
    79       // assign new task
    80       if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    81         bool mutexAquired = false;
    82         var mutex = new Mutex(false, MutexName);
    83         try {
     80        // assign new task
     81        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
     82          bool mutexAquired = false;
     83          var mutex = new Mutex(false, MutexName);
     84          try {
    8485
    85           mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
    86           if (!mutexAquired)
    87             DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
    88           else {
    89             trans.UseTransaction(() => {
    90               IEnumerable<TaskInfoForScheduler> availableTasks = null;
    91               availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave));
    92               if (availableTasks.Any()) {
    93                 var task = availableTasks.First();
    94                 AssignJob(slave, task.TaskId);
    95                 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
    96               }
    97             });
     86            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
     87            if (!mutexAquired)
     88              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
     89            else {
     90              trans.UseTransaction(() => {
     91                IEnumerable<TaskInfoForScheduler> availableTasks = null;
     92                availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave).ToArray());
     93                if (availableTasks.Any()) {
     94                  var task = availableTasks.First();
     95                  AssignJob(slave, task.TaskId);
     96                  actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
     97                }
     98              });
     99            }
    98100          }
    99         }
    100         catch (AbandonedMutexException) {
    101           DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
    102         }
    103         catch (Exception ex) {
    104           DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
    105         }
    106         finally {
    107           if (mutexAquired) mutex.ReleaseMutex();
     101          catch (AbandonedMutexException) {
     102            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
     103          }
     104          catch (Exception ex) {
     105            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
     106          }
     107          finally {
     108            if (mutexAquired) mutex.ReleaseMutex();
     109          }
    108110        }
    109111      }
     
    132134        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
    133135      } else {
    134         trans.UseTransaction(() => {
    135           // process the jobProgresses
    136           foreach (var jobProgress in heartbeat.JobProgress) {
     136        // process the jobProgresses
     137        foreach (var jobProgress in heartbeat.JobProgress) {
     138          trans.UseTransaction(() => {
    137139            var curTask = dao.GetTaskDA(jobProgress.Key);
    138140            if (curTask == null) {
     
    141143              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
    142144            } else {
    143               var currentStateLog = curTask.StateLogs.LastOrDefault();
     145              var currentStateLog = curTask.StateLogs.Last();
    144146              if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) {
    145147                // assigned slave does not match heartbeat
     
    168170              }
    169171            }
    170           }
    171         });
     172          });
     173        }
    172174      }
    173175      return actions;
     
    175177
    176178    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) {
    177       var assignedResourceIds = curTask.AssignedResources.Select(ar => ar.Resource.ResourceId);
    178       var slaveResourceIds = dao.GetParentResourcesDA(slaveId).Select(x => x.ResourceId);
     179      var assignedResourceIds = dao.GetAssignedResourcesIdsDA(curTask.TaskId);
     180      var slaveResourceIds = dao.GetParentResourcesIDsDA(slaveId).ToArray();
    179181      return assignedResourceIds.Any(r => slaveResourceIds.Contains(r));
    180182    }
    181183
    182184    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
    183       var parentResources = dao.GetParentResourcesDA(slaveId);
     185      var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(slaveId, DA.DowntimeType.Offline);
    184186      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
    185       return parentResources.All(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() == 0);
     187      return downtimes.All(x => x == 0);
    186188    }
    187189
    188190    private bool ShutdownSlaveComputer(Guid slaveId) {
    189       var parentResources = dao.GetParentResourcesDA(slaveId);
    190 
    191       return parentResources.Any(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() != 0);
     191      var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTimeDA(slaveId, DA.DowntimeType.Shutdown);
     192      return downtimes.Any(x => x != 0);
    192193    }
    193194  }
Note: See TracChangeset for help on using the changeset viewer.