Changeset 9381
- Timestamp:
- 04/18/13 16:35:14 (12 years ago)
- Location:
- branches/HivePerformance/sources
- Files:
-
- 1 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HivePerformance/sources/HeuristicLab.Services.Hive.DataAccess/3.3/HiveDataContext.dbml
r9123 r9381 114 114 <Type Name="TaskData"> 115 115 <Column Name="TaskId" Storage="_JobId" Type="System.Guid" DbType="UniqueIdentifier NOT NULL" IsPrimaryKey="true" CanBeNull="false" /> 116 <Column Name="Data" Type="System.Data.Linq.Binary" DbType="VarBinary(MAX)" CanBeNull="false" UpdateCheck="Never" />116 <Column Name="Data" Type="System.Data.Linq.Binary" DbType="VarBinary(MAX)" CanBeNull="false" UpdateCheck="Never" IsDelayLoaded="true" /> 117 117 <Column Name="LastUpdate" Type="System.DateTime" DbType="DateTime" CanBeNull="false" /> 118 118 <Association Name="Task_TaskData" Member="Task" Storage="_Job" ThisKey="TaskId" OtherKey="TaskId" Type="Task" IsForeignKey="true" /> … … 123 123 <Column Name="PluginDataId" Type="System.Guid" DbType="UniqueIdentifier NOT NULL" IsPrimaryKey="true" IsDbGenerated="true" CanBeNull="false" /> 124 124 <Column Name="PluginId" Type="System.Guid" DbType="UniqueIdentifier NOT NULL" CanBeNull="false" /> 125 <Column Name="Data" Type="System.Data.Linq.Binary" DbType="VarBinary(MAX) NOT NULL" CanBeNull="false" UpdateCheck="Never" />125 <Column Name="Data" Type="System.Data.Linq.Binary" DbType="VarBinary(MAX) NOT NULL" CanBeNull="false" UpdateCheck="Never" IsDelayLoaded="true" /> 126 126 <Column Name="FileName" Type="System.String" DbType="VarChar(MAX)" CanBeNull="false" /> 127 127 <Association Name="Plugin_PluginData" Member="Plugin" ThisKey="PluginId" OtherKey="PluginId" Type="Plugin" IsForeignKey="true" /> -
branches/HivePerformance/sources/HeuristicLab.Services.Hive.DataAccess/3.3/HiveDataContext.designer.cs
r9123 r9381 3 3 // <auto-generated> 4 4 // This code was generated by a tool. 5 // Runtime Version:4.0.30319.1 79295 // Runtime Version:4.0.30319.18034 6 6 // 7 7 // Changes to this file may cause incorrect behavior and will be lost if … … 2661 2661 private System.Guid _JobId; 2662 2662 2663 private System.Data.Linq. Binary_Data;2663 private System.Data.Linq.Link<System.Data.Linq.Binary> _Data; 2664 2664 2665 2665 private System.DateTime _LastUpdate; … … 2714 2714 get 2715 2715 { 2716 return this._Data ;2717 } 2718 set 2719 { 2720 if ((this._Data != value))2716 return this._Data.Value; 2717 } 2718 set 2719 { 2720 if ((this._Data.Value != value)) 2721 2721 { 2722 2722 this.OnDataChanging(value); 2723 2723 this.SendPropertyChanging(); 2724 this._Data = value;2724 this._Data.Value = value; 2725 2725 this.SendPropertyChanged("Data"); 2726 2726 this.OnDataChanged(); … … 2814 2814 private System.Guid _PluginId; 2815 2815 2816 private System.Data.Linq. Binary_Data;2816 private System.Data.Linq.Link<System.Data.Linq.Binary> _Data; 2817 2817 2818 2818 private string _FileName; … … 2889 2889 get 2890 2890 { 2891 return this._Data ;2892 } 2893 set 2894 { 2895 if ((this._Data != value))2891 return this._Data.Value; 2892 } 2893 set 2894 { 2895 if ((this._Data.Value != value)) 2896 2896 { 2897 2897 this.OnDataChanging(value); 2898 2898 this.SendPropertyChanging(); 2899 this._Data = value;2899 this._Data.Value = value; 2900 2900 this.SendPropertyChanged("Data"); 2901 2901 this.OnDataChanged(); -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HeuristicLab.Services.Hive-3.3.csproj
r9123 r9381 139 139 <Compile Include="HiveDao.cs" /> 140 140 <Compile Include="HiveJanitor.cs" /> 141 <Compile Include="HiveOperationContext.cs" /> 141 142 <Compile Include="Interfaces\IHiveDao.cs" /> 142 143 <Compile Include="Interfaces\ITaskScheduler.cs" /> -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/HiveDao.cs
r9304 r9381 43 43 } 44 44 45 public Task GetTaskDA(Guid id) { 46 var db = HiveOperationContext.Current.DataContext; 47 return db.Tasks.SingleOrDefault(x => x.TaskId == id); 48 } 49 45 50 public IEnumerable<DT.Task> GetTasks(Expression<Func<Task, bool>> predicate) { 46 51 using (var db = CreateContext()) { … … 138 143 db.SubmitChanges(); 139 144 } 145 } 146 147 public void UpdateTaskDA(Task task) { 148 var db = HiveOperationContext.Current.DataContext; 149 db.SubmitChanges(); 140 150 } 141 151 … … 195 205 } 196 206 207 public IEnumerable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave) { 208 var db = HiveOperationContext.Current.DataContext; 209 var parentResources = GetParentResourcesDA(slave.ResourceId); 210 var resourceIds = parentResources.Select(x => x.ResourceId); 211 //Originally we checked here if there are parent tasks which should be calculated (with GetParentTasks(resourceIds, count, false);). 212 //Because there is at the moment no case where this makes sense (there don't exist parent tasks which need to be calculated), 213 //we skip this step because it's wasted runtime 214 215 var query = from ar in db.AssignedResources 216 where resourceIds.Contains(ar.ResourceId) 217 && !(ar.Task.IsParentTask && ar.Task.FinishWhenChildJobsFinished) 218 && ar.Task.State == TaskState.Waiting 219 && ar.Task.CoresNeeded <= slave.FreeCores 220 && ar.Task.MemoryNeeded <= slave.FreeMemory 221 select new TaskInfoForScheduler() { TaskId = ar.Task.TaskId, JobId = ar.Task.JobId, Priority = ar.Task.Priority }; 222 var waitingTasks = query.ToArray(); 223 return waitingTasks; 224 } 225 197 226 public DT.Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception) { 198 227 using (var db = CreateContext()) { … … 222 251 return DT.Convert.ToDto(task); 223 252 } 253 } 254 255 public Task UpdateTaskStateDA(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception) { 256 var db = HiveOperationContext.Current.DataContext; 257 258 db.StateLogs.InsertOnSubmit(new StateLog { 259 TaskId = taskId, 260 State = taskState, 261 SlaveId = slaveId, 262 UserId = userId, 263 Exception = exception, 264 DateTime = DateTime.Now 265 }); 266 267 var task = db.Tasks.SingleOrDefault(x => x.TaskId == taskId); 268 task.State = taskState; 269 270 db.SubmitChanges(); 271 272 return task; 224 273 } 225 274 #endregion … … 516 565 } 517 566 567 public Slave GetSlaveDA(Guid id) { 568 var db = HiveOperationContext.Current.DataContext; 569 return db.Resources.OfType<Slave>().SingleOrDefault(x => x.ResourceId == id); 570 } 571 518 572 public IEnumerable<DT.Slave> GetSlaves(Expression<Func<Slave, bool>> predicate) { 519 573 using (var db = CreateContext()) { … … 538 592 db.SubmitChanges(); 539 593 } 594 } 595 596 public void UpdateSlaveDA(Slave slave) { 597 var db = HiveOperationContext.Current.DataContext; 598 db.SubmitChanges(); 540 599 } 541 600 … … 664 723 return resources.Select(r => DT.Convert.ToDto(r)).ToArray(); 665 724 } 725 } 726 727 public IEnumerable<Resource> GetParentResourcesDA(Guid resourceId) { 728 var db = HiveOperationContext.Current.DataContext; 729 var resources = new List<Resource>(); 730 CollectParentResources(resources, db.Resources.Where(r => r.ResourceId == resourceId).Single()); 731 return resources; 666 732 } 667 733 -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Interfaces/IHiveDao.cs
r9266 r9381 30 30 #region Task Methods 31 31 DT.Task GetTask(Guid id); 32 Task GetTaskDA(Guid id); 32 33 IEnumerable<DT.Task> GetTasks(Expression<Func<Task, bool>> predicate); 33 34 IEnumerable<DT.LightweightTask> GetLightweightTasks(Expression<Func<Task, bool>> predicate); … … 37 38 void UpdateTaskAndStateLogs(DT.Task dto); 38 39 void UpdateTask(DT.Task dto); 40 void UpdateTaskDA(Task dto); 39 41 void DeleteTask(Guid id); 40 42 IEnumerable<TaskInfoForScheduler> GetWaitingTasks(DT.Slave slave); 43 IEnumerable<TaskInfoForScheduler> GetWaitingTasksDA(Slave slave); 41 44 IEnumerable<DT.Task> GetParentTasks(IEnumerable<Guid> resourceIds, int count, bool finished); 42 45 DT.Task UpdateTaskState(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception); 46 Task UpdateTaskStateDA(Guid taskId, TaskState taskState, Guid? slaveId, Guid? userId, string exception); 43 47 #endregion 44 48 … … 95 99 #region Slave Methods 96 100 DT.Slave GetSlave(Guid id); 101 Slave GetSlaveDA(Guid id); 97 102 IEnumerable<DT.Slave> GetSlaves(Expression<Func<Slave, bool>> predicate); 98 103 Guid AddSlave(DT.Slave dto); 99 104 void UpdateSlave(DT.Slave dto); 105 void UpdateSlaveDA(Slave dto); 100 106 void DeleteSlave(Guid id); 101 107 #endregion … … 118 124 IEnumerable<DT.Resource> GetAssignedResources(Guid jobId); 119 125 IEnumerable<DT.Resource> GetParentResources(Guid resourceId); 126 IEnumerable<Resource> GetParentResourcesDA(Guid resourceId); 120 127 IEnumerable<DT.Resource> GetChildResources(Guid resourceId); 121 128 IEnumerable<DT.Task> GetJobsByResourceId(Guid resourceId); -
branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs
r9257 r9381 47 47 public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) { 48 48 List<MessageContainer> actions = new List<MessageContainer>(); 49 Slave slave = null;50 slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });51 49 52 if (slave == null) { 53 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 54 } else { 55 if (heartbeat.HbInterval != slave.HbInterval) { 56 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 50 DA.Slave slave = null; 51 trans.UseTransaction(() => { 52 slave = dao.GetSlaveDA(heartbeat.SlaveId); 53 54 if (slave == null) { 55 actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello)); 56 } else { 57 if (heartbeat.HbInterval != slave.HbInterval) { 58 actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval)); 59 } 60 if (ShutdownSlaveComputer(slave.ResourceId)) { 61 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 62 } 63 64 // update slave data 65 slave.FreeCores = heartbeat.FreeCores; 66 slave.FreeMemory = heartbeat.FreeMemory; 67 slave.CpuUtilization = heartbeat.CpuUtilization; 68 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId); 69 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? DA.SlaveState.Calculating : DA.SlaveState.Idle; 70 slave.LastHeartbeat = DateTime.Now; 71 72 dao.UpdateSlaveDA(slave); 57 73 } 58 if (ShutdownSlaveComputer(slave.Id)) { 59 actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer)); 60 } 74 }); 61 75 62 // update slave data 63 slave.FreeCores = heartbeat.FreeCores; 64 slave.FreeMemory = heartbeat.FreeMemory; 65 slave.CpuUtilization = heartbeat.CpuUtilization; 66 slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id); 67 slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle; 68 slave.LastHeartbeat = DateTime.Now; 76 // update task data 77 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 69 78 70 trans.UseTransaction(() => { dao.UpdateSlave(slave); }); 79 // assign new task 80 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 81 bool mutexAquired = false; 82 var mutex = new Mutex(false, MutexName); 83 try { 71 84 72 // update task data 73 actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate)); 74 75 // assign new task 76 if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) { 77 bool mutexAquired = false; 78 var mutex = new Mutex(false, MutexName); 79 try { 80 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 81 if (!mutexAquired) 82 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 83 else { 85 mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience); 86 if (!mutexAquired) 87 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired."); 88 else { 89 trans.UseTransaction(() => { 84 90 IEnumerable<TaskInfoForScheduler> availableTasks = null; 85 availableTasks = t rans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });91 availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave)); 86 92 if (availableTasks.Any()) { 87 93 var task = availableTasks.First(); … … 89 95 actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId)); 90 96 } 91 } 97 }); 92 98 } 93 catch (AbandonedMutexException) {94 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");95 }96 catch (Exception ex) {97 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());98 }99 finally {100 if (mutexAquired) mutex.ReleaseMutex();101 }99 } 100 catch (AbandonedMutexException) { 101 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned."); 102 } 103 catch (Exception ex) { 104 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString()); 105 } 106 finally { 107 if (mutexAquired) mutex.ReleaseMutex(); 102 108 } 103 109 } … … 105 111 } 106 112 107 private void AssignJob(Slave slave, Guid taskId) { 108 trans.UseTransaction(() => { 109 var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null); 113 private void AssignJob(DA.Slave slave, Guid taskId) { 114 var task = dao.UpdateTaskStateDA(taskId, DataAccess.TaskState.Transferring, slave.ResourceId, null, null); 110 115 111 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 112 task.LastHeartbeat = DateTime.Now; 113 dao.UpdateTask(task); 114 }); 116 // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout) 117 task.LastHeartbeat = DateTime.Now; 118 dao.UpdateTaskDA(task); 115 119 } 116 120 … … 128 132 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll)); 129 133 } else { 130 // process the jobProgresses 131 foreach (var jobProgress in heartbeat.JobProgress) { 132 Task curTask = null; 133 curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); }); 134 if (curTask == null) { 135 // task does not exist in db 136 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 137 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 138 } else { 139 if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) { 140 // assigned slave does not match heartbeat 141 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id)); 142 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 143 } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) { 144 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 145 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id)); 134 trans.UseTransaction(() => { 135 // process the jobProgresses 136 foreach (var jobProgress in heartbeat.JobProgress) { 137 var curTask = dao.GetTaskDA(jobProgress.Key); 138 if (curTask == null) { 139 // task does not exist in db 140 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key)); 141 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key); 146 142 } else { 147 // save task execution time 148 curTask.ExecutionTime = jobProgress.Value; 149 curTask.LastHeartbeat = DateTime.Now; 143 var currentStateLog = curTask.StateLogs.LastOrDefault(); 144 if (currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) { 145 // assigned slave does not match heartbeat 146 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 147 DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask); 148 } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) { 149 // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group 150 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 151 } else { 152 // save task execution time 153 curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds; 154 curTask.LastHeartbeat = DateTime.Now; 150 155 151 switch (curTask.Command) { 152 case Command.Stop: 153 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id)); 154 break; 155 case Command.Pause: 156 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id)); 157 break; 158 case Command.Abort: 159 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id)); 160 break; 156 switch (curTask.Command) { 157 case DA.Command.Stop: 158 actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId)); 159 break; 160 case DA.Command.Pause: 161 actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId)); 162 break; 163 case DA.Command.Abort: 164 actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId)); 165 break; 166 } 167 dao.UpdateTaskDA(curTask); 161 168 } 162 trans.UseTransaction(() => { dao.UpdateTask(curTask); });163 169 } 164 170 } 165 } 171 }); 166 172 } 167 173 return actions; 168 174 } 169 175 170 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) { 171 return trans.UseTransaction(() => { 172 var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id); 173 var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id); 174 return assignedResourceIds.Any(x => slaveResourceIds.Contains(x)); 175 }); 176 private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) { 177 var assignedResourceIds = curTask.AssignedResources.Select(ar => ar.Resource.ResourceId); 178 var slaveResourceIds = dao.GetParentResourcesDA(slaveId).Select(x => x.ResourceId); 179 return assignedResourceIds.Any(r => slaveResourceIds.Contains(r)); 176 180 } 177 181 178 182 private bool SlaveIsAllowedToCalculate(Guid slaveId) { 183 var parentResources = dao.GetParentResourcesDA(slaveId); 179 184 // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also 180 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });185 return parentResources.All(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() == 0); 181 186 } 182 187 183 188 private bool ShutdownSlaveComputer(Guid slaveId) { 184 return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); }); 189 var parentResources = dao.GetParentResourcesDA(slaveId); 190 191 return parentResources.Any(r => r.Downtimes.Where(d => d.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= d.StartDate) && (DateTime.Now <= d.EndDate)).Count() != 0); 185 192 } 186 193 }
Note: See TracChangeset
for help on using the changeset viewer.