Changeset 9397


Ignore:
Timestamp:
04/25/13 19:52:24 (7 years ago)
Author:
pfleck
Message:

#2030
Changed recursive Linq2Sql queries to native SQL queries.

Location:
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs

    r9393 r9397  
    3131
    3232    #region Task Methods
    33     public Task GetTaskById(Guid id) {
    34       return GetTaskByIdQuery(Db, id).SingleOrDefault();
    35     }
    36 
    37     private static Func<HiveDataContext, Guid, IQueryable<Task>> GetTaskByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid id) =>
     33    public Task GetTaskById(Guid taskId) {
     34      return GetTaskByIdQuery(Db, taskId).SingleOrDefault();
     35    }
     36
     37    private static Func<HiveDataContext, Guid, IQueryable<Task>> GetTaskByIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) =>
    3838      from t in db.Tasks
    39       where t.TaskId == id
     39      where t.TaskId == taskId
    4040      select t
    4141    );
    4242
    43     public void UpdateTaskAndPlugins(Task task) {       /*
    44       foreach (var pluginId in task.RequiredPlugins.Select(p => p.PluginId)) {
    45         if (db.RequiredPlugins.Count(p => p.PluginId == pluginId) == 0) {
    46           db.RequiredPlugins.InsertOnSubmit(new RequiredPlugin() { TaskId = task.TaskId, PluginId = pluginId });
    47         }
    48       }
    49       db.SubmitChanges();                                   */
    50     }
    51 
    52     public void UpdateTask(Task task) {
    53       Db.SubmitChanges();
    54     }
    55 
    56     public IQueryable<TaskInfoForScheduler> GetWaitingTasks(Slave slave) {
    57       var parentResources = GetParentResources(slave.ResourceId);
    58       var resourceIds = parentResources.Select(x => x.ResourceId);
     43    public Tuple<Task, Guid?> GetTaskByIdAndLastStateLogSlaveId(Guid taskId) {
     44      return GetTaskByIdAndLastStateLogSlaveIdQuery(Db, taskId).SingleOrDefault();
     45    }
     46
     47    private static Func<HiveDataContext, Guid, IQueryable<Tuple<Task, Guid?>>> GetTaskByIdAndLastStateLogSlaveIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) =>
     48       from t in db.Tasks
     49       join sl in db.StateLogs on t.TaskId equals sl.TaskId
     50       where t.TaskId == taskId
     51       orderby sl.DateTime descending
     52       group sl by t into logs
     53       select new Tuple<Task, Guid?>(logs.Key, logs.First().SlaveId)
     54    );
     55
     56    private const string GetWaitingTasksQueryString = @"
     57      WITH pr AS (
     58        SELECT ResourceId, ParentResourceId
     59        FROM [Resource]
     60        WHERE ResourceId = {0}
     61        UNION ALL
     62        SELECT r.ResourceId, r.ParentResourceId
     63        FROM [Resource] r JOIN pr ON r.ResourceId = pr.ParentResourceId
     64      )
     65      SELECT DISTINCT t.TaskId, t.JobId, t.Priority
     66      FROM pr JOIN AssignedResources ar ON ar.ResourceId = pr.ResourceId
     67          JOIN Task t ON t.TaskId = ar.TaskId
     68      WHERE NOT (t.IsParentTask = 1 AND t.FinishWhenChildJobsFinished = 1)
     69          AND t.TaskState = {1}
     70          AND t.CoresNeeded <= {2}
     71          AND t.MemoryNeeded <= {3}
     72    ";
     73
     74    public IEnumerable<TaskInfoForScheduler> GetWaitingTasks(Slave slave) {
    5975      //Originally we checked here if there are parent tasks which should be calculated (with GetParentTasks(resourceIds, count, false);).
    6076      //Because there is at the moment no case where this makes sense (there don't exist parent tasks which need to be calculated),
    6177      //we skip this step because it's wasted runtime
    62 
    63       return from ar in Db.AssignedResources
    64              where resourceIds.Contains(ar.ResourceId)
    65                  && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished)
    66                  && ar.Task.State == TaskState.Waiting
    67                  && ar.Task.CoresNeeded <= slave.FreeCores
    68                  && ar.Task.MemoryNeeded <= slave.FreeMemory
    69              select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority };
    70     }
    71 
    72     /*private static Func<HiveDataContext, Guid, Slave, IQueryable<TaskInfoForScheduler>> GetWaitingTasksQuery = CompiledQuery.Compile((HiveDataContext db, Guid id, Slave slave) =>
    73       from ar in db.AssignedResources
    74       where ar.ResourceId == id
    75           && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished)
    76           && ar.Task.State == TaskState.Waiting
    77           && ar.Task.CoresNeeded <= slave.FreeCores
    78           && ar.Task.MemoryNeeded <= slave.FreeMemory
    79       select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority }
    80     );*/
     78      return Db.ExecuteQuery<TaskInfoForScheduler>(GetWaitingTasksQueryString, slave.ResourceId, Enum.GetName(typeof(TaskState), TaskState.Waiting), slave.FreeCores, slave.FreeMemory);
     79    }
     80
     81    public IQueryable<DT.LightweightTask> GetLightweightTasks(Guid jobId) {
     82      return GetLightweightTasksQuery(Db, jobId);
     83    }
     84
     85    private static Func<HiveDataContext, Guid, IQueryable<DT.LightweightTask>> GetLightweightTasksQuery = CompiledQuery.Compile((HiveDataContext db, Guid jobId) =>
     86        from task in db.Tasks
     87        where task.JobId == jobId
     88        select new DT.LightweightTask {
     89          Id = task.TaskId,
     90          ExecutionTime = TimeSpan.FromMilliseconds(task.ExecutionTimeMs),
     91          ParentTaskId = task.ParentTaskId,
     92          StateLog = task.StateLogs.Select(sl => ConvertStateLog(sl)).ToList(),
     93          State = ConvertTaskState(task.State),
     94          Command = ConvertCommand(task.Command),
     95          LastTaskDataUpdate = task.JobData.LastUpdate
     96        }
     97    );
     98
     99    static Func<StateLog, DT.StateLog> ConvertStateLog = sl => DT.Convert.ToDto(sl);
     100    static Func<TaskState, DT.TaskState> ConvertTaskState = ts => DT.Convert.ToDto(ts);
     101    static Func<Command?, DT.Command?> ConvertCommand = c => DT.Convert.ToDto(c);
     102
     103    public void UpdateTask(Task task) {
     104      Db.SubmitChanges();
     105    }
    81106
    82107    public Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception) {
     
    97122      return task;
    98123    }
     124
     125    private const string TaskIsAllowedToBeCalculatedBySlaveQueryString = @"
     126      WITH pr AS (
     127        SELECT ResourceId, ParentResourceId
     128        FROM [Resource]
     129        WHERE ResourceId = {0}
     130        UNION ALL
     131        SELECT r.ResourceId, r.ParentResourceId
     132        FROM [Resource] r JOIN pr ON r.ResourceId = pr.ParentResourceId
     133      )
     134      SELECT COUNT(ar.TaskId)
     135      FROM pr JOIN AssignedResources ar ON pr.ResourceId = ar.ResourceId
     136      WHERE ar.TaskId = {1}
     137    ";
     138
     139    public bool TaskIsAllowedToBeCalculatedBySlave(Guid taskId, Guid slaveId) {
     140      return Db.ExecuteQuery<int>(TaskIsAllowedToBeCalculatedBySlaveQueryString, slaveId, taskId).First() > 0;
     141    }
    99142    #endregion
    100143
     
    155198      Db.SubmitChanges();
    156199    }
     200
     201    private const string DowntimeQueryString = @"
     202      WITH pr AS (
     203        SELECT ResourceId, ParentResourceId
     204        FROM [Resource]
     205        WHERE ResourceId = {0}
     206        UNION ALL
     207        SELECT r.ResourceId, r.ParentResourceId
     208        FROM [Resource] r JOIN pr ON r.ResourceId = pr.ParentResourceId
     209      )
     210      SELECT COUNT(dt.DowntimeId)
     211      FROM pr JOIN [Downtime] dt ON pr.ResourceId = dt.ResourceId
     212      WHERE {1} BETWEEN dt.StartDate AND dt.EndDate
     213        AND dt.DowntimeType = {2}
     214      ";
     215
     216    public bool SlaveHasToShutdownComputer(Guid slaveId) {
     217      return Db.ExecuteQuery<int>(DowntimeQueryString, slaveId, DateTime.Now, DowntimeType.Shutdown).First() > 0;
     218    }
     219
     220    public bool SlaveIsAllowedToCalculate(Guid slaveId) {
     221      return Db.ExecuteQuery<int>(DowntimeQueryString, slaveId, DateTime.Now, DowntimeType.Offline).First() == 0;
     222    }
    157223    #endregion
    158224
     
    162228
    163229    #region Resource Methods
    164     public IQueryable<Guid> GetAssignedResourcesIds(Guid taskId) {
    165       return GetAssignedResourcesIdQuery(Db, taskId);
    166     }
    167 
    168     private static Func<HiveDataContext, Guid, IQueryable<Guid>> GetAssignedResourcesIdQuery = CompiledQuery.Compile((HiveDataContext db, Guid taskId) =>
    169       from ar in db.AssignedResources
    170       where ar.TaskId == taskId
    171       select ar.ResourceId
    172     );
    173 
    174     public IEnumerable<Resource> GetParentResources(Guid resourceId) {
    175       var child = Db.Resources.Single(r => r.ResourceId == resourceId);
    176 
    177       yield return child;
    178       while (child.ParentResource != null) {
    179         child = child.ParentResource;
    180         yield return child;
    181       }
    182     }
    183 
    184     public IEnumerable<Guid> GetParentResourcesIDs(Guid resourceId) {
    185       var child = Db.Resources.Single(r => r.ResourceId == resourceId);
    186 
    187       yield return resourceId;
    188       while (child.ParentResource != null) {
    189         child = child.ParentResource;
    190         yield return child.ResourceId;
    191       }
    192     }
    193 
    194     private static void CollectParentResources(ICollection<Resource> resources, Resource resource) {
    195       if (resource == null) return;
    196       resources.Add(resource);
    197       CollectParentResources(resources, resource.ParentResource);
    198     }
    199230    #endregion
    200231
     
    212243
    213244    #region Downtime Methods
    214     public IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTime(Guid resourceId, DowntimeType type) {
    215       var ids = GetParentResourcesIDs(resourceId).ToArray();
    216 
    217       return from r in Db.Resources
    218              where ids.Contains(r.ResourceId)
    219              select (from d in Db.Downtimes
    220                      where d.ResourceId == r.ResourceId && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)
    221                      select d).Count();
    222     }
    223 
    224     /*private static Func<HiveDataContext, Guid, DowntimeType, int> GetNumberOfDowntimesAtCurrentTimeQuery =
    225       CompiledQuery.Compile((HiveDataContext db, Guid ids, DowntimeType type) =>
    226         (from d in db.Downtimes
    227          where d.ResourceId == ids && d.DowntimeType == type && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)
    228          select d).Count()
    229     );*/
    230 
    231245    #endregion
    232246
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveService.cs

    r9391 r9397  
    132132
    133133      return trans.UseTransaction(() => {
    134         return dtoDao.GetLightweightTasks(task => task.JobId == jobId).ToArray();
     134        return dao.GetLightweightTasks(jobId).ToArray();
    135135      }, false, true);
    136136    }
     
    217217        }
    218218
    219         //dtoDao.UpdateTaskAndPluginsDA(task); no idea why this is needed
     219        //dao.UpdateTaskAndPlugins(task); no idea why this is needed
    220220        return DT.Convert.ToDto(task);
    221221      });
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs

    r9393 r9397  
    2929  public interface IHiveDao {
    3030    #region Task Methods
    31     Task GetTaskById(Guid id);
     31    Task GetTaskById(Guid task);
     32    Tuple<Task, Guid?> GetTaskByIdAndLastStateLogSlaveId(Guid taskId);
    3233
    33     IQueryable<TaskInfoForScheduler> GetWaitingTasks(Slave slave);
     34    IEnumerable<TaskInfoForScheduler> GetWaitingTasks(Slave slave);
     35    IQueryable<DT.LightweightTask> GetLightweightTasks(Guid jobId);
    3436
    3537    void UpdateTask(Task task);
     
    3739    Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception);
    3840
     41    bool TaskIsAllowedToBeCalculatedBySlave(Guid taskId, Guid slaveId);
    3942    #endregion
    4043
     
    6871
    6972    void UpdateSlave(Slave slave);
     73
     74    bool SlaveHasToShutdownComputer(Guid slaveId);
     75    bool SlaveIsAllowedToCalculate(Guid slaveId);
    7076    #endregion
    7177
     
    7581
    7682    #region Resource Methods
    77     IQueryable<Guid> GetAssignedResourcesIds(Guid taskId);
    78     IEnumerable<Resource> GetParentResources(Guid resourceId);
    79     IEnumerable<Guid> GetParentResourcesIDs(Guid resourceId);
     83
    8084    #endregion
    8185
     
    9397
    9498    #region Downtime Methods
    95     IQueryable<int> GetNumberOfDowntimesFromParentResourcesAtCurrentTime(Guid resourceId, DowntimeType type);
    9699    #endregion
    97100
  • branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r9391 r9397  
    5151      trans.UseTransaction(() => {
    5252        slave = dao.GetSlaveById(heartbeat.SlaveId);
     53      });
     54      if (slave == null) {
     55        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
     56      } else {
     57        if (heartbeat.HbInterval != slave.HbInterval) {
     58          actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
     59        }
     60        if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) {
     61          actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
     62        }
    5363
    54         if (slave == null) {
    55           actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
    56         } else {
    57           if (heartbeat.HbInterval != slave.HbInterval) {
    58             actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
    59           }
    60           if (ShutdownSlaveComputer(slave.ResourceId)) {
    61             actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
    62           }
     64        // update slave data
     65        slave.FreeCores = heartbeat.FreeCores;
     66        slave.FreeMemory = heartbeat.FreeMemory;
     67        slave.CpuUtilization = heartbeat.CpuUtilization;
     68        slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId);
     69        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
     70        slave.LastHeartbeat = DateTime.Now;
    6371
    64           // update slave data
    65           slave.FreeCores = heartbeat.FreeCores;
    66           slave.FreeMemory = heartbeat.FreeMemory;
    67           slave.CpuUtilization = heartbeat.CpuUtilization;
    68           slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId);
    69           slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
    70           slave.LastHeartbeat = DateTime.Now;
    71 
     72        trans.UseTransaction(() => {
    7273          dao.UpdateSlave(slave);
    73         }
    74       });
     74        });
     75      }
    7576
    7677      if (slave != null) {
     
    136137        // process the jobProgresses
    137138        foreach (var jobProgress in heartbeat.JobProgress) {
     139          Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null;
    138140          trans.UseTransaction(() => {
    139             var curTask = dao.GetTaskById(jobProgress.Key);
    140             if (curTask == null) {
    141               // task does not exist in db
    142               actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
    143               LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
     141            taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key);
     142          });
     143          var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null;
     144          if (curTask == null) {
     145            // task does not exist in db
     146            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
     147            LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
     148          } else {
     149            var slaveId = taskWithLastStateLogSlaveId.Item2;
     150            if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) {
     151              // assigned slave does not match heartbeat
     152              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
     153              LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
     154            } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) {
     155              // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
     156              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
    144157            } else {
    145               var currentStateLog = curTask.StateLogs.Last();
    146               if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) {
    147                 // assigned slave does not match heartbeat
    148                 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
    149                 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
    150               } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
    151                 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
    152                 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
    153               } else {
    154                 // save task execution time
    155                 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
    156                 curTask.LastHeartbeat = DateTime.Now;
     158              // save task execution time
     159              curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
     160              curTask.LastHeartbeat = DateTime.Now;
    157161
    158                 switch (curTask.Command) {
    159                   case Command.Stop:
    160                     actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
    161                     break;
    162                   case Command.Pause:
    163                     actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
    164                     break;
    165                   case Command.Abort:
    166                     actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
    167                     break;
    168                 }
     162              switch (curTask.Command) {
     163                case Command.Stop:
     164                  actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
     165                  break;
     166                case Command.Pause:
     167                  actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
     168                  break;
     169                case Command.Abort:
     170                  actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
     171                  break;
     172              }
     173              trans.UseTransaction(() => {
    169174                dao.UpdateTask(curTask);
    170               }
     175              });
    171176            }
    172           });
     177          }
    173178        }
    174179      }
    175180      return actions;
    176181    }
    177 
    178     private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
    179       var assignedResourceIds = dao.GetAssignedResourcesIds(curTask.TaskId);
    180       var slaveResourceIds = dao.GetParentResourcesIDs(slaveId).ToArray();
    181       return assignedResourceIds.Any(r => slaveResourceIds.Contains(r));
    182     }
    183 
    184     private bool SlaveIsAllowedToCalculate(Guid slaveId) {
    185       var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTime(slaveId, DowntimeType.Offline);
    186       // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
    187       return downtimes.All(x => x == 0);
    188     }
    189 
    190     private bool ShutdownSlaveComputer(Guid slaveId) {
    191       var downtimes = dao.GetNumberOfDowntimesFromParentResourcesAtCurrentTime(slaveId, DowntimeType.Shutdown);
    192       return downtimes.Any(x => x != 0);
    193     }
    194182  }
    195183}
Note: See TracChangeset for help on using the changeset viewer.