- Timestamp:
- 08/27/15 14:52:42 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs
r12012 r12920 24 24 using System.Linq; 25 25 using System.Threading; 26 using HeuristicLab.Clients.Hive.SlaveCore.Properties; 26 27 using HeuristicLab.Common; 27 28 using HeuristicLab.Core; … … 32 33 /// Holds a list of slave tasks and manages access to this list. 33 34 /// Forwards events from SlaveTask and forwards commands to SlaveTask. 35 /// Periodically sends task data to the server to avoid loss of progress when the slave crashes. 34 36 /// </summary> 35 37 public class TaskManager { 36 private static ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 37 private Dictionary<Guid, SlaveTask> slaveTasks; 38 private ILog log; 39 private PluginManager pluginManager; 38 private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); 39 private readonly Dictionary<Guid, SnapshotInfo> slaveTasks; 40 private readonly ILog log; 41 private readonly PluginManager pluginManager; 42 private readonly CancellationTokenSource cts; 43 private readonly CancellationToken ct; 44 private readonly AutoResetEvent waitHandle; 45 private readonly WcfService wcfService; 46 private readonly TimeSpan checkpointInterval; 47 private readonly TimeSpan checkpointCheckInterval; 40 48 41 49 public int TaskCount { … … 62 70 this.pluginManager = pluginCache; 63 71 this.log = log; 64 this.slaveTasks = new Dictionary<Guid, SlaveTask>(); 65 } 72 this.slaveTasks = new Dictionary<Guid, SnapshotInfo>(); 73 74 cts = new CancellationTokenSource(); 75 ct = cts.Token; 76 waitHandle = new AutoResetEvent(true); 77 wcfService = WcfService.Instance; 78 checkpointInterval = Settings.Default.CheckpointInterval; 79 checkpointCheckInterval = Settings.Default.CheckpointCheckInterval; 80 81 System.Threading.Tasks.Task.Factory.StartNew(Checkpointing, ct); 82 } 83 84 #region Checkpointing 85 private void Checkpointing() { 86 while (!ct.IsCancellationRequested) { 87 slaveTasksLocker.EnterWriteLock(); 88 try { 89 foreach (var entry in slaveTasks) { 90 var taskId = entry.Key; 91 var snapshotInfo = entry.Value; 92 93 if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue; 94 95 var task = wcfService.GetTask(taskId); 96 var snapshot = snapshotInfo.Task.GetTaskDataSnapshot(); 97 98 if (snapshot == null) continue; 99 100 slaveTasks[taskId].LastSnapshot = snapshot.Item2; 101 var slaveId = ConfigManager.Instance.GetClientInfo().Id; 102 wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating); 103 } 104 } finally { slaveTasksLocker.ExitWriteLock(); } 105 waitHandle.WaitOne(checkpointCheckInterval); 106 } 107 } 108 109 public void StopCheckpointing() { 110 cts.Cancel(); 111 waitHandle.Set(); 112 waitHandle.Close(); 113 } 114 #endregion 66 115 67 116 #region Task Control methods … … 97 146 try { 98 147 if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId); 99 SlaveTask slaveTask = slaveTasks[taskId] ;148 SlaveTask slaveTask = slaveTasks[taskId].Task; 100 149 slaveTask.PauseTask(); 101 150 } … … 107 156 try { 108 157 if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId); 109 SlaveTask slaveTask = slaveTasks[taskId] ;158 SlaveTask slaveTask = slaveTasks[taskId].Task; 110 159 slaveTask.StopTask(); 111 160 } … … 118 167 try { 119 168 if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId); 120 slaveTask = slaveTasks[taskId] ;169 slaveTask = slaveTasks[taskId].Task; 121 170 if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException(); 122 171 RemoveSlaveTask(taskId, slaveTask); … … 132 181 try { 133 182 foreach (var slaveTask in slaveTasks.Values) { 134 slaveTask. PauseTask();183 slaveTask.Task.PauseTask(); 135 184 } 136 185 } … … 142 191 try { 143 192 foreach (var slaveTask in slaveTasks.Values) { 144 slaveTask. StopTask();193 slaveTask.Task.StopTask(); 145 194 } 146 195 } … … 152 201 try { 153 202 foreach (var slaveTask in slaveTasks.Values.ToArray()) { 154 AbortTask(slaveTask.Task Id);203 AbortTask(slaveTask.Task.TaskId); 155 204 } 156 205 } … … 163 212 slaveTasksLocker.EnterWriteLock(); 164 213 try { 165 slaveTasks.Add(task.Id, slaveTask);214 slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() }); 166 215 RegisterSlaveTaskEvents(slaveTask); 167 216 } … … 198 247 slaveTasksLocker.EnterUpgradeableReadLock(); 199 248 try { 200 slaveTask = slaveTasks[e.Value] ;249 slaveTask = slaveTasks[e.Value].Task; 201 250 } 202 251 finally { slaveTasksLocker.ExitUpgradeableReadLock(); } … … 210 259 slaveTasksLocker.EnterUpgradeableReadLock(); 211 260 try { 212 slaveTask = slaveTasks[e.Value] ;261 slaveTask = slaveTasks[e.Value].Task; 213 262 RemoveSlaveTask(e.Value, slaveTask); 214 263 } … … 232 281 slaveTasksLocker.EnterUpgradeableReadLock(); 233 282 try { 234 slaveTask = slaveTasks[e.Value] ;283 slaveTask = slaveTasks[e.Value].Task; 235 284 RemoveSlaveTask(e.Value, slaveTask); 236 285 } … … 254 303 slaveTasksLocker.EnterUpgradeableReadLock(); 255 304 try { 256 slaveTask = slaveTasks[e.Value] ;305 slaveTask = slaveTasks[e.Value].Task; 257 306 RemoveSlaveTask(e.Value, slaveTask); 258 307 } … … 304 353 slaveTasksLocker.EnterReadLock(); 305 354 try { 306 return slaveTasks.ToDictionary(x => x.Key, x => x.Value. ExecutionTime);355 return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime); 307 356 } 308 357 finally { slaveTasksLocker.ExitReadLock(); } 358 } 359 360 private sealed class SnapshotInfo { 361 public SlaveTask Task { get; set; } 362 public DateTime LastSnapshot { get; set; } 309 363 } 310 364 }
Note: See TracChangeset
for help on using the changeset viewer.