Free cookie consent management tool by TermsFeed Policy Generator

Changeset 15004 for trunk


Ignore:
Timestamp:
05/30/17 18:55:38 (7 years ago)
Author:
jkarder
Message:

#2791: Improved checkpointing: the task is paused and sent back to the server, and a new task is assigned via the next heartbeat.

Location:
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3
Files:
3 edited

Legend:

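Unmodified (old and new line numbers are shown together with no separator, e.g. "231187" means old line 231, new line 187)
Added (only the new line number is shown)
Removed (only the old line number is shown)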
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Executor.cs

    r14185 r15004  
    185185    #endregion
    186186
    187     public Tuple<TaskData, DateTime> GetTaskDataSnapshot() {
    188       if (taskDataInvalid) return null;
    189 
    190       Tuple<TaskData, DateTime> snapshot = null;
    191       if (task == null) {
    192         if (CurrentException == null) {
    193           CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task");
    194         }
    195       } else {
    196         var taskData = new TaskData();
    197 
    198         var pausedTrigger = new EventWaitHandle(false, EventResetMode.ManualReset);
    199         EventHandler pausedHandler = null;
    200         pausedHandler = (s, e) => {
    201           task.TaskPaused -= pausedHandler;
    202           task.TaskPaused += Task_TaskPaused;
    203           pausedTrigger.Set();
    204         };
    205 
    206         task.TaskPaused -= Task_TaskPaused;
    207         task.TaskPaused += pausedHandler;
    208         task.Pause();
    209         pausedTrigger.WaitOne();
    210 
    211         taskData.Data = PersistenceUtil.Serialize(task);
    212         var timestamp = DateTime.Now;
    213 
    214         EventHandler startedHandler = null;
    215         startedHandler = (s, e) => {
    216           task.TaskStarted -= startedHandler;
    217           task.TaskStarted += Task_TaskStarted;
    218         };
    219 
    220         task.TaskStarted -= Task_TaskStarted;
    221         task.TaskStarted += startedHandler;
    222         task.Start();
    223 
    224         taskData.TaskId = TaskId;
    225         snapshot = Tuple.Create(taskData, timestamp);
    226       }
    227 
    228       return snapshot;
    229     }
    230 
    231187    public TaskData GetTaskData() {
    232188      if (taskDataInvalid) return null;
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs

    r14185 r15004  
    3737  public class TaskManager {
    3838    private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    39     private readonly Dictionary<Guid, SnapshotInfo> slaveTasks;
     39    private readonly Dictionary<Guid, Tuple<SlaveTask, DateTime>> slaveTasks;
    4040    private readonly ILog log;
    4141    private readonly PluginManager pluginManager;
     
    4343    private readonly CancellationToken ct;
    4444    private readonly AutoResetEvent waitHandle;
    45     private readonly WcfService wcfService;
    4645    private readonly TimeSpan checkpointInterval;
    4746    private readonly TimeSpan checkpointCheckInterval;
     
    7069      this.pluginManager = pluginCache;
    7170      this.log = log;
    72       this.slaveTasks = new Dictionary<Guid, SnapshotInfo>();
     71      this.slaveTasks = new Dictionary<Guid, Tuple<SlaveTask, DateTime>>();
    7372
    7473      cts = new CancellationTokenSource();
    7574      ct = cts.Token;
    7675      waitHandle = new AutoResetEvent(true);
    77       wcfService = WcfService.Instance;
    7876      checkpointInterval = Settings.Default.CheckpointInterval;
    7977      checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;
     
    8583    private void Checkpointing() {
    8684      while (!ct.IsCancellationRequested) {
    87         slaveTasksLocker.EnterWriteLock();
     85        slaveTasksLocker.EnterUpgradeableReadLock();
    8886        try {
    8987          foreach (var entry in slaveTasks) {
    90             var taskId = entry.Key;
    91             var snapshotInfo = entry.Value;
    92 
    93             if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue;
    94 
    95             var task = wcfService.GetTask(taskId);
    96             var snapshot = snapshotInfo.Task.GetTaskDataSnapshot();
    97 
    98             if (snapshot == null) continue;
    99 
    100             slaveTasks[taskId].LastSnapshot = snapshot.Item2;
    101             var slaveId = ConfigManager.Instance.GetClientInfo().Id;
    102             wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating);
     88            if (DateTime.Now - entry.Value.Item2 > checkpointInterval)
     89              PauseTaskAsync(entry.Key);
    10390          }
    104         } finally { slaveTasksLocker.ExitWriteLock(); }
     91        } finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
    10592        waitHandle.WaitOne(checkpointCheckInterval);
    10693      }
     
    146133      try {
    147134        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    148         SlaveTask slaveTask = slaveTasks[taskId].Task;
     135        SlaveTask slaveTask = slaveTasks[taskId].Item1;
    149136        slaveTask.PauseTask();
    150137      }
     
    156143      try {
    157144        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    158         SlaveTask slaveTask = slaveTasks[taskId].Task;
     145        SlaveTask slaveTask = slaveTasks[taskId].Item1;
    159146        slaveTask.StopTask();
    160147      }
     
    167154      try {
    168155        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    169         slaveTask = slaveTasks[taskId].Task;
     156        slaveTask = slaveTasks[taskId].Item1;
    170157        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
    171158        RemoveSlaveTask(taskId, slaveTask);
     
    181168      try {
    182169        foreach (var slaveTask in slaveTasks.Values) {
    183           slaveTask.Task.PauseTask();
     170          slaveTask.Item1.PauseTask();
    184171        }
    185172      }
     
    191178      try {
    192179        foreach (var slaveTask in slaveTasks.Values) {
    193           slaveTask.Task.StopTask();
     180          slaveTask.Item1.StopTask();
    194181        }
    195182      }
     
    201188      try {
    202189        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
    203           AbortTask(slaveTask.Task.TaskId);
     190          AbortTask(slaveTask.Item1.TaskId);
    204191        }
    205192      }
     
    212199      slaveTasksLocker.EnterWriteLock();
    213200      try {
    214         slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() });
     201        slaveTasks.Add(task.Id, Tuple.Create(slaveTask, DateTime.Now));
    215202        RegisterSlaveTaskEvents(slaveTask);
    216203      }
     
    247234      slaveTasksLocker.EnterUpgradeableReadLock();
    248235      try {
    249         slaveTask = slaveTasks[e.Value].Task;
     236        slaveTask = slaveTasks[e.Value].Item1;
    250237      }
    251238      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
     
    259246      slaveTasksLocker.EnterUpgradeableReadLock();
    260247      try {
    261         slaveTask = slaveTasks[e.Value].Task;
     248        slaveTask = slaveTasks[e.Value].Item1;
    262249        RemoveSlaveTask(e.Value, slaveTask);
    263250      }
     
    281268      slaveTasksLocker.EnterUpgradeableReadLock();
    282269      try {
    283         slaveTask = slaveTasks[e.Value].Task;
     270        slaveTask = slaveTasks[e.Value].Item1;
    284271        RemoveSlaveTask(e.Value, slaveTask);
    285272      }
     
    303290      slaveTasksLocker.EnterUpgradeableReadLock();
    304291      try {
    305         slaveTask = slaveTasks[e.Value].Task;
     292        slaveTask = slaveTasks[e.Value].Item1;
    306293        RemoveSlaveTask(e.Value, slaveTask);
    307294      }
     
    353340      slaveTasksLocker.EnterReadLock();
    354341      try {
    355         return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime);
     342        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Item1.ExecutionTime);
    356343      }
    357344      finally { slaveTasksLocker.ExitReadLock(); }
    358     }
    359 
    360     private sealed class SnapshotInfo {
    361       public SlaveTask Task { get; set; }
    362       public DateTime LastSnapshot { get; set; }
    363345    }
    364346  }
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/SlaveTask.cs

    r14185 r15004  
    165165    }
    166166
    167     public Tuple<TaskData, DateTime> GetTaskDataSnapshot() {
    168       Tuple<TaskData, DateTime> snapshot = null;
    169       try {
    170         snapshot = executor.GetTaskDataSnapshot();
    171         if (snapshot == null) return Tuple.Create(originalTaskData, DateTime.Now);
    172       }
    173       catch (Exception ex) {
    174         EventLogManager.LogException(ex);
    175       }
    176       return snapshot;
    177     }
    178 
    179167    public TaskData GetTaskData() {
    180168      TaskData data = null;
Note: See TracChangeset for help on using the changeset viewer.