Changeset 9123 for trunk/sources/HeuristicLab.Services.Hive/3.3/Manager
- Timestamp:
- 01/07/13 22:00:04 (12 years ago)
- Location:
- trunk/sources/HeuristicLab.Services.Hive
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Services.Hive
-
Property
svn:mergeinfo
set to
/branches/HiveTaskScheduler/HeuristicLab.Services.Hive merged eligible
-
Property
svn:mergeinfo
set to
-
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r8957 r9123 23 23 using System.Collections.Generic; 24 24 using System.Linq; 25 using System.Threading; 25 26 using HeuristicLab.Services.Hive.DataTransfer; 26 27 using DA = HeuristicLab.Services.Hive.DataAccess; … … 28 29 namespace HeuristicLab.Services.Hive { 29 30 public class HeartbeatManager { 31 private const string MutexName = "HiveTaskSchedulingMutex"; 32 30 33 private IHiveDao dao { 31 34 get { return ServiceLocator.Instance.HiveDao; } 32 35 } 33 private I AuthorizationManager auth{34 get { return ServiceLocator.Instance. AuthorizationManager; }36 private ITaskScheduler taskScheduler { 37 get { return ServiceLocator.Instance.TaskScheduler; } 35 38 } 36 39 … … 66 69 // assign new task 67 70 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 68 var availableJobs = dao.GetWaitingTasks(slave, 1); 69 if (availableJobs.Count() > 0) { 70 var job = availableJobs.First(); 71 if (AssignJob(slave, job)) 72 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, job.Id)); 71 bool mutexAquired = false; 72 var mutex = new Mutex(false, MutexName); 73 try { 74 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 75 if (!mutexAquired) 76 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 77 else { 78 var availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave)); 79 if (availableTasks.Any()) { 80 var task = availableTasks.First(); 81 AssignJob(slave, task.TaskId); 82 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 83 } 84 } 85 } 86 catch (AbandonedMutexException) { 87 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 88 } 89 catch (Exception ex) { 90 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 91 } 92 finally { 93 if (mutexAquired) mutex.ReleaseMutex(); 73 94 } 74 95 } … … 77 98 } 78 99 79 // returns true if assignment was successful 80 private bool AssignJob(Slave slave, Task task) { 81 // load task again and check if it is still available (this is an attempt to reduce the race condition which causes multiple heartbeats to get the same task assigned) 82 if (dao.GetTask(task.Id).State != TaskState.Waiting) return false; 83 84 task = dao.UpdateTaskState(task.Id, DataAccess.TaskState.Transferring, slave.Id, null, null); 100 private void AssignJob(Slave slave, Guid taskId) { 101 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 85 102 86 103 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 87 104 task.LastHeartbeat = DateTime.Now; 88 105 dao.UpdateTask(task); 89 return true;90 106 } 91 107
Note: See TracChangeset
for help on using the changeset viewer.