Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
08/27/15 14:52:42 (9 years ago)
Author:
jkarder
Message:

#2468: implemented checkpointing

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs

    r12012 r12920  
    2424using System.Linq;
    2525using System.Threading;
     26using HeuristicLab.Clients.Hive.SlaveCore.Properties;
    2627using HeuristicLab.Common;
    2728using HeuristicLab.Core;
     
    3233  /// Holds a list of slave tasks and manages access to this list.
    3334  /// Forwards events from SlaveTask and forwards commands to SlaveTask.
     35  /// Periodically sends task data to the server to avoid loss of progress when the slave crashes.
    3436  /// </summary>
    3537  public class TaskManager {
    36     private static ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    37     private Dictionary<Guid, SlaveTask> slaveTasks;
    38     private ILog log;
    39     private PluginManager pluginManager;
     38    private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
     39    private readonly Dictionary<Guid, SnapshotInfo> slaveTasks;
     40    private readonly ILog log;
     41    private readonly PluginManager pluginManager;
     42    private readonly CancellationTokenSource cts;
     43    private readonly CancellationToken ct;
     44    private readonly AutoResetEvent waitHandle;
     45    private readonly WcfService wcfService;
     46    private readonly TimeSpan checkpointInterval;
     47    private readonly TimeSpan checkpointCheckInterval;
    4048
    4149    public int TaskCount {
     
    6270      this.pluginManager = pluginCache;
    6371      this.log = log;
    64       this.slaveTasks = new Dictionary<Guid, SlaveTask>();
    65     }
     72      this.slaveTasks = new Dictionary<Guid, SnapshotInfo>();
     73
     74      cts = new CancellationTokenSource();
     75      ct = cts.Token;
     76      waitHandle = new AutoResetEvent(true);
     77      wcfService = WcfService.Instance;
     78      checkpointInterval = Settings.Default.CheckpointInterval;
     79      checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;
     80
     81      System.Threading.Tasks.Task.Factory.StartNew(Checkpointing, ct);
     82    }
     83
     84    #region Checkpointing
     85    private void Checkpointing() {
     86      while (!ct.IsCancellationRequested) {
     87        slaveTasksLocker.EnterWriteLock();
     88        try {
     89          foreach (var entry in slaveTasks) {
     90            var taskId = entry.Key;
     91            var snapshotInfo = entry.Value;
     92
     93            if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue;
     94
     95            var task = wcfService.GetTask(taskId);
     96            var snapshot = snapshotInfo.Task.GetTaskDataSnapshot();
     97
     98            if (snapshot == null) continue;
     99
     100            slaveTasks[taskId].LastSnapshot = snapshot.Item2;
     101            var slaveId = ConfigManager.Instance.GetClientInfo().Id;
     102            wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating);
     103          }
     104        } finally { slaveTasksLocker.ExitWriteLock(); }
     105        waitHandle.WaitOne(checkpointCheckInterval);
     106      }
     107    }
     108
     109    public void StopCheckpointing() { // Stops the checkpointing loop: cancel, wake the loop, then release the wait handle.
     110      cts.Cancel(); // makes the loop condition in Checkpointing() evaluate to false
     111      waitHandle.Set(); // wakes the loop if it is currently blocked in WaitOne
     112      waitHandle.Close(); // NOTE(review): the loop may still call WaitOne after this point if it was mid-pass — confirm this is handled
     113    }
     114    #endregion
    66115
    67116    #region Task Control methods
     
    97146      try {
    98147        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    99         SlaveTask slaveTask = slaveTasks[taskId];
     148        SlaveTask slaveTask = slaveTasks[taskId].Task;
    100149        slaveTask.PauseTask();
    101150      }
     
    107156      try {
    108157        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    109         SlaveTask slaveTask = slaveTasks[taskId];
     158        SlaveTask slaveTask = slaveTasks[taskId].Task;
    110159        slaveTask.StopTask();
    111160      }
     
    118167      try {
    119168        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    120         slaveTask = slaveTasks[taskId];
     169        slaveTask = slaveTasks[taskId].Task;
    121170        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
    122171        RemoveSlaveTask(taskId, slaveTask);
     
    132181      try {
    133182        foreach (var slaveTask in slaveTasks.Values) {
    134           slaveTask.PauseTask();
     183          slaveTask.Task.PauseTask();
    135184        }
    136185      }
     
    142191      try {
    143192        foreach (var slaveTask in slaveTasks.Values) {
    144           slaveTask.StopTask();
     193          slaveTask.Task.StopTask();
    145194        }
    146195      }
     
    152201      try {
    153202        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
    154           AbortTask(slaveTask.TaskId);
     203          AbortTask(slaveTask.Task.TaskId);
    155204        }
    156205      }
     
    163212      slaveTasksLocker.EnterWriteLock();
    164213      try {
    165         slaveTasks.Add(task.Id, slaveTask);
     214        slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() });
    166215        RegisterSlaveTaskEvents(slaveTask);
    167216      }
     
    198247      slaveTasksLocker.EnterUpgradeableReadLock();
    199248      try {
    200         slaveTask = slaveTasks[e.Value];
     249        slaveTask = slaveTasks[e.Value].Task;
    201250      }
    202251      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
     
    210259      slaveTasksLocker.EnterUpgradeableReadLock();
    211260      try {
    212         slaveTask = slaveTasks[e.Value];
     261        slaveTask = slaveTasks[e.Value].Task;
    213262        RemoveSlaveTask(e.Value, slaveTask);
    214263      }
     
    232281      slaveTasksLocker.EnterUpgradeableReadLock();
    233282      try {
    234         slaveTask = slaveTasks[e.Value];
     283        slaveTask = slaveTasks[e.Value].Task;
    235284        RemoveSlaveTask(e.Value, slaveTask);
    236285      }
     
    254303      slaveTasksLocker.EnterUpgradeableReadLock();
    255304      try {
    256         slaveTask = slaveTasks[e.Value];
     305        slaveTask = slaveTasks[e.Value].Task;
    257306        RemoveSlaveTask(e.Value, slaveTask);
    258307      }
     
    304353      slaveTasksLocker.EnterReadLock();
    305354      try {
    306         return slaveTasks.ToDictionary(x => x.Key, x => x.Value.ExecutionTime);
     355        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime);
    307356      }
    308357      finally { slaveTasksLocker.ExitReadLock(); }
     358    }
     359
     360    private sealed class SnapshotInfo { // Value stored per task id in slaveTasks: the slave task plus its last snapshot time.
     361      public SlaveTask Task { get; set; } // the slave task that commands are forwarded to
     362      public DateTime LastSnapshot { get; set; } // compared against checkpointInterval; initialised from the task's creation date when added
    309363    }
    310364  }
Note: See TracChangeset for help on using the changeset viewer.