Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/30/17 18:55:38 (7 years ago)
Author:
jkarder
Message:

#2791: improved checkpointing (task is paused and sent back to the server, new one is assigned via next heartbeat)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs

    r14185 r15004  
    3737  public class TaskManager {
    3838    private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    39     private readonly Dictionary<Guid, SnapshotInfo> slaveTasks;
     39    private readonly Dictionary<Guid, Tuple<SlaveTask, DateTime>> slaveTasks;
    4040    private readonly ILog log;
    4141    private readonly PluginManager pluginManager;
     
    4343    private readonly CancellationToken ct;
    4444    private readonly AutoResetEvent waitHandle;
    45     private readonly WcfService wcfService;
    4645    private readonly TimeSpan checkpointInterval;
    4746    private readonly TimeSpan checkpointCheckInterval;
     
    7069      this.pluginManager = pluginCache;
    7170      this.log = log;
    72       this.slaveTasks = new Dictionary<Guid, SnapshotInfo>();
     71      this.slaveTasks = new Dictionary<Guid, Tuple<SlaveTask, DateTime>>();
    7372
    7473      cts = new CancellationTokenSource();
    7574      ct = cts.Token;
    7675      waitHandle = new AutoResetEvent(true);
    77       wcfService = WcfService.Instance;
    7876      checkpointInterval = Settings.Default.CheckpointInterval;
    7977      checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;
     
    8583    private void Checkpointing() {
    8684      while (!ct.IsCancellationRequested) {
    87         slaveTasksLocker.EnterWriteLock();
     85        slaveTasksLocker.EnterUpgradeableReadLock();
    8886        try {
    8987          foreach (var entry in slaveTasks) {
    90             var taskId = entry.Key;
    91             var snapshotInfo = entry.Value;
    92 
    93             if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue;
    94 
    95             var task = wcfService.GetTask(taskId);
    96             var snapshot = snapshotInfo.Task.GetTaskDataSnapshot();
    97 
    98             if (snapshot == null) continue;
    99 
    100             slaveTasks[taskId].LastSnapshot = snapshot.Item2;
    101             var slaveId = ConfigManager.Instance.GetClientInfo().Id;
    102             wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating);
     88            if (DateTime.Now - entry.Value.Item2 > checkpointInterval)
     89              PauseTaskAsync(entry.Key);
    10390          }
    104         } finally { slaveTasksLocker.ExitWriteLock(); }
     91        } finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
    10592        waitHandle.WaitOne(checkpointCheckInterval);
    10693      }
     
    146133      try {
    147134        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    148         SlaveTask slaveTask = slaveTasks[taskId].Task;
     135        SlaveTask slaveTask = slaveTasks[taskId].Item1;
    149136        slaveTask.PauseTask();
    150137      }
     
    156143      try {
    157144        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    158         SlaveTask slaveTask = slaveTasks[taskId].Task;
     145        SlaveTask slaveTask = slaveTasks[taskId].Item1;
    159146        slaveTask.StopTask();
    160147      }
     
    167154      try {
    168155        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    169         slaveTask = slaveTasks[taskId].Task;
     156        slaveTask = slaveTasks[taskId].Item1;
    170157        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
    171158        RemoveSlaveTask(taskId, slaveTask);
     
    181168      try {
    182169        foreach (var slaveTask in slaveTasks.Values) {
    183           slaveTask.Task.PauseTask();
     170          slaveTask.Item1.PauseTask();
    184171        }
    185172      }
     
    191178      try {
    192179        foreach (var slaveTask in slaveTasks.Values) {
    193           slaveTask.Task.StopTask();
     180          slaveTask.Item1.StopTask();
    194181        }
    195182      }
     
    201188      try {
    202189        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
    203           AbortTask(slaveTask.Task.TaskId);
     190          AbortTask(slaveTask.Item1.TaskId);
    204191        }
    205192      }
     
    212199      slaveTasksLocker.EnterWriteLock();
    213200      try {
    214         slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() });
     201        slaveTasks.Add(task.Id, Tuple.Create(slaveTask, DateTime.Now));
    215202        RegisterSlaveTaskEvents(slaveTask);
    216203      }
     
    247234      slaveTasksLocker.EnterUpgradeableReadLock();
    248235      try {
    249         slaveTask = slaveTasks[e.Value].Task;
     236        slaveTask = slaveTasks[e.Value].Item1;
    250237      }
    251238      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
     
    259246      slaveTasksLocker.EnterUpgradeableReadLock();
    260247      try {
    261         slaveTask = slaveTasks[e.Value].Task;
     248        slaveTask = slaveTasks[e.Value].Item1;
    262249        RemoveSlaveTask(e.Value, slaveTask);
    263250      }
     
    281268      slaveTasksLocker.EnterUpgradeableReadLock();
    282269      try {
    283         slaveTask = slaveTasks[e.Value].Task;
     270        slaveTask = slaveTasks[e.Value].Item1;
    284271        RemoveSlaveTask(e.Value, slaveTask);
    285272      }
     
    303290      slaveTasksLocker.EnterUpgradeableReadLock();
    304291      try {
    305         slaveTask = slaveTasks[e.Value].Task;
     292        slaveTask = slaveTasks[e.Value].Item1;
    306293        RemoveSlaveTask(e.Value, slaveTask);
    307294      }
     
    353340      slaveTasksLocker.EnterReadLock();
    354341      try {
    355         return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime);
     342        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Item1.ExecutionTime);
    356343      }
    357344      finally { slaveTasksLocker.ExitReadLock(); }
    358     }
    359 
    360     private sealed class SnapshotInfo {
    361       public SlaveTask Task { get; set; }
    362       public DateTime LastSnapshot { get; set; }
    363345    }
    364346  }
Note: See TracChangeset for help on using the changeset viewer.