Free cookie consent management tool by TermsFeed Policy Generator

Changeset 9257 for trunk


Ignore:
Timestamp:
02/28/13 13:57:49 (12 years ago)
Author:
ascheibe
Message:

#2019

  • added missing transactions in the Hive service
  • split scheduling transaction into smaller transactions
  • improved speed of job uploading (AddTask)
  • changed highest isolation level from Serializable to RepeatableRead as phantom reads shouldn't be a problem
Location:
trunk/sources
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Services.Hive.DataAccess/3.3/TransactionManager.cs

    r7259 r9257  
    2222using System;
    2323using System.Transactions;
    24 using HeuristicLab.Services.Hive.DataAccess;
    2524
    26 namespace HeuristicLab.Services.Hive.DataAccess { 
    27   public class TransactionManager : ITransactionManager {   
    28     public void UseTransaction(Action call, bool serializable = false, bool longRunning = false) {
     25namespace HeuristicLab.Services.Hive.DataAccess {
     26  public class TransactionManager : ITransactionManager {
     27    public void UseTransaction(Action call, bool repeatableRead = false, bool longRunning = false) {
    2928      int n = 10;
    3029      while (n > 0) {
    31         TransactionScope transaction = CreateTransaction(serializable, longRunning);
     30        TransactionScope transaction = CreateTransaction(repeatableRead, longRunning);
    3231        try {
    3332          call();
     
    3736        catch (System.Data.SqlClient.SqlException e) {
    3837          n--; // probably deadlock situation, let it roll back and repeat the transaction n times
    39           LogFactory.GetLogger(typeof(TransactionManager).Namespace).Log(string.Format("Exception occured, repeating transaction {0} more times. Details: {1}", n, e.ToString()));         
     38          LogFactory.GetLogger(typeof(TransactionManager).Namespace).Log(string.Format("Exception occured, repeating transaction {0} more times. Details: {1}", n, e.ToString()));
    4039          if (n <= 0) throw;
    4140        }
     
    4645    }
    4746
    48     public T UseTransaction<T>(Func<T> call, bool serializable = false, bool longRunning = false) {
     47    public T UseTransaction<T>(Func<T> call, bool repeatableRead = false, bool longRunning = false) {
    4948      int n = 10;
    5049      while (n > 0) {
    51         TransactionScope transaction = CreateTransaction(serializable, longRunning);
     50        TransactionScope transaction = CreateTransaction(repeatableRead, longRunning);
    5251        try {
    5352          T result = call();
     
    6867    }
    6968
    70     private TransactionScope CreateTransaction(bool serializable, bool longRunning) {
     69    private TransactionScope CreateTransaction(bool repeatableRead, bool longRunning) {
    7170      var options = new TransactionOptions();
    72       if (serializable)
    73         options.IsolationLevel = IsolationLevel.Serializable;
     71      if (repeatableRead)
     72        options.IsolationLevel = IsolationLevel.RepeatableRead;
    7473      else
    7574        options.IsolationLevel = IsolationLevel.ReadUncommitted;
  • trunk/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs

    r9219 r9257  
    613613    }
    614614
    615     public void AssignJobToResource(Guid jobId, Guid resourceId) {
    616       using (var db = CreateContext()) {
    617         var job = db.Tasks.Where(x => x.TaskId == jobId).Single();
    618         job.AssignedResources.Add(new AssignedResource() { TaskId = jobId, ResourceId = resourceId });
     615    public void AssignJobToResource(Guid taskId, IEnumerable<Guid> resourceIds) {
     616      using (var db = CreateContext()) {
     617        var task = db.Tasks.Where(x => x.TaskId == taskId).Single();
     618        foreach (Guid rId in resourceIds) {
     619          task.AssignedResources.Add(new AssignedResource() { TaskId = taskId, ResourceId = rId });
     620        }
    619621        db.SubmitChanges();
    620622      }
  • trunk/sources/HeuristicLab.Services.Hive/3.3/HiveService.cs

    r9232 r9257  
    6767        taskData.TaskId = task.Id;
    6868        taskData.LastUpdate = DateTime.Now;
    69         foreach (Guid slaveGroupId in resourceIds) {
    70           dao.AssignJobToResource(task.Id, slaveGroupId);
    71         }
     69        dao.AssignJobToResource(task.Id, resourceIds);
    7270        dao.AddTaskData(taskData);
    7371        dao.UpdateTaskState(task.Id, DA.TaskState.Waiting, null, userManager.CurrentUserId, null);
     
    9593    public IEnumerable<Task> GetTasks() {
    9694      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client);
    97       var tasks = dao.GetTasks(x => true);
    98       foreach (var task in tasks)
    99         author.AuthorizeForTask(task.Id, Permission.Read);
    100       return tasks;
     95      return trans.UseTransaction(() => {
     96        var tasks = dao.GetTasks(x => true);
     97        foreach (var task in tasks)
     98          author.AuthorizeForTask(task.Id, Permission.Read);
     99        return tasks;
     100      });
    101101    }
    102102
     
    144144      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    145145      author.AuthorizeForTask(taskId, Permission.Read);
    146       return dao.GetTaskData(taskId);
     146
     147      return trans.UseTransaction(() => {
     148        return dao.GetTaskData(taskId);
     149      });
    147150    }
    148151
     
    150153      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    151154      author.AuthorizeForTask(taskDto.Id, Permission.Full);
     155
    152156      trans.UseTransaction(() => {
    153157        dao.UpdateTaskAndPlugins(taskDto);
     
    158162      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    159163      author.AuthorizeForTask(task.Id, Permission.Full);
    160       author.AuthorizeForTask(taskData.TaskId, Permission.Full);
    161       //trans.UseTransaction(() => { // cneumuel: try without transaction
    162       taskData.LastUpdate = DateTime.Now;
    163       dao.UpdateTaskAndPlugins(task);
    164       dao.UpdateTaskData(taskData);
    165       //}, false, true);
     164
     165      trans.UseTransaction(() => {
     166        dao.UpdateTaskAndPlugins(task);
     167      });
     168
     169      trans.UseTransaction(() => {
     170        taskData.LastUpdate = DateTime.Now;
     171        dao.UpdateTaskData(taskData);
     172      });
    166173    }
    167174
     
    415422      authen.AuthenticateForAnyRole(HiveRoles.Slave);
    416423
    417       List<MessageContainer> result = trans.UseTransaction(() => heartbeatManager.ProcessHeartbeat(heartbeat));
     424      List<MessageContainer> result = new List<MessageContainer>();
     425      try {
     426        result = heartbeatManager.ProcessHeartbeat(heartbeat);
     427      }
     428      catch (Exception ex) {
     429        DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Exception processing Heartbeat: " + ex.ToString());
     430      }
    418431
    419432      if (HeuristicLab.Services.Hive.Properties.Settings.Default.TriggerEventManagerInHeartbeat) {
     
    449462    public Plugin GetPlugin(Guid pluginId) {
    450463      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    451       return dao.GetPlugin(pluginId);
     464      return trans.UseTransaction(() => {
     465        return dao.GetPlugin(pluginId);
     466      });
    452467    }
    453468
    454469    public Plugin GetPluginByHash(byte[] hash) {
    455470      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    456       return dao.GetPlugins(x => x.Hash == hash).FirstOrDefault();
     471      return trans.UseTransaction(() => {
     472        return dao.GetPlugins(x => x.Hash == hash).FirstOrDefault();
     473      });
    457474    }
    458475
     
    461478    public IEnumerable<Plugin> GetPlugins() {
    462479      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    463       return dao.GetPlugins(x => x.Hash != null);
     480      return trans.UseTransaction(() => {
     481        return dao.GetPlugins(x => x.Hash != null);
     482      });
    464483    }
    465484
     
    477496    public void DeletePlugin(Guid pluginId) {
    478497      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave);
    479       dao.DeletePlugin(pluginId);
     498      trans.UseTransaction(() => {
     499        dao.DeletePlugin(pluginId);
     500      });
    480501    }
    481502    #endregion
     
    516537    #region Resource Methods
    517538    public IEnumerable<Resource> GetChildResources(Guid resourceId) {
    518       return dao.GetChildResources(resourceId);
     539      return trans.UseTransaction(() => { return dao.GetChildResources(resourceId); });
    519540    }
    520541    #endregion
     
    523544    public int GetNewHeartbeatInterval(Guid slaveId) {
    524545      authen.AuthenticateForAnyRole(HiveRoles.Slave);
    525       Slave s = dao.GetSlave(slaveId);
     546
     547      Slave s = trans.UseTransaction(() => { return dao.GetSlave(slaveId); });
    526548      if (s != null) {
    527549        return s.HbInterval;
     
    543565    public Slave GetSlave(Guid slaveId) {
    544566      authen.AuthenticateForAnyRole(HiveRoles.Administrator);
    545       return dao.GetSlave(slaveId);
     567      return trans.UseTransaction(() => { return dao.GetSlave(slaveId); });
    546568    }
    547569
    548570    public SlaveGroup GetSlaveGroup(Guid slaveGroupId) {
    549571      authen.AuthenticateForAnyRole(HiveRoles.Administrator);
    550       return dao.GetSlaveGroup(slaveGroupId);
     572      return trans.UseTransaction(() => { return dao.GetSlaveGroup(slaveGroupId); });
    551573    }
    552574
    553575    public IEnumerable<Slave> GetSlaves() {
    554576      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client);
    555       return dao.GetSlaves(x => true).Where(x => x.OwnerUserId == null
    556                                          || x.OwnerUserId == userManager.CurrentUserId
    557                                          || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList())
    558                                          || authen.IsInRole(HiveRoles.Administrator)).ToArray();
     577      return trans.UseTransaction(() => {
     578        return dao.GetSlaves(x => true).Where(x => x.OwnerUserId == null
     579                                           || x.OwnerUserId == userManager.CurrentUserId
     580                                           || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList())
     581                                           || authen.IsInRole(HiveRoles.Administrator)).ToArray();
     582      });
    559583    }
    560584
    561585    public IEnumerable<SlaveGroup> GetSlaveGroups() {
    562586      authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client);
    563       return dao.GetSlaveGroups(x => true).Where(x => x.OwnerUserId == null
    564                                               || x.OwnerUserId == userManager.CurrentUserId
    565                                               || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList())
    566                                               || authen.IsInRole(HiveRoles.Administrator)).ToArray();
     587      return trans.UseTransaction(() => {
     588        return dao.GetSlaveGroups(x => true).Where(x => x.OwnerUserId == null
     589                                                || x.OwnerUserId == userManager.CurrentUserId
     590                                                || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList())
     591                                                || authen.IsInRole(HiveRoles.Administrator)).ToArray();
     592      });
    567593    }
    568594
     
    717743    #region Statistics Methods
    718744    public IEnumerable<Statistics> GetStatistics() {
    719       return dao.GetStatistics(x => true);
     745      return trans.UseTransaction(() => { return dao.GetStatistics(x => true); });
    720746    }
    721747    public IEnumerable<Statistics> GetStatisticsForTimePeriod(DateTime from, DateTime to) {
    722       return dao.GetStatistics(x => x.Timestamp >= from && x.Timestamp <= to);
     748      return trans.UseTransaction(() => { return dao.GetStatistics(x => x.Timestamp >= from && x.Timestamp <= to); });
    723749    }
    724750    #endregion
  • trunk/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs

    r9219 r9257  
    114114    void UpdateResource(DT.Resource dto);
    115115    void DeleteResource(Guid id);
    116     void AssignJobToResource(Guid jobId, Guid resourceId);
     116    void AssignJobToResource(Guid taskId, IEnumerable<Guid> resourceIds);
    117117    IEnumerable<DT.Resource> GetAssignedResources(Guid jobId);
    118118    IEnumerable<DT.Resource> GetParentResources(Guid resourceId);
  • trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs

    r7862 r9257  
    5050        SetTimeoutTasksWaiting();
    5151        DeleteObsoleteSlaves();
    52       }, true);
     52      });
    5353
    5454      trans.UseTransaction(() => {
    5555        FinishParentTasks();
    5656        UpdateStatistics();
    57       }, false);
     57      });
    5858    }
    5959
  • trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs

    r9123 r9257  
    3737      get { return ServiceLocator.Instance.TaskScheduler; }
    3838    }
     39    private DataAccess.ITransactionManager trans {
     40      get { return ServiceLocator.Instance.TransactionManager; }
     41    }
    3942
    4043    /// <summary>
     
    4447    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    4548      List<MessageContainer> actions = new List<MessageContainer>();
    46       Slave slave = dao.GetSlave(heartbeat.SlaveId);
     49      Slave slave = null;
     50      slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });
     51
    4752      if (slave == null) {
    4853        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
     
    6267        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
    6368        slave.LastHeartbeat = DateTime.Now;
    64         dao.UpdateSlave(slave);
     69
     70        trans.UseTransaction(() => { dao.UpdateSlave(slave); });
    6571
    6672        // update task data
     
    7682              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
    7783            else {
    78               var availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave));
     84              IEnumerable<TaskInfoForScheduler> availableTasks = null;
     85              availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });
    7986              if (availableTasks.Any()) {
    8087                var task = availableTasks.First();
     
    99106
    100107    private void AssignJob(Slave slave, Guid taskId) {
    101       var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);
     108      trans.UseTransaction(() => {
     109        var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);
    102110
    103       // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
    104       task.LastHeartbeat = DateTime.Now;
    105       dao.UpdateTask(task);
     111        // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
     112        task.LastHeartbeat = DateTime.Now;
     113        dao.UpdateTask(task);
     114      });
    106115    }
    107116
     
    121130        // process the jobProgresses
    122131        foreach (var jobProgress in heartbeat.JobProgress) {
    123           Task curTask = dao.GetTask(jobProgress.Key);
     132          Task curTask = null;
     133          curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); });
    124134          if (curTask == null) {
    125135            // task does not exist in db
     
    150160                  break;
    151161              }
    152               dao.UpdateTask(curTask);
     162              trans.UseTransaction(() => { dao.UpdateTask(curTask); });
    153163            }
    154164          }
     
    159169
    160170    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
    161       var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
    162       var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
    163       return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
     171      return trans.UseTransaction(() => {
     172        var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
     173        var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
     174        return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
     175      });
    164176    }
    165177
    166178    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
    167179      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
    168       return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0);
     180      return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });
    169181    }
    170182
    171183    private bool ShutdownSlaveComputer(Guid slaveId) {
    172       return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0);
     184      return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); });
    173185    }
    174186  }
Note: See TracChangeset for help on using the changeset viewer.