Changeset 12920 for trunk/sources
- Timestamp:
- 08/27/15 14:52:42 (9 years ago)
- Location:
- trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs
r12491 r12920 481 481 SlaveClientCom.Instance.LogMessage("Stopping heartbeat"); 482 482 heartbeatManager.StopHeartBeat(); 483 SlaveClientCom.Instance.LogMessage("Stopping checkpointing"); 484 taskManager.StopCheckpointing(); 485 483 486 abortRequested = true; 484 487 -
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Executor.cs
r12012 r12920 185 185 #endregion 186 186 187 public Tuple<TaskData, DateTime> GetTaskDataSnapshot() { 188 if (taskDataInvalid) return null; 189 190 Tuple<TaskData, DateTime> snapshot = null; 191 if (task == null) { 192 if (CurrentException == null) { 193 CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task"); 194 } 195 } else { 196 var taskData = new TaskData(); 197 198 var pausedTrigger = new EventWaitHandle(false, EventResetMode.ManualReset); 199 EventHandler pausedHandler = null; 200 pausedHandler = (s, e) => { 201 task.TaskPaused -= pausedHandler; 202 task.TaskPaused += Task_TaskPaused; 203 pausedTrigger.Set(); 204 }; 205 206 task.TaskPaused -= Task_TaskPaused; 207 task.TaskPaused += pausedHandler; 208 task.Pause(); 209 pausedTrigger.WaitOne(); 210 211 taskData.Data = PersistenceUtil.Serialize(task); 212 var timestamp = DateTime.Now; 213 214 EventHandler startedHandler = null; 215 startedHandler = (s, e) => { 216 task.TaskStarted -= startedHandler; 217 task.TaskStarted += Task_TaskStarted; 218 }; 219 220 task.TaskStarted -= Task_TaskStarted; 221 task.TaskStarted += startedHandler; 222 task.Start(); 223 224 taskData.TaskId = TaskId; 225 snapshot = Tuple.Create(taskData, timestamp); 226 } 227 228 return snapshot; 229 } 230 187 231 public TaskData GetTaskData() { 188 232 if (taskDataInvalid) return null; -
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs
r12012 r12920 24 24 using System.Linq; 25 25 using System.Threading; 26 using HeuristicLab.Clients.Hive.SlaveCore.Properties; 26 27 using HeuristicLab.Common; 27 28 using HeuristicLab.Core; … … 32 33 /// Holds a list of slave tasks and manages access to this list. 33 34 /// Forwards events from SlaveTask and forwards commands to SlaveTask. 35 /// Periodically sends task data to the server to avoid loss of progress when the slave crashes. 34 36 /// </summary> 35 37 public class TaskManager { 36 private static ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 37 private Dictionary<Guid, SlaveTask> slaveTasks; 38 private ILog log; 39 private PluginManager pluginManager; 38 private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 39 private readonly Dictionary<Guid, SnapshotInfo> slaveTasks; 40 private readonly ILog log; 41 private readonly PluginManager pluginManager; 42 private readonly CancellationTokenSource cts; 43 private readonly CancellationToken ct; 44 private readonly AutoResetEvent waitHandle; 45 private readonly WcfService wcfService; 46 private readonly TimeSpan checkpointInterval; 47 private readonly TimeSpan checkpointCheckInterval; 40 48 41 49 public int TaskCount { … … 62 70 this.pluginManager = pluginCache; 63 71 this.log = log; 64 this.slaveTasks = new Dictionary<Guid, SlaveTask>(); 65 } 72 this.slaveTasks = new Dictionary<Guid, SnapshotInfo>(); 73 74 cts = new CancellationTokenSource(); 75 ct = cts.Token; 76 waitHandle = new AutoResetEvent(true); 77 wcfService = WcfService.Instance; 78 checkpointInterval = Settings.Default.CheckpointInterval; 79 checkpointCheckInterval = Settings.Default.CheckpointCheckInterval; 80 81 System.Threading.Tasks.Task.Factory.StartNew(Checkpointing, ct); 82 } 83 84 #region Checkpointing 85 private void Checkpointing() { 86 while (!ct.IsCancellationRequested) { 87 slaveTasksLocker.EnterWriteLock(); 88 try { 89 foreach (var entry in slaveTasks) { 90 var taskId = entry.Key; 91 var snapshotInfo = entry.Value; 92 93 if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue; 94 95 var task = wcfService.GetTask(taskId); 96 var snapshot = snapshotInfo.Task.GetTaskDataSnapshot(); 97 98 if (snapshot == null) continue; 99 100 slaveTasks[taskId].LastSnapshot = snapshot.Item2; 101 var slaveId = ConfigManager.Instance.GetClientInfo().Id; 102 wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating); 103 } 104 } finally { slaveTasksLocker.ExitWriteLock(); } 105 waitHandle.WaitOne(checkpointCheckInterval); 106 } 107 } 108 109 public void StopCheckpointing() { 110 cts.Cancel(); 111 waitHandle.Set(); 112 waitHandle.Close(); 113 } 114 #endregion 66 115 67 116 #region Task Control methods … … 97 146 try { 98 147 if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId); 99 SlaveTask slaveTask = slaveTasks[taskId] ;148 SlaveTask slaveTask = slaveTasks[taskId].Task; 100 149 slaveTask.PauseTask(); 101 150 } … … 107 156 try { 108 157 if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId); 109 SlaveTask slaveTask = slaveTasks[taskId] ;158 SlaveTask slaveTask = slaveTasks[taskId].Task; 110 159 slaveTask.StopTask(); 111 160 } … … 118 167 try { 119 168 if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId); 120 slaveTask = slaveTasks[taskId] ;169 slaveTask = slaveTasks[taskId].Task; 121 170 if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException(); 122 171 RemoveSlaveTask(taskId, slaveTask); … … 132 181 try { 133 182 foreach (var slaveTask in slaveTasks.Values) { 134 slaveTask. PauseTask();183 slaveTask.Task.PauseTask(); 135 184 } 136 185 } … … 142 191 try { 143 192 foreach (var slaveTask in slaveTasks.Values) { 144 slaveTask. StopTask();193 slaveTask.Task.StopTask(); 145 194 } 146 195 } … … 152 201 try { 153 202 foreach (var slaveTask in slaveTasks.Values.ToArray()) { 154 AbortTask(slaveTask.Task Id);203 AbortTask(slaveTask.Task.TaskId); 155 204 } 156 205 } … … 163 212 slaveTasksLocker.EnterWriteLock(); 164 213 try { 165 slaveTasks.Add(task.Id, slaveTask);214 slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() }); 166 215 RegisterSlaveTaskEvents(slaveTask); 167 216 } … … 198 247 slaveTasksLocker.EnterUpgradeableReadLock(); 199 248 try { 200 slaveTask = slaveTasks[e.Value] ;249 slaveTask = slaveTasks[e.Value].Task; 201 250 } 202 251 finally { slaveTasksLocker.ExitUpgradeableReadLock(); } … … 210 259 slaveTasksLocker.EnterUpgradeableReadLock(); 211 260 try { 212 slaveTask = slaveTasks[e.Value] ;261 slaveTask = slaveTasks[e.Value].Task; 213 262 RemoveSlaveTask(e.Value, slaveTask); 214 263 } … … 232 281 slaveTasksLocker.EnterUpgradeableReadLock(); 233 282 try { 234 slaveTask = slaveTasks[e.Value] ;283 slaveTask = slaveTasks[e.Value].Task; 235 284 RemoveSlaveTask(e.Value, slaveTask); 236 285 } … … 254 303 slaveTasksLocker.EnterUpgradeableReadLock(); 255 304 try { 256 slaveTask = slaveTasks[e.Value] ;305 slaveTask = slaveTasks[e.Value].Task; 257 306 RemoveSlaveTask(e.Value, slaveTask); 258 307 } … … 304 353 slaveTasksLocker.EnterReadLock(); 305 354 try { 306 return slaveTasks.ToDictionary(x => x.Key, x => x.Value. ExecutionTime);355 return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime); 307 356 } 308 357 finally { slaveTasksLocker.ExitReadLock(); } 358 } 359 360 private sealed class SnapshotInfo { 361 public SlaveTask Task { get; set; } 362 public DateTime LastSnapshot { get; set; } 309 363 } 310 364 } -
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Properties/Settings.Designer.cs
r11651 r12920 299 299 } 300 300 } 301 302 [global::System.Configuration.UserScopedSettingAttribute()] 303 [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] 304 [global::System.Configuration.DefaultSettingValueAttribute("10:00:00")] 305 public global::System.TimeSpan CheckpointInterval { 306 get { 307 return ((global::System.TimeSpan)(this["CheckpointInterval"])); 308 } 309 set { 310 this["CheckpointInterval"] = value; 311 } 312 } 313 314 [global::System.Configuration.UserScopedSettingAttribute()] 315 [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] 316 [global::System.Configuration.DefaultSettingValueAttribute("00:05:00")] 317 public global::System.TimeSpan CheckpointCheckInterval { 318 get { 319 return ((global::System.TimeSpan)(this["CheckpointCheckInterval"])); 320 } 321 set { 322 this["CheckpointCheckInterval"] = value; 323 } 324 } 301 325 } 302 326 } -
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Properties/Settings.settings
r11651 r12920 72 72 <Value Profile="(Default)">00:00:40</Value> 73 73 </Setting> 74 <Setting Name="CheckpointInterval" Type="System.TimeSpan" Scope="User"> 75 <Value Profile="(Default)">10:00:00</Value> 76 </Setting> 77 <Setting Name="CheckpointCheckInterval" Type="System.TimeSpan" Scope="User"> 78 <Value Profile="(Default)">00:05:00</Value> 79 </Setting> 74 80 </Settings> 75 81 </SettingsFile> -
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/SlaveTask.cs
r12012 r12920 169 169 } 170 170 171 public Tuple<TaskData, DateTime> GetTaskDataSnapshot() { 172 Tuple<TaskData, DateTime> snapshot = null; 173 try { 174 snapshot = executor.GetTaskDataSnapshot(); 175 if (snapshot == null) return Tuple.Create(originalTaskData, DateTime.Now); 176 } 177 catch (Exception ex) { 178 EventLogManager.LogException(ex); 179 } 180 return snapshot; 181 } 182 171 183 public TaskData GetTaskData() { 172 184 TaskData data = null; -
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/app.config
r11651 r12920 135 135 <value>00:00:40</value> 136 136 </setting> 137 <setting name="CheckpointInterval" serializeAs="String"> 138 <value>10:00:00</value> 139 </setting> 140 <setting name="CheckpointCheckInterval" serializeAs="String"> 141 <value>00:05:00</value> 142 </setting> 137 143 </HeuristicLab.Clients.Hive.SlaveCore.Properties.Settings> 138 144 <HeuristicLab.Clients.Hive.Settings>
Note: See TracChangeset
for help on using the changeset viewer.