Changeset 9257
- Timestamp:
- 02/28/13 13:57:49 (12 years ago)
- Location:
- trunk/sources
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Services.Hive.DataAccess/3.3/TransactionManager.cs
r7259 r9257 22 22 using System; 23 23 using System.Transactions; 24 using HeuristicLab.Services.Hive.DataAccess;25 24 26 namespace HeuristicLab.Services.Hive.DataAccess { 27 public class TransactionManager : ITransactionManager { 28 public void UseTransaction(Action call, bool serializable= false, bool longRunning = false) {25 namespace HeuristicLab.Services.Hive.DataAccess { 26 public class TransactionManager : ITransactionManager { 27 public void UseTransaction(Action call, bool repeatableRead = false, bool longRunning = false) { 29 28 int n = 10; 30 29 while (n > 0) { 31 TransactionScope transaction = CreateTransaction( serializable, longRunning);30 TransactionScope transaction = CreateTransaction(repeatableRead, longRunning); 32 31 try { 33 32 call(); … … 37 36 catch (System.Data.SqlClient.SqlException e) { 38 37 n--; // probably deadlock situation, let it roll back and repeat the transaction n times 39 LogFactory.GetLogger(typeof(TransactionManager).Namespace).Log(string.Format("Exception occured, repeating transaction {0} more times. Details: {1}", n, e.ToString())); 38 LogFactory.GetLogger(typeof(TransactionManager).Namespace).Log(string.Format("Exception occured, repeating transaction {0} more times. Details: {1}", n, e.ToString())); 40 39 if (n <= 0) throw; 41 40 } … … 46 45 } 47 46 48 public T UseTransaction<T>(Func<T> call, bool serializable= false, bool longRunning = false) {47 public T UseTransaction<T>(Func<T> call, bool repeatableRead = false, bool longRunning = false) { 49 48 int n = 10; 50 49 while (n > 0) { 51 TransactionScope transaction = CreateTransaction( serializable, longRunning);50 TransactionScope transaction = CreateTransaction(repeatableRead, longRunning); 52 51 try { 53 52 T result = call(); … … 68 67 } 69 68 70 private TransactionScope CreateTransaction(bool serializable, bool longRunning) {69 private TransactionScope CreateTransaction(bool repeatableRead, bool longRunning) { 71 70 var options = new TransactionOptions(); 72 if ( serializable)73 options.IsolationLevel = IsolationLevel. Serializable;71 if (repeatableRead) 72 options.IsolationLevel = IsolationLevel.RepeatableRead; 74 73 else 75 74 options.IsolationLevel = IsolationLevel.ReadUncommitted; -
trunk/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs
r9219 r9257 613 613 } 614 614 615 public void AssignJobToResource(Guid jobId, Guid resourceId) { 616 using (var db = CreateContext()) { 617 var job = db.Tasks.Where(x => x.TaskId == jobId).Single(); 618 job.AssignedResources.Add(new AssignedResource() { TaskId = jobId, ResourceId = resourceId }); 615 public void AssignJobToResource(Guid taskId, IEnumerable<Guid> resourceIds) { 616 using (var db = CreateContext()) { 617 var task = db.Tasks.Where(x => x.TaskId == taskId).Single(); 618 foreach (Guid rId in resourceIds) { 619 task.AssignedResources.Add(new AssignedResource() { TaskId = taskId, ResourceId = rId }); 620 } 619 621 db.SubmitChanges(); 620 622 } -
trunk/sources/HeuristicLab.Services.Hive/3.3/HiveService.cs
r9232 r9257 67 67 taskData.TaskId = task.Id; 68 68 taskData.LastUpdate = DateTime.Now; 69 foreach (Guid slaveGroupId in resourceIds) { 70 dao.AssignJobToResource(task.Id, slaveGroupId); 71 } 69 dao.AssignJobToResource(task.Id, resourceIds); 72 70 dao.AddTaskData(taskData); 73 71 dao.UpdateTaskState(task.Id, DA.TaskState.Waiting, null, userManager.CurrentUserId, null); … … 95 93 public IEnumerable<Task> GetTasks() { 96 94 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client); 97 var tasks = dao.GetTasks(x => true); 98 foreach (var task in tasks) 99 author.AuthorizeForTask(task.Id, Permission.Read); 100 return tasks; 95 return trans.UseTransaction(() => { 96 var tasks = dao.GetTasks(x => true); 97 foreach (var task in tasks) 98 author.AuthorizeForTask(task.Id, Permission.Read); 99 return tasks; 100 }); 101 101 } 102 102 … … 144 144 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 145 145 author.AuthorizeForTask(taskId, Permission.Read); 146 return dao.GetTaskData(taskId); 146 147 return trans.UseTransaction(() => { 148 return dao.GetTaskData(taskId); 149 }); 147 150 } 148 151 … … 150 153 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 151 154 author.AuthorizeForTask(taskDto.Id, Permission.Full); 155 152 156 trans.UseTransaction(() => { 153 157 dao.UpdateTaskAndPlugins(taskDto); … … 158 162 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 159 163 author.AuthorizeForTask(task.Id, Permission.Full); 160 author.AuthorizeForTask(taskData.TaskId, Permission.Full); 161 //trans.UseTransaction(() => { // cneumuel: try without transaction 162 taskData.LastUpdate = DateTime.Now; 163 dao.UpdateTaskAndPlugins(task); 164 dao.UpdateTaskData(taskData); 165 //}, false, true); 164 165 trans.UseTransaction(() => { 166 dao.UpdateTaskAndPlugins(task); 167 }); 168 169 trans.UseTransaction(() => { 170 taskData.LastUpdate = DateTime.Now; 171 dao.UpdateTaskData(taskData); 172 }); 166 173 } 167 174 … … 415 422 authen.AuthenticateForAnyRole(HiveRoles.Slave); 416 423 417 List<MessageContainer> result = trans.UseTransaction(() => heartbeatManager.ProcessHeartbeat(heartbeat)); 424 List<MessageContainer> result = new List<MessageContainer>(); 425 try { 426 result = heartbeatManager.ProcessHeartbeat(heartbeat); 427 } 428 catch (Exception ex) { 429 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Exception processing Heartbeat: " + ex.ToString()); 430 } 418 431 419 432 if (HeuristicLab.Services.Hive.Properties.Settings.Default.TriggerEventManagerInHeartbeat) { … … 449 462 public Plugin GetPlugin(Guid pluginId) { 450 463 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 451 return dao.GetPlugin(pluginId); 464 return trans.UseTransaction(() => { 465 return dao.GetPlugin(pluginId); 466 }); 452 467 } 453 468 454 469 public Plugin GetPluginByHash(byte[] hash) { 455 470 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 456 return dao.GetPlugins(x => x.Hash == hash).FirstOrDefault(); 471 return trans.UseTransaction(() => { 472 return dao.GetPlugins(x => x.Hash == hash).FirstOrDefault(); 473 }); 457 474 } 458 475 … … 461 478 public IEnumerable<Plugin> GetPlugins() { 462 479 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 463 return dao.GetPlugins(x => x.Hash != null); 480 return trans.UseTransaction(() => { 481 return dao.GetPlugins(x => x.Hash != null); 482 }); 464 483 } 465 484 … … 477 496 public void DeletePlugin(Guid pluginId) { 478 497 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 479 dao.DeletePlugin(pluginId); 498 trans.UseTransaction(() => { 499 dao.DeletePlugin(pluginId); 500 }); 480 501 } 481 502 #endregion … … 516 537 #region Resource Methods 517 538 public IEnumerable<Resource> GetChildResources(Guid resourceId) { 518 return dao.GetChildResources(resourceId);539 return trans.UseTransaction(() => { return dao.GetChildResources(resourceId); }); 519 540 } 520 541 #endregion … … 523 544 public int GetNewHeartbeatInterval(Guid slaveId) { 524 545 authen.AuthenticateForAnyRole(HiveRoles.Slave); 525 Slave s = dao.GetSlave(slaveId); 546 547 Slave s = trans.UseTransaction(() => { return dao.GetSlave(slaveId); }); 526 548 if (s != null) { 527 549 return s.HbInterval; … … 543 565 public Slave GetSlave(Guid slaveId) { 544 566 authen.AuthenticateForAnyRole(HiveRoles.Administrator); 545 return dao.GetSlave(slaveId);567 return trans.UseTransaction(() => { return dao.GetSlave(slaveId); }); 546 568 } 547 569 548 570 public SlaveGroup GetSlaveGroup(Guid slaveGroupId) { 549 571 authen.AuthenticateForAnyRole(HiveRoles.Administrator); 550 return dao.GetSlaveGroup(slaveGroupId);572 return trans.UseTransaction(() => { return dao.GetSlaveGroup(slaveGroupId); }); 551 573 } 552 574 553 575 public IEnumerable<Slave> GetSlaves() { 554 576 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client); 555 return dao.GetSlaves(x => true).Where(x => x.OwnerUserId == null 556 || x.OwnerUserId == userManager.CurrentUserId 557 || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList()) 558 || authen.IsInRole(HiveRoles.Administrator)).ToArray(); 577 return trans.UseTransaction(() => { 578 return dao.GetSlaves(x => true).Where(x => x.OwnerUserId == null 579 || x.OwnerUserId == userManager.CurrentUserId 580 || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList()) 581 || authen.IsInRole(HiveRoles.Administrator)).ToArray(); 582 }); 559 583 } 560 584 561 585 public IEnumerable<SlaveGroup> GetSlaveGroups() { 562 586 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client); 563 return dao.GetSlaveGroups(x => true).Where(x => x.OwnerUserId == null 564 || x.OwnerUserId == userManager.CurrentUserId 565 || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList()) 566 || authen.IsInRole(HiveRoles.Administrator)).ToArray(); 587 return trans.UseTransaction(() => { 588 return dao.GetSlaveGroups(x => true).Where(x => x.OwnerUserId == null 589 || x.OwnerUserId == userManager.CurrentUserId 590 || userManager.VerifyUser(userManager.CurrentUserId, GetResourcePermissions(x.Id).Select(y => y.GrantedUserId).ToList()) 591 || authen.IsInRole(HiveRoles.Administrator)).ToArray(); 592 }); 567 593 } 568 594 … … 717 743 #region Statistics Methods 718 744 public IEnumerable<Statistics> GetStatistics() { 719 return dao.GetStatistics(x => true);745 return trans.UseTransaction(() => { return dao.GetStatistics(x => true); }); 720 746 } 721 747 public IEnumerable<Statistics> GetStatisticsForTimePeriod(DateTime from, DateTime to) { 722 return dao.GetStatistics(x => x.Timestamp >= from && x.Timestamp <= to);748 return trans.UseTransaction(() => { return dao.GetStatistics(x => x.Timestamp >= from && x.Timestamp <= to); }); 723 749 } 724 750 #endregion -
trunk/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs
r9219 r9257 114 114 void UpdateResource(DT.Resource dto); 115 115 void DeleteResource(Guid id); 116 void AssignJobToResource(Guid jobId, Guid resourceId);116 void AssignJobToResource(Guid taskId, IEnumerable<Guid> resourceIds); 117 117 IEnumerable<DT.Resource> GetAssignedResources(Guid jobId); 118 118 IEnumerable<DT.Resource> GetParentResources(Guid resourceId); -
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs
r7862 r9257 50 50 SetTimeoutTasksWaiting(); 51 51 DeleteObsoleteSlaves(); 52 } , true);52 }); 53 53 54 54 trans.UseTransaction(() => { 55 55 FinishParentTasks(); 56 56 UpdateStatistics(); 57 } , false);57 }); 58 58 } 59 59 -
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9123 r9257 37 37 get { return ServiceLocator.Instance.TaskScheduler; } 38 38 } 39 private DataAccess.ITransactionManager trans { 40 get { return ServiceLocator.Instance.TransactionManager; } 41 } 39 42 40 43 /// <summary> … … 44 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 45 48 List<MessageContainer> actions = new List<MessageContainer>(); 46 Slave slave = dao.GetSlave(heartbeat.SlaveId); 49 Slave slave = null; 50 slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); }); 51 47 52 if (slave == null) { 48 53 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); … … 62 67 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 63 68 slave.LastHeartbeat = DateTime.Now; 64 dao.UpdateSlave(slave); 69 70 trans.UseTransaction(() => { dao.UpdateSlave(slave); }); 65 71 66 72 // update task data … … 76 82 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 77 83 else { 78 var availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave)); 84 IEnumerable<TaskInfoForScheduler> availableTasks = null; 85 availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); }); 79 86 if (availableTasks.Any()) { 80 87 var task = availableTasks.First(); … … 99 106 100 107 private void AssignJob(Slave slave, Guid taskId) { 101 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 108 trans.UseTransaction(() => { 109 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 102 110 103 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 104 task.LastHeartbeat = DateTime.Now; 105 dao.UpdateTask(task); 111 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 112 task.LastHeartbeat = DateTime.Now; 113 dao.UpdateTask(task); 114 }); 106 115 } 107 116 … … 121 130 // process the jobProgresses 122 131 foreach (var jobProgress in heartbeat.JobProgress) { 123 Task curTask = dao.GetTask(jobProgress.Key); 132 Task curTask = null; 133 curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); }); 124 134 if (curTask == null) { 125 135 // task does not exist in db … … 150 160 break; 151 161 } 152 dao.UpdateTask(curTask);162 trans.UseTransaction(() => { dao.UpdateTask(curTask); }); 153 163 } 154 164 } … … 159 169 160 170 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) { 161 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id); 162 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id); 163 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x)); 171 return trans.UseTransaction(() => { 172 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id); 173 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id); 174 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x)); 175 }); 164 176 } 165 177 166 178 private bool SlaveIsAllowedToCalculate(Guid slaveId) { 167 179 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also 168 return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0);180 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); }); 169 181 } 170 182 171 183 private bool ShutdownSlaveComputer(Guid slaveId) { 172 return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0);184 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); }); 173 185 } 174 186 }
Note: See TracChangeset
for help on using the changeset viewer.