Free cookie consent management tool by TermsFeed Policy Generator

Changeset 12920


Ignore:
Timestamp:
08/27/15 14:52:42 (9 years ago)
Author:
jkarder
Message:

#2468: implemented checkpointing

Location:
trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs

    r12491 r12920  
    481481      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
    482482      heartbeatManager.StopHeartBeat();
     483      SlaveClientCom.Instance.LogMessage("Stopping checkpointing");
     484      taskManager.StopCheckpointing();
     485
    483486      abortRequested = true;
    484487
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Executor.cs

    r12012 r12920  
    185185    #endregion
    186186
     187    public Tuple<TaskData, DateTime> GetTaskDataSnapshot() {
     188      if (taskDataInvalid) return null;
     189
     190      Tuple<TaskData, DateTime> snapshot = null;
     191      if (task == null) {
     192        if (CurrentException == null) {
     193          CurrentException = new Exception("Task with id " + this.TaskId + " is null, sending empty task");
     194        }
     195      } else {
     196        var taskData = new TaskData();
     197
     198        var pausedTrigger = new EventWaitHandle(false, EventResetMode.ManualReset);
     199        EventHandler pausedHandler = null;
     200        pausedHandler = (s, e) => {
     201          task.TaskPaused -= pausedHandler;
     202          task.TaskPaused += Task_TaskPaused;
     203          pausedTrigger.Set();
     204        };
     205
     206        task.TaskPaused -= Task_TaskPaused;
     207        task.TaskPaused += pausedHandler;
     208        task.Pause();
     209        pausedTrigger.WaitOne();
     210
     211        taskData.Data = PersistenceUtil.Serialize(task);
     212        var timestamp = DateTime.Now;
     213
     214        EventHandler startedHandler = null;
     215        startedHandler = (s, e) => {
     216          task.TaskStarted -= startedHandler;
     217          task.TaskStarted += Task_TaskStarted;
     218        };
     219
     220        task.TaskStarted -= Task_TaskStarted;
     221        task.TaskStarted += startedHandler;
     222        task.Start();
     223
     224        taskData.TaskId = TaskId;
     225        snapshot = Tuple.Create(taskData, timestamp);
     226      }
     227
     228      return snapshot;
     229    }
     230
    187231    public TaskData GetTaskData() {
    188232      if (taskDataInvalid) return null;
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs

    r12012 r12920  
    2424using System.Linq;
    2525using System.Threading;
     26using HeuristicLab.Clients.Hive.SlaveCore.Properties;
    2627using HeuristicLab.Common;
    2728using HeuristicLab.Core;
     
    3233  /// Holds a list of slave tasks and manages access to this list.
    3334  /// Forwards events from SlaveTask and forwards commands to SlaveTask.
     35  /// Periodically sends task data to the server to avoid loss of progress when the slave crashes.
    3436  /// </summary>
    3537  public class TaskManager {
    36     private static ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    37     private Dictionary<Guid, SlaveTask> slaveTasks;
    38     private ILog log;
    39     private PluginManager pluginManager;
     38    private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
     39    private readonly Dictionary<Guid, SnapshotInfo> slaveTasks;
     40    private readonly ILog log;
     41    private readonly PluginManager pluginManager;
     42    private readonly CancellationTokenSource cts;
     43    private readonly CancellationToken ct;
     44    private readonly AutoResetEvent waitHandle;
     45    private readonly WcfService wcfService;
     46    private readonly TimeSpan checkpointInterval;
     47    private readonly TimeSpan checkpointCheckInterval;
    4048
    4149    public int TaskCount {
     
    6270      this.pluginManager = pluginCache;
    6371      this.log = log;
    64       this.slaveTasks = new Dictionary<Guid, SlaveTask>();
    65     }
     72      this.slaveTasks = new Dictionary<Guid, SnapshotInfo>();
     73
     74      cts = new CancellationTokenSource();
     75      ct = cts.Token;
     76      waitHandle = new AutoResetEvent(true);
     77      wcfService = WcfService.Instance;
     78      checkpointInterval = Settings.Default.CheckpointInterval;
     79      checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;
     80
     81      System.Threading.Tasks.Task.Factory.StartNew(Checkpointing, ct);
     82    }
     83
     84    #region Checkpointing
     85    private void Checkpointing() {
     86      while (!ct.IsCancellationRequested) {
     87        slaveTasksLocker.EnterWriteLock();
     88        try {
     89          foreach (var entry in slaveTasks) {
     90            var taskId = entry.Key;
     91            var snapshotInfo = entry.Value;
     92
     93            if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue;
     94
     95            var task = wcfService.GetTask(taskId);
     96            var snapshot = snapshotInfo.Task.GetTaskDataSnapshot();
     97
     98            if (snapshot == null) continue;
     99
     100            slaveTasks[taskId].LastSnapshot = snapshot.Item2;
     101            var slaveId = ConfigManager.Instance.GetClientInfo().Id;
     102            wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating);
     103          }
     104        } finally { slaveTasksLocker.ExitWriteLock(); }
     105        waitHandle.WaitOne(checkpointCheckInterval);
     106      }
     107    }
     108
     109    public void StopCheckpointing() {
     110      cts.Cancel();
     111      waitHandle.Set();
     112      waitHandle.Close();
     113    }
     114    #endregion
    66115
    67116    #region Task Control methods
     
    97146      try {
    98147        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    99         SlaveTask slaveTask = slaveTasks[taskId];
     148        SlaveTask slaveTask = slaveTasks[taskId].Task;
    100149        slaveTask.PauseTask();
    101150      }
     
    107156      try {
    108157        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    109         SlaveTask slaveTask = slaveTasks[taskId];
     158        SlaveTask slaveTask = slaveTasks[taskId].Task;
    110159        slaveTask.StopTask();
    111160      }
     
    118167      try {
    119168        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
    120         slaveTask = slaveTasks[taskId];
     169        slaveTask = slaveTasks[taskId].Task;
    121170        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
    122171        RemoveSlaveTask(taskId, slaveTask);
     
    132181      try {
    133182        foreach (var slaveTask in slaveTasks.Values) {
    134           slaveTask.PauseTask();
     183          slaveTask.Task.PauseTask();
    135184        }
    136185      }
     
    142191      try {
    143192        foreach (var slaveTask in slaveTasks.Values) {
    144           slaveTask.StopTask();
     193          slaveTask.Task.StopTask();
    145194        }
    146195      }
     
    152201      try {
    153202        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
    154           AbortTask(slaveTask.TaskId);
     203          AbortTask(slaveTask.Task.TaskId);
    155204        }
    156205      }
     
    163212      slaveTasksLocker.EnterWriteLock();
    164213      try {
    165         slaveTasks.Add(task.Id, slaveTask);
     214        slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() });
    166215        RegisterSlaveTaskEvents(slaveTask);
    167216      }
     
    198247      slaveTasksLocker.EnterUpgradeableReadLock();
    199248      try {
    200         slaveTask = slaveTasks[e.Value];
     249        slaveTask = slaveTasks[e.Value].Task;
    201250      }
    202251      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
     
    210259      slaveTasksLocker.EnterUpgradeableReadLock();
    211260      try {
    212         slaveTask = slaveTasks[e.Value];
     261        slaveTask = slaveTasks[e.Value].Task;
    213262        RemoveSlaveTask(e.Value, slaveTask);
    214263      }
     
    232281      slaveTasksLocker.EnterUpgradeableReadLock();
    233282      try {
    234         slaveTask = slaveTasks[e.Value];
     283        slaveTask = slaveTasks[e.Value].Task;
    235284        RemoveSlaveTask(e.Value, slaveTask);
    236285      }
     
    254303      slaveTasksLocker.EnterUpgradeableReadLock();
    255304      try {
    256         slaveTask = slaveTasks[e.Value];
     305        slaveTask = slaveTasks[e.Value].Task;
    257306        RemoveSlaveTask(e.Value, slaveTask);
    258307      }
     
    304353      slaveTasksLocker.EnterReadLock();
    305354      try {
    306         return slaveTasks.ToDictionary(x => x.Key, x => x.Value.ExecutionTime);
     355        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime);
    307356      }
    308357      finally { slaveTasksLocker.ExitReadLock(); }
     358    }
     359
     360    private sealed class SnapshotInfo {
     361      public SlaveTask Task { get; set; }
     362      public DateTime LastSnapshot { get; set; }
    309363    }
    310364  }
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Properties/Settings.Designer.cs

    r11651 r12920  
    299299            }
    300300        }
     301       
     302        [global::System.Configuration.UserScopedSettingAttribute()]
     303        [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
     304        [global::System.Configuration.DefaultSettingValueAttribute("10:00:00")]
     305        public global::System.TimeSpan CheckpointInterval {
     306            get {
     307                return ((global::System.TimeSpan)(this["CheckpointInterval"]));
     308            }
     309            set {
     310                this["CheckpointInterval"] = value;
     311            }
     312        }
     313       
     314        [global::System.Configuration.UserScopedSettingAttribute()]
     315        [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
     316        [global::System.Configuration.DefaultSettingValueAttribute("00:05:00")]
     317        public global::System.TimeSpan CheckpointCheckInterval {
     318            get {
     319                return ((global::System.TimeSpan)(this["CheckpointCheckInterval"]));
     320            }
     321            set {
     322                this["CheckpointCheckInterval"] = value;
     323            }
     324        }
    301325    }
    302326}
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Properties/Settings.settings

    r11651 r12920  
    7272      <Value Profile="(Default)">00:00:40</Value>
    7373    </Setting>
     74    <Setting Name="CheckpointInterval" Type="System.TimeSpan" Scope="User">
     75      <Value Profile="(Default)">10:00:00</Value>
     76    </Setting>
     77    <Setting Name="CheckpointCheckInterval" Type="System.TimeSpan" Scope="User">
     78      <Value Profile="(Default)">00:05:00</Value>
     79    </Setting>
    7480  </Settings>
    7581</SettingsFile>
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/SlaveTask.cs

    r12012 r12920  
    169169    }
    170170
     171    public Tuple<TaskData, DateTime> GetTaskDataSnapshot() {
     172      Tuple<TaskData, DateTime> snapshot = null;
     173      try {
     174        snapshot = executor.GetTaskDataSnapshot();
     175        if (snapshot == null) return Tuple.Create(originalTaskData, DateTime.Now);
     176      }
     177      catch (Exception ex) {
     178        EventLogManager.LogException(ex);
     179      }
     180      return snapshot;
     181    }
     182
    171183    public TaskData GetTaskData() {
    172184      TaskData data = null;
  • trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/app.config

    r11651 r12920  
    135135        <value>00:00:40</value>
    136136      </setting>
     137      <setting name="CheckpointInterval" serializeAs="String">
     138        <value>10:00:00</value>
     139      </setting>
     140      <setting name="CheckpointCheckInterval" serializeAs="String">
     141        <value>00:05:00</value>
     142      </setting>
    137143    </HeuristicLab.Clients.Hive.SlaveCore.Properties.Settings>
    138144    <HeuristicLab.Clients.Hive.Settings>
Note: See TracChangeset for help on using the changeset viewer.