Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs

Last change on this file was 12920, checked in by jkarder, 9 years ago

#2468: implemented checkpointing

File size: 12.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Clients.Hive.SlaveCore.Properties;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29
30namespace HeuristicLab.Clients.Hive.SlaveCore {
31
32  /// <summary>
33  /// Holds a list of slave tasks and manages access to this list.
34  /// Forwards events from SlaveTask and forwards commands to SlaveTask.
35  /// Periodically sends task data to the server to avoid loss of progress when the slave crashes.
36  /// </summary>
37  public class TaskManager {
38    private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
39    private readonly Dictionary<Guid, SnapshotInfo> slaveTasks;
40    private readonly ILog log;
41    private readonly PluginManager pluginManager;
42    private readonly CancellationTokenSource cts;
43    private readonly CancellationToken ct;
44    private readonly AutoResetEvent waitHandle;
45    private readonly WcfService wcfService;
46    private readonly TimeSpan checkpointInterval;
47    private readonly TimeSpan checkpointCheckInterval;
48
49    public int TaskCount {
50      get {
51        slaveTasksLocker.EnterReadLock();
52        try {
53          return slaveTasks.Count;
54        }
55        finally { slaveTasksLocker.ExitReadLock(); }
56      }
57    }
58
59    public Guid[] TaskIds {
60      get {
61        slaveTasksLocker.EnterReadLock();
62        try {
63          return slaveTasks.Keys.ToArray();
64        }
65        finally { slaveTasksLocker.ExitReadLock(); }
66      }
67    }
68
69    public TaskManager(PluginManager pluginCache, ILog log) {
70      this.pluginManager = pluginCache;
71      this.log = log;
72      this.slaveTasks = new Dictionary<Guid, SnapshotInfo>();
73
74      cts = new CancellationTokenSource();
75      ct = cts.Token;
76      waitHandle = new AutoResetEvent(true);
77      wcfService = WcfService.Instance;
78      checkpointInterval = Settings.Default.CheckpointInterval;
79      checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;
80
81      System.Threading.Tasks.Task.Factory.StartNew(Checkpointing, ct);
82    }
83
84    #region Checkpointing
85    private void Checkpointing() {
86      while (!ct.IsCancellationRequested) {
87        slaveTasksLocker.EnterWriteLock();
88        try {
89          foreach (var entry in slaveTasks) {
90            var taskId = entry.Key;
91            var snapshotInfo = entry.Value;
92
93            if (DateTime.Now - snapshotInfo.LastSnapshot <= checkpointInterval) continue;
94
95            var task = wcfService.GetTask(taskId);
96            var snapshot = snapshotInfo.Task.GetTaskDataSnapshot();
97
98            if (snapshot == null) continue;
99
100            slaveTasks[taskId].LastSnapshot = snapshot.Item2;
101            var slaveId = ConfigManager.Instance.GetClientInfo().Id;
102            wcfService.UpdateTaskData(task, snapshot.Item1, slaveId, TaskState.Calculating);
103          }
104        } finally { slaveTasksLocker.ExitWriteLock(); }
105        waitHandle.WaitOne(checkpointCheckInterval);
106      }
107    }
108
109    public void StopCheckpointing() {
110      cts.Cancel();
111      waitHandle.Set();
112      waitHandle.Close();
113    }
114    #endregion
115
116    #region Task Control methods
117    public void StartTaskAsync(Task task, TaskData taskData) {
118      SlaveTask slaveTask = null;
119      slaveTasksLocker.EnterUpgradeableReadLock();
120      try {
121        if (slaveTasks.ContainsKey(task.Id)) {
122          SlaveStatusInfo.IncrementTasksFailed();
123          throw new TaskAlreadyRunningException(task.Id);
124        } else {
125          slaveTask = new SlaveTask(pluginManager, task.CoresNeeded, log);
126          AddSlaveTask(task, slaveTask);
127          SlaveStatusInfo.IncrementTasksFetched();
128        }
129      }
130      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
131
132      if (slaveTask != null) {
133        try {
134          slaveTask.StartJobAsync(task, taskData);
135        }
136        catch (Exception) {
137          RemoveSlaveTask(task.Id, slaveTask); // clean up and rethrow
138          slaveTask.DisposeAppDomain();
139          throw;
140        }
141      }
142    }
143
144    public void PauseTaskAsync(Guid taskId) {
145      slaveTasksLocker.EnterUpgradeableReadLock();
146      try {
147        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
148        SlaveTask slaveTask = slaveTasks[taskId].Task;
149        slaveTask.PauseTask();
150      }
151      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
152    }
153
154    public void StopTaskAsync(Guid taskId) {
155      slaveTasksLocker.EnterUpgradeableReadLock();
156      try {
157        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
158        SlaveTask slaveTask = slaveTasks[taskId].Task;
159        slaveTask.StopTask();
160      }
161      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
162    }
163
164    public void AbortTask(Guid taskId) {
165      SlaveTask slaveTask = null;
166      slaveTasksLocker.EnterUpgradeableReadLock();
167      try {
168        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
169        slaveTask = slaveTasks[taskId].Task;
170        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
171        RemoveSlaveTask(taskId, slaveTask);
172      }
173      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
174      slaveTask.DisposeAppDomain();
175      SlaveStatusInfo.IncrementTasksAborted();
176      OnTaskAborted(slaveTask);
177    }
178
179    public void PauseAllTasksAsync() {
180      slaveTasksLocker.EnterUpgradeableReadLock();
181      try {
182        foreach (var slaveTask in slaveTasks.Values) {
183          slaveTask.Task.PauseTask();
184        }
185      }
186      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
187    }
188
189    public void StopAllTasksAsync() {
190      slaveTasksLocker.EnterUpgradeableReadLock();
191      try {
192        foreach (var slaveTask in slaveTasks.Values) {
193          slaveTask.Task.StopTask();
194        }
195      }
196      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
197    }
198
199    public void AbortAllTasks() {
200      slaveTasksLocker.EnterUpgradeableReadLock();
201      try {
202        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
203          AbortTask(slaveTask.Task.TaskId);
204        }
205      }
206      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
207    }
208    #endregion
209
210    #region Add/Remove SlaveTask
211    private void AddSlaveTask(Task task, SlaveTask slaveTask) {
212      slaveTasksLocker.EnterWriteLock();
213      try {
214        slaveTasks.Add(task.Id, new SnapshotInfo { Task = slaveTask, LastSnapshot = task.DateCreated.GetValueOrDefault() });
215        RegisterSlaveTaskEvents(slaveTask);
216      }
217      finally { slaveTasksLocker.ExitWriteLock(); }
218    }
219
220    private void RemoveSlaveTask(Guid taskId, SlaveTask slaveTask) {
221      slaveTasksLocker.EnterWriteLock();
222      try {
223        slaveTasks.Remove(taskId);
224        DeregisterSlaveTaskEvents(slaveTask);
225      }
226      finally { slaveTasksLocker.ExitWriteLock(); }
227    }
228    #endregion
229
230    #region SlaveTask Events
231    private void RegisterSlaveTaskEvents(SlaveTask slaveTask) {
232      slaveTask.TaskStarted += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStarted);
233      slaveTask.TaskPaused += new EventHandler<EventArgs<Guid>>(slaveTask_TaskPaused);
234      slaveTask.TaskStopped += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
235      slaveTask.TaskFailed += new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
236    }
237
238    private void DeregisterSlaveTaskEvents(SlaveTask slaveTask) {
239      slaveTask.TaskStarted -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStarted);
240      slaveTask.TaskPaused -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskPaused);
241      slaveTask.TaskStopped -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
242      slaveTask.TaskFailed -= new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
243    }
244
245    private void slaveTask_TaskStarted(object sender, EventArgs<Guid> e) {
246      SlaveTask slaveTask;
247      slaveTasksLocker.EnterUpgradeableReadLock();
248      try {
249        slaveTask = slaveTasks[e.Value].Task;
250      }
251      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
252
253      SlaveStatusInfo.IncrementTasksStarted();
254      OnTaskStarted(slaveTask);
255    }
256
257    private void slaveTask_TaskPaused(object sender, EventArgs<Guid> e) {
258      SlaveTask slaveTask;
259      slaveTasksLocker.EnterUpgradeableReadLock();
260      try {
261        slaveTask = slaveTasks[e.Value].Task;
262        RemoveSlaveTask(e.Value, slaveTask);
263      }
264      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
265
266      TaskData taskData = null;
267      try {
268        taskData = slaveTask.GetTaskData();
269        SlaveStatusInfo.IncrementTasksFinished();
270        OnTaskPaused(slaveTask, taskData);
271      }
272      catch (Exception ex) {
273        RemoveSlaveTask(e.Value, slaveTask);
274        SlaveStatusInfo.IncrementTasksFailed();
275        OnTaskFailed(slaveTask, taskData, ex);
276      }
277    }
278
279    private void slaveTask_TaskStopped(object sender, EventArgs<Guid> e) {
280      SlaveTask slaveTask;
281      slaveTasksLocker.EnterUpgradeableReadLock();
282      try {
283        slaveTask = slaveTasks[e.Value].Task;
284        RemoveSlaveTask(e.Value, slaveTask);
285      }
286      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
287
288      TaskData taskData = null;
289      try {
290        taskData = slaveTask.GetTaskData();
291        SlaveStatusInfo.IncrementTasksFinished();
292        OnTaskStopped(slaveTask, taskData);
293      }
294      catch (Exception ex) {
295        RemoveSlaveTask(e.Value, slaveTask);
296        SlaveStatusInfo.IncrementTasksFailed();
297        OnTaskFailed(slaveTask, taskData, ex);
298      }
299    }
300
301    private void slaveTask_TaskFailed(object sender, EventArgs<Guid, Exception> e) {
302      SlaveTask slaveTask;
303      slaveTasksLocker.EnterUpgradeableReadLock();
304      try {
305        slaveTask = slaveTasks[e.Value].Task;
306        RemoveSlaveTask(e.Value, slaveTask);
307      }
308      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
309
310      TaskData taskData = null;
311      try {
312        taskData = slaveTask.GetTaskData();
313      }
314      catch { /* taskData will be null */ }
315      SlaveStatusInfo.IncrementTasksFailed();
316      OnTaskFailed(slaveTask, taskData, e.Value2);
317    }
318    #endregion
319
320    #region EventHandler
321    public event EventHandler<EventArgs<SlaveTask>> TaskStarted;
322    private void OnTaskStarted(SlaveTask slaveTask) {
323      var handler = TaskStarted;
324      if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
325    }
326
327    public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskStopped;
328    private void OnTaskStopped(SlaveTask slaveTask, TaskData taskData) {
329      var handler = TaskStopped;
330      if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
331    }
332
333    public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskPaused;
334    private void OnTaskPaused(SlaveTask slaveTask, TaskData taskData) {
335      var handler = TaskPaused;
336      if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
337    }
338
339    public event EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>> TaskFailed;
340    private void OnTaskFailed(SlaveTask slaveTask, TaskData taskData, Exception exception) {
341      var handler = TaskFailed;
342      if (handler != null) handler(this, new EventArgs<Tuple<SlaveTask, TaskData, Exception>>(new Tuple<SlaveTask, TaskData, Exception>(slaveTask, taskData, exception)));
343    }
344
345    public event EventHandler<EventArgs<SlaveTask>> TaskAborted;
346    private void OnTaskAborted(SlaveTask slaveTask) {
347      var handler = TaskAborted;
348      if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
349    }
350    #endregion
351
352    public Dictionary<Guid, TimeSpan> GetExecutionTimes() {
353      slaveTasksLocker.EnterReadLock();
354      try {
355        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Task.ExecutionTime);
356      }
357      finally { slaveTasksLocker.ExitReadLock(); }
358    }
359
360    private sealed class SnapshotInfo {
361      public SlaveTask Task { get; set; }
362      public DateTime LastSnapshot { get; set; }
363    }
364  }
365}
Note: See TracBrowser for help on using the repository browser.