Changeset 9363 for branches/OaaS/HeuristicLab.Services.Hive/3.3/Manager
- Timestamp:
- 04/16/13 13:13:41 (12 years ago)
- Location:
- branches/OaaS
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/OaaS
- Property svn:ignore
-
old new 21 21 protoc.exe 22 22 _ReSharper.HeuristicLab 3.3 Tests 23 Google.ProtocolBuffers-2.4.1.473.dll 23 24 packages
-
- Property svn:mergeinfo changed
- Property svn:ignore
-
branches/OaaS/HeuristicLab.Services.Hive
-
Property
svn:mergeinfo
set to
(toggle deleted branches)
/trunk/sources/HeuristicLab.Services.Hive merged eligible /branches/Algorithms.GradientDescent/HeuristicLab.Services.Hive 5516-5520 /branches/Benchmarking/sources/HeuristicLab.Services.Hive 6917-7005 /branches/CloningRefactoring/HeuristicLab.Services.Hive 4656-4721 /branches/DataAnalysis Refactoring/HeuristicLab.Services.Hive 5471-5808 /branches/DataAnalysis SolutionEnsembles/HeuristicLab.Services.Hive 5815-6180 /branches/DataAnalysis/HeuristicLab.Services.Hive 4458-4459,4462,4464 /branches/GP.Grammar.Editor/HeuristicLab.Services.Hive 6284-6795 /branches/GP.Symbols (TimeLag, Diff, Integral)/HeuristicLab.Services.Hive 5060 /branches/HiveTaskScheduler/HeuristicLab.Services.Hive 8687-9106 /branches/NET40/sources/HeuristicLab.Services.Hive 5138-5162 /branches/ParallelEngine/HeuristicLab.Services.Hive 5175-5192 /branches/ProblemInstancesRegressionAndClassification/HeuristicLab.Services.Hive 7568-7810 /branches/QAPAlgorithms/HeuristicLab.Services.Hive 6350-6627 /branches/Restructure trunk solution/HeuristicLab.Services.Hive 6828 /branches/RuntimeOptimizer/HeuristicLab.Services.Hive 8943-9078 /branches/ScatterSearch (trunk integration)/HeuristicLab.Services.Hive 7787-8333 /branches/SlaveShutdown/HeuristicLab.Services.Hive 8944-8956 /branches/SuccessProgressAnalysis/HeuristicLab.Services.Hive 5370-5682 /branches/Trunk/HeuristicLab.Services.Hive 6829-6865 /branches/UnloadJobs/HeuristicLab.Services.Hive 9168-9215 /branches/VNS/HeuristicLab.Services.Hive 5594-5752 /branches/histogram/HeuristicLab.Services.Hive 5959-6341
-
Property
svn:mergeinfo
set to
(toggle deleted branches)
-
branches/OaaS/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r7723 r9363 23 23 using System.Collections.Generic; 24 24 using System.Linq; 25 using System.Threading; 25 26 using HeuristicLab.Services.Hive.DataTransfer; 26 27 using DA = HeuristicLab.Services.Hive.DataAccess; … … 28 29 namespace HeuristicLab.Services.Hive { 29 30 public class HeartbeatManager { 31 private const string MutexName = "HiveTaskSchedulingMutex"; 32 30 33 private IHiveDao dao { 31 34 get { return ServiceLocator.Instance.HiveDao; } 32 35 } 33 private IAuthorizationManager auth { 34 get { return ServiceLocator.Instance.AuthorizationManager; } 36 private ITaskScheduler taskScheduler { 37 get { return ServiceLocator.Instance.TaskScheduler; } 38 } 39 private DataAccess.ITransactionManager trans { 40 get { return ServiceLocator.Instance.TransactionManager; } 35 41 } 36 42 … … 41 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 42 48 List<MessageContainer> actions = new List<MessageContainer>(); 43 Slave slave = dao.GetSlave(heartbeat.SlaveId); 49 Slave slave = null; 50 slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); }); 51 44 52 if (slave == null) { 45 53 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); … … 47 55 if (heartbeat.HbInterval != slave.HbInterval) { 48 56 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 57 } 58 if (ShutdownSlaveComputer(slave.Id)) { 59 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 49 60 } 50 61 … … 56 67 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 57 68 slave.LastHeartbeat = DateTime.Now; 58 dao.UpdateSlave(slave); 69 70 trans.UseTransaction(() => { dao.UpdateSlave(slave); }); 59 71 60 72 // update task data … … 63 75 // assign new task 64 76 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 65 var availableJobs = dao.GetWaitingTasks(slave, 1); 66 if (availableJobs.Count() > 0) { 67 var job = availableJobs.First(); 68 if (AssignJob(slave, job)) 69 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id)); 77 bool mutexAquired = false; 78 var mutex = new Mutex(false, MutexName); 79 try { 80 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 81 if (!mutexAquired) 82 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 83 else { 84 IEnumerable<TaskInfoForScheduler> availableTasks = null; 85 availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); }); 86 if (availableTasks.Any()) { 87 var task = availableTasks.First(); 88 AssignJob(slave, task.TaskId); 89 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 90 } 91 } 92 } 93 catch (AbandonedMutexException) { 94 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 95 } 96 catch (Exception ex) { 97 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 98 } 99 finally { 100 if (mutexAquired) mutex.ReleaseMutex(); 70 101 } 71 102 } … … 74 105 } 75 106 76 // returns true if assignment was successful 77 private bool AssignJob(Slave slave, Task task) { 78 // load task again and check if it is still available (this is an attempt to reduce the race condition which causes multiple heartbeats to get the same task assigned) 79 if (dao.GetTask(task.Id).State != TaskState.Waiting) return false; 107 private void AssignJob(Slave slave, Guid taskId) { 108 trans.UseTransaction(() => { 109 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 80 110 81 task = dao.UpdateTaskState(task.Id, DataAccess.TaskState.Transferring, slave.Id, null, null); 82 83 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 84 task.LastHeartbeat = DateTime.Now; 85 dao.UpdateTask(task); 86 return true; 111 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 112 task.LastHeartbeat = DateTime.Now; 113 dao.UpdateTask(task); 114 }); 87 115 } 88 116 … … 102 130 // process the jobProgresses 103 131 foreach (var jobProgress in heartbeat.JobProgress) { 104 Task curTask = dao.GetTask(jobProgress.Key); 132 Task curTask = null; 133 curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); }); 105 134 if (curTask == null) { 106 135 // task does not exist in db … … 131 160 break; 132 161 } 133 dao.UpdateTask(curTask);162 trans.UseTransaction(() => { dao.UpdateTask(curTask); }); 134 163 } 135 164 } … … 140 169 141 170 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) { 142 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id); 143 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id); 144 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x)); 171 return trans.UseTransaction(() => { 172 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id); 173 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id); 174 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x)); 175 }); 145 176 } 146 177 147 178 private bool SlaveIsAllowedToCalculate(Guid slaveId) { 148 179 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also 149 return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); 180 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); }); 181 } 182 183 private bool ShutdownSlaveComputer(Guid slaveId) { 184 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); }); 150 185 } 151 186 }
Note: See TracChangeset
for help on using the changeset viewer.