Changeset 9665 for trunk/sources/HeuristicLab.Services.Hive/3.3
- Timestamp:
- 06/28/13 12:05:53 (11 years ago)
- Location:
- trunk/sources/HeuristicLab.Services.Hive
- Files:
-
- 10 edited
- 3 copied
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Services.Hive
- Property svn:mergeinfo changed
/branches/HivePerformance/sources/HeuristicLab.Services.Hive (added) merged: 9369,9381,9385,9391,9393,9397,9399,9434,9444,9469,9485,9539,9634-9636
- Property svn:mergeinfo changed
-
trunk/sources/HeuristicLab.Services.Hive/3.3
- Property svn:ignore
-
old new 2 2 bin 3 3 obj 4 *.user
-
- Property svn:ignore
-
trunk/sources/HeuristicLab.Services.Hive/3.3/Convert.cs
r9456 r9665 102 102 public static DT.TaskData ToDto(DB.TaskData source) { 103 103 if (source == null) return null; 104 return new DT.TaskData { TaskId = source.TaskId, Data = source.Data .ToArray(), LastUpdate = source.LastUpdate };104 return new DT.TaskData { TaskId = source.TaskId, Data = source.Data, LastUpdate = source.LastUpdate }; 105 105 } 106 106 public static DB.TaskData ToEntity(DT.TaskData source) { … … 111 111 public static void ToEntity(DT.TaskData source, DB.TaskData target) { 112 112 if ((source != null) && (target != null)) { 113 target.TaskId = source.TaskId; target.Data = new Binary(source.Data); target.LastUpdate = source.LastUpdate; 113 target.TaskId = source.TaskId; 114 target.Data = source.Data; 115 target.LastUpdate = source.LastUpdate; 114 116 } 115 117 } … … 213 215 public static void ToEntity(DT.PluginData source, DB.PluginData target) { 214 216 if ((source != null) && (target != null)) { 215 target.PluginDataId = source.Id; target.PluginId = source.PluginId; target.Data = new Binary(source.Data); target.FileName = source.FileName; 217 target.PluginDataId = source.Id; 218 target.PluginId = source.PluginId; 219 target.Data = source.Data; 220 target.FileName = source.FileName; 216 221 } 217 222 } -
trunk/sources/HeuristicLab.Services.Hive/3.3/HeuristicLab.Services.Hive-3.3.csproj
r9123 r9665 137 137 <Compile Include="DataTransfer\Statistics.cs" /> 138 138 <Compile Include="DataTransfer\UserStatistics.cs" /> 139 <Compile Include="OptimizedHiveDao.cs" /> 139 140 <Compile Include="HiveDao.cs" /> 140 141 <Compile Include="HiveJanitor.cs" /> 142 <Compile Include="HiveOperationContext.cs" /> 143 <Compile Include="Interfaces\IOptimizedHiveDao.cs" /> 141 144 <Compile Include="Interfaces\IHiveDao.cs" /> 142 145 <Compile Include="Interfaces\ITaskScheduler.cs" /> -
trunk/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs
r9456 r9665 35 35 } 36 36 37 public HiveDao() { }38 39 37 #region Task Methods 40 38 public DT.Task GetTask(Guid id) { … … 666 664 } 667 665 668 private void CollectParentResources(List<Resource> resources, Resource resource) {666 private static void CollectParentResources(ICollection<Resource> resources, Resource resource) { 669 667 if (resource == null) return; 670 668 resources.Add(resource); -
trunk/sources/HeuristicLab.Services.Hive/3.3/HiveService.cs
r9456 r9665 37 37 /// </summary> 38 38 [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, IgnoreExtensionDataObject = true)] 39 [HiveOperationContextBehavior] 39 40 public class HiveService : IHiveService { 40 41 private IHiveDao dao { 41 42 get { return ServiceLocator.Instance.HiveDao; } 42 43 } 44 private IOptimizedHiveDao optimizedDao { 45 get { return ServiceLocator.Instance.OptimizedHiveDao; } 46 } 43 47 private Access.IRoleVerifier authen { 44 48 get { return ServiceLocator.Instance.RoleVerifier; } … … 64 68 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client); 65 69 return trans.UseTransaction(() => { 66 task.Id = dao.AddTask(task); 67 taskData.TaskId = task.Id; 68 taskData.LastUpdate = DateTime.Now; 69 dao.AssignJobToResource(task.Id, resourceIds); 70 dao.AddTaskData(taskData); 71 dao.UpdateTaskState(task.Id, DA.TaskState.Waiting, null, userManager.CurrentUserId, null); 72 return taskData.TaskId; 70 var t = DT.Convert.ToEntity(task); 71 t.RequiredPlugins.AddRange(task.PluginsNeededIds.Select(pluginId => new DA.RequiredPlugin { Task = t, PluginId = pluginId })); 72 73 t.JobData = DT.Convert.ToEntity(taskData); 74 t.JobData.LastUpdate = DateTime.Now; 75 76 optimizedDao.AddTask(t); 77 78 dao.AssignJobToResource(t.TaskId, resourceIds); 79 80 optimizedDao.UpdateTaskState(t.TaskId, DA.TaskState.Waiting, null, userManager.CurrentUserId, null); 81 82 return t.TaskId; 73 83 }, false, true); 74 84 } … … 76 86 public Guid AddChildTask(Guid parentTaskId, Task task, TaskData taskData) { 77 87 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client); 78 return trans.UseTransaction(() => { 79 task.ParentTaskId = parentTaskId; 80 return AddTask(task, taskData, dao.GetAssignedResources(parentTaskId).Select(x => x.Id)); 81 }, false, true); 88 task.ParentTaskId = parentTaskId; 89 return AddTask(task, taskData, optimizedDao.GetAssignedResourceIds(parentTaskId)); 82 90 } 83 91 … … 87 95 88 96 return trans.UseTransaction(() => { 89 return dao.GetTask(taskId);97 return DT.Convert.ToDto(optimizedDao.GetTaskById(taskId)); 90 98 }, false, false); 91 99 } … … 128 136 129 137 return trans.UseTransaction(() => { 130 return dao.GetLightweightTasks(task => task.JobId ==jobId).ToArray();138 return optimizedDao.GetLightweightTasks(jobId).ToArray(); 131 139 }, false, true); 132 140 } … … 155 163 156 164 trans.UseTransaction(() => { 157 dao.UpdateTaskAndPlugins(taskDto); 165 var task = optimizedDao.GetTaskByDto(taskDto); 166 optimizedDao.UpdateTask(task); 158 167 }); 159 168 } … … 164 173 165 174 trans.UseTransaction(() => { 166 dao.UpdateTaskAndPlugins(task); 167 }); 168 169 trans.UseTransaction(() => { 170 taskData.LastUpdate = DateTime.Now; 171 dao.UpdateTaskData(taskData); 175 var t = optimizedDao.GetTaskByDto(task); 176 optimizedDao.UpdateTask(t); 177 }); 178 179 trans.UseTransaction(() => { 180 var data = optimizedDao.GetTaskDataByDto(taskData); 181 data.LastUpdate = DateTime.Now; 182 optimizedDao.UpdateTaskData(data); 172 183 }); 173 184 } … … 196 207 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 197 208 author.AuthorizeForTask(taskId, Permission.Full); 198 return trans.UseTransaction(() => { 199 Task task = dao.UpdateTaskState(taskId, DataTransfer.Convert.ToEntity(taskState), slaveId, userId, exception); 200 201 if (task.Command.HasValue && task.Command.Value == Command.Pause && task.State == TaskState.Paused) { 209 210 return trans.UseTransaction(() => { 211 var task = optimizedDao.UpdateTaskState(taskId, DT.Convert.ToEntity(taskState), slaveId, userId, exception); 212 213 if (task.Command.HasValue && task.Command.Value == DA.Command.Pause && task.State == DA.TaskState.Paused) { 202 214 task.Command = null; 203 } else if (task.Command.HasValue && task.Command.Value == Command.Abort && task.State ==TaskState.Aborted) {215 } else if (task.Command.HasValue && task.Command.Value == DA.Command.Abort && task.State == DA.TaskState.Aborted) { 204 216 task.Command = null; 205 } else if (task.Command.HasValue && task.Command.Value == Command.Stop && task.State ==TaskState.Aborted) {217 } else if (task.Command.HasValue && task.Command.Value == DA.Command.Stop && task.State == DA.TaskState.Aborted) { 206 218 task.Command = null; 207 219 } else if (taskState == TaskState.Paused && !task.Command.HasValue) { 208 220 // slave paused and uploaded the task (no user-command) -> set waiting. 209 task = dao.UpdateTaskState(taskId, DataTransfer.Convert.ToEntity(TaskState.Waiting), slaveId, userId, exception); 210 } 211 212 dao.UpdateTaskAndPlugins(task); 213 return task; 221 task = optimizedDao.UpdateTaskState(taskId, DA.TaskState.Waiting, slaveId, userId, exception); 222 } 223 224 return DT.Convert.ToDto(task); 214 225 }); 215 226 } … … 463 474 authen.AuthenticateForAnyRole(HiveRoles.Administrator, HiveRoles.Client, HiveRoles.Slave); 464 475 return trans.UseTransaction(() => { 465 return dao.GetPlugin(pluginId);476 return DT.Convert.ToDto(optimizedDao.GetPluginById(pluginId)); 466 477 }); 467 478 } -
trunk/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IServiceLocator.cs
r9456 r9665 27 27 IAuthorizationManager AuthorizationManager { get; } 28 28 IHiveDao HiveDao { get; } 29 IOptimizedHiveDao OptimizedHiveDao { get; } 29 30 IEventManager EventManager { get; } 30 31 ITransactionManager TransactionManager { get; } -
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/EventManager.cs
r9456 r9665 78 78 //we have to find another way to deal with this. 79 79 //until then the next line is commented out... 80 //stats.UserStatistics = d ao.GetUserStatistics();80 //stats.UserStatistics = dtoDao.GetUserStatistics(); 81 81 dao.AddStatistics(stats); 82 82 } -
trunk/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9456 r9665 24 24 using System.Linq; 25 25 using System.Threading; 26 using HeuristicLab.Services.Hive.Data Transfer;27 using DA = HeuristicLab.Services.Hive.DataAccess;26 using HeuristicLab.Services.Hive.DataAccess; 27 using Heartbeat = HeuristicLab.Services.Hive.DataTransfer.Heartbeat; 28 28 29 29 namespace HeuristicLab.Services.Hive { … … 31 31 private const string MutexName = "HiveTaskSchedulingMutex"; 32 32 33 private I HiveDao dao {34 get { return ServiceLocator.Instance. HiveDao; }33 private IOptimizedHiveDao dao { 34 get { return ServiceLocator.Instance.OptimizedHiveDao; } 35 35 } 36 36 private ITaskScheduler taskScheduler { … … 47 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 48 48 List<MessageContainer> actions = new List<MessageContainer>(); 49 49 50 Slave slave = null; 50 slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); }); 51 51 trans.UseTransaction(() => { 52 slave = dao.GetSlaveById(heartbeat.SlaveId); 53 }); 52 54 if (slave == null) { 53 55 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); … … 56 58 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 57 59 } 58 if ( ShutdownSlaveComputer(slave.Id)) {60 if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) { 59 61 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 60 62 } … … 64 66 slave.FreeMemory = heartbeat.FreeMemory; 65 67 slave.CpuUtilization = heartbeat.CpuUtilization; 66 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);68 slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId); 67 69 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 68 70 slave.LastHeartbeat = DateTime.Now; 69 71 70 trans.UseTransaction(() => { dao.UpdateSlave(slave); }); 72 trans.UseTransaction(() => { 73 dao.UpdateSlave(slave); 74 }); 71 75 72 76 // update task data … … 78 82 var mutex = new Mutex(false, MutexName); 79 83 try { 84 80 85 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 81 86 if (!mutexAquired) 82 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");87 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 83 88 else { 84 IEnumerable<TaskInfoForScheduler> availableTasks = null; 85 availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); }); 86 if (availableTasks.Any()) { 87 var task = availableTasks.First(); 88 AssignJob(slave, task.TaskId); 89 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 90 } 89 trans.UseTransaction(() => { 90 IEnumerable<TaskInfoForScheduler> availableTasks = null; 91 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave).ToArray()); 92 if (availableTasks.Any()) { 93 var task = availableTasks.First(); 94 AssignTask(slave, task.TaskId); 95 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 96 } 97 }); 91 98 } 92 99 } 93 100 catch (AbandonedMutexException) { 94 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");101 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 95 102 } 96 103 catch (Exception ex) { 97 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());104 LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 98 105 } 99 106 finally { … … 105 112 } 106 113 107 private void AssignJob(Slave slave, Guid taskId) { 108 trans.UseTransaction(() => { 109 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 114 private void AssignTask(Slave slave, Guid taskId) { 115 var task = dao.UpdateTaskState(taskId, TaskState.Transferring, slave.ResourceId, null, null); 110 116 111 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 112 task.LastHeartbeat = DateTime.Now; 113 dao.UpdateTask(task); 114 }); 117 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 118 task.LastHeartbeat = DateTime.Now; 119 dao.UpdateTask(task); 115 120 } 116 121 … … 130 135 // process the jobProgresses 131 136 foreach (var jobProgress in heartbeat.JobProgress) { 132 Task curTask = null; 133 curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); }); 137 Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null; 138 trans.UseTransaction(() => { 139 taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key); 140 }); 141 var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null; 134 142 if (curTask == null) { 135 143 // task does not exist in db 136 144 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 137 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);145 LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 138 146 } else { 139 if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) { 147 var slaveId = taskWithLastStateLogSlaveId.Item2; 148 if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) { 140 149 // assigned slave does not match heartbeat 141 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask. Id));142 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);143 } else if (! TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {150 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 151 LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 152 } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) { 144 153 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 145 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask. Id));154 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 146 155 } else { 147 156 // save task execution time 148 curTask.ExecutionTime = jobProgress.Value;157 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 149 158 curTask.LastHeartbeat = DateTime.Now; 150 159 151 160 switch (curTask.Command) { 152 161 case Command.Stop: 153 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask. Id));162 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 154 163 break; 155 164 case Command.Pause: 156 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask. Id));165 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 157 166 break; 158 167 case Command.Abort: 159 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask. Id));168 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 160 169 break; 161 170 } 162 trans.UseTransaction(() => { dao.UpdateTask(curTask); }); 171 trans.UseTransaction(() => { 172 dao.UpdateTask(curTask); 173 }); 163 174 } 164 175 } … … 167 178 return actions; 168 179 } 169 170 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {171 return trans.UseTransaction(() => {172 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);173 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);174 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));175 });176 }177 178 private bool SlaveIsAllowedToCalculate(Guid slaveId) {179 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also180 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });181 }182 183 private bool ShutdownSlaveComputer(Guid slaveId) {184 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); });185 }186 180 } 187 181 } -
trunk/sources/HeuristicLab.Services.Hive/3.3/ServiceLocator.cs
r9456 r9665 39 39 if (hiveDao == null) hiveDao = new HiveDao(); 40 40 return hiveDao; 41 } 42 } 43 44 public IOptimizedHiveDao OptimizedHiveDao { 45 get { 46 var dataContext = HiveOperationContext.Current != null 47 ? HiveOperationContext.Current.DataContext 48 : new HiveDataContext(Settings.Default.HeuristicLab_Hive_LinqConnectionString); 49 return new OptimizedHiveDao(dataContext); 41 50 } 42 51 }
Note: See TracChangeset
for help on using the changeset viewer.