source: trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs @ 15004

Last change on this file since 15004 was 15004, checked in by jkarder, 5 years ago

#2791: improved checkpointing (task is paused and sent back to the server, new one is assigned via next heartbeat)

File size: 12.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Clients.Hive.SlaveCore.Properties;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29
30namespace HeuristicLab.Clients.Hive.SlaveCore {
31
/// <summary>
/// Keeps track of the slave tasks registered with this manager and guards access to that collection.
/// Relays SlaveTask notifications as manager-level events and forwards control commands to SlaveTask.
/// A background loop requests a pause for every task that has been registered longer than the configured
/// checkpoint interval; the paused task is then published, together with its TaskData, via TaskPaused.
/// </summary>
public class TaskManager {
  // Recursion support is required: methods that already hold the upgradeable lock call
  // AddSlaveTask/RemoveSlaveTask/PauseTaskAsync/AbortTask, which acquire the lock again on the same thread.
  private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
  // Task id -> (slave task, UTC time at which the task was registered).
  private readonly Dictionary<Guid, Tuple<SlaveTask, DateTime>> slaveTasks;
  private readonly ILog log;
  private readonly PluginManager pluginManager;
  // Cancelling this source ends the checkpoint loop; its token's wait handle doubles as the loop's timer.
  private readonly CancellationTokenSource cts;
  private readonly CancellationToken ct;
  // Age after which a task is asked to pause.
  private readonly TimeSpan checkpointInterval;
  // Delay between two passes of the checkpoint loop.
  private readonly TimeSpan checkpointCheckInterval;

  /// <summary>Number of tasks currently tracked.</summary>
  public int TaskCount {
    get {
      slaveTasksLocker.EnterReadLock();
      try {
        return slaveTasks.Count;
      }
      finally { slaveTasksLocker.ExitReadLock(); }
    }
  }

  /// <summary>Snapshot of the ids of all tasks currently tracked.</summary>
  public Guid[] TaskIds {
    get {
      slaveTasksLocker.EnterReadLock();
      try {
        return slaveTasks.Keys.ToArray();
      }
      finally { slaveTasksLocker.ExitReadLock(); }
    }
  }

  /// <summary>
  /// Creates the manager, reads the checkpoint settings and starts the background checkpoint loop.
  /// </summary>
  /// <param name="pluginCache">Plugin manager handed to every SlaveTask that is created.</param>
  /// <param name="log">Log handed to every SlaveTask that is created.</param>
  public TaskManager(PluginManager pluginCache, ILog log) {
    this.pluginManager = pluginCache;
    this.log = log;
    this.slaveTasks = new Dictionary<Guid, Tuple<SlaveTask, DateTime>>();

    cts = new CancellationTokenSource();
    ct = cts.Token;
    checkpointInterval = Settings.Default.CheckpointInterval;
    checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;

    // The loop blocks until StopCheckpointing is called, so it must not occupy a thread-pool worker.
    System.Threading.Tasks.Task.Factory.StartNew(
      Checkpointing,
      ct,
      System.Threading.Tasks.TaskCreationOptions.LongRunning,
      System.Threading.Tasks.TaskScheduler.Default);
  }

  #region Checkpointing
  /// <summary>
  /// Background loop: on every pass, requests a pause for each task older than the checkpoint interval,
  /// then waits for the check interval or until cancellation, whichever comes first.
  /// </summary>
  private void Checkpointing() {
    // NOTE(review): an exception thrown by PauseTaskAsync still terminates this loop, as before.
    while (!ct.IsCancellationRequested) {
      slaveTasksLocker.EnterUpgradeableReadLock();
      try {
        DateTime now = DateTime.UtcNow;
        // Iterate over a snapshot so that a task removed while we are pausing cannot invalidate the enumeration.
        foreach (KeyValuePair<Guid, Tuple<SlaveTask, DateTime>> entry in slaveTasks.ToArray()) {
          if (now - entry.Value.Item2 > checkpointInterval)
            PauseTaskAsync(entry.Key);
        }
      }
      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }

      // The token's handle is signalled by cts.Cancel(), so the wait ends early on shutdown and there is
      // no separately owned handle that could be closed underneath this thread.
      ct.WaitHandle.WaitOne(checkpointCheckInterval);
    }
  }

  /// <summary>
  /// Requests the checkpoint loop to end. Safe to call more than once.
  /// </summary>
  public void StopCheckpointing() {
    cts.Cancel();
  }
  #endregion

  #region Task Control methods
  /// <summary>
  /// Registers a new SlaveTask for <paramref name="task"/> and starts its job.
  /// </summary>
  /// <param name="task">Task to run; its id must not be tracked already.</param>
  /// <param name="taskData">Data passed on to SlaveTask.StartJobAsync.</param>
  /// <exception cref="TaskAlreadyRunningException">A task with the same id is already tracked.</exception>
  public void StartTaskAsync(Task task, TaskData taskData) {
    SlaveTask slaveTask;
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      if (slaveTasks.ContainsKey(task.Id)) {
        SlaveStatusInfo.IncrementTasksFailed();
        throw new TaskAlreadyRunningException(task.Id);
      }
      slaveTask = new SlaveTask(pluginManager, task.CoresNeeded, log);
      AddSlaveTask(task, slaveTask);
      SlaveStatusInfo.IncrementTasksFetched();
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }

    try {
      slaveTask.StartJobAsync(task, taskData);
    }
    catch (Exception) {
      RemoveSlaveTask(task.Id, slaveTask); // clean up and rethrow
      slaveTask.DisposeAppDomain();
      throw;
    }
  }

  /// <summary>Forwards a pause request to the task with the given id.</summary>
  /// <exception cref="TaskNotRunningException">No task with this id is tracked.</exception>
  public void PauseTaskAsync(Guid taskId) {
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      GetTrackedTask(taskId).PauseTask();
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
  }

  /// <summary>Forwards a stop request to the task with the given id.</summary>
  /// <exception cref="TaskNotRunningException">No task with this id is tracked.</exception>
  public void StopTaskAsync(Guid taskId) {
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      GetTrackedTask(taskId).StopTask();
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
  }

  /// <summary>
  /// Stops tracking the task, disposes its app domain and raises TaskAborted.
  /// </summary>
  /// <exception cref="TaskNotRunningException">No task with this id is tracked.</exception>
  /// <exception cref="AppDomainNotCreatedException">The task is tracked but not prepared yet; it stays tracked.</exception>
  public void AbortTask(Guid taskId) {
    SlaveTask slaveTask;
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      slaveTask = GetTrackedTask(taskId);
      if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
      RemoveSlaveTask(taskId, slaveTask);
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
    slaveTask.DisposeAppDomain();
    SlaveStatusInfo.IncrementTasksAborted();
    OnTaskAborted(slaveTask);
  }

  /// <summary>Forwards a pause request to every tracked task.</summary>
  public void PauseAllTasksAsync() {
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      // Snapshot first (as AbortAllTasks does) so removals triggered meanwhile cannot break the enumeration.
      foreach (var entry in slaveTasks.Values.ToArray()) {
        entry.Item1.PauseTask();
      }
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
  }

  /// <summary>Forwards a stop request to every tracked task.</summary>
  public void StopAllTasksAsync() {
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      // Snapshot first (as AbortAllTasks does) so removals triggered meanwhile cannot break the enumeration.
      foreach (var entry in slaveTasks.Values.ToArray()) {
        entry.Item1.StopTask();
      }
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
  }

  /// <summary>
  /// Aborts every tracked task via AbortTask.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the first exception thrown by AbortTask (e.g. for a task that is not prepared yet)
  /// ends the loop, so later tasks are left untouched. TaskAborted handlers run while the lock is held.
  /// </remarks>
  public void AbortAllTasks() {
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      foreach (var entry in slaveTasks.Values.ToArray()) {
        AbortTask(entry.Item1.TaskId);
      }
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
  }

  // Returns the tracked task for the id or throws TaskNotRunningException. The caller must hold the lock.
  private SlaveTask GetTrackedTask(Guid taskId) {
    Tuple<SlaveTask, DateTime> entry;
    if (!slaveTasks.TryGetValue(taskId, out entry)) throw new TaskNotRunningException(taskId);
    return entry.Item1;
  }
  #endregion

  #region Add/Remove SlaveTask
  // Starts tracking the task (timestamped with the current UTC time) and subscribes to its notifications.
  private void AddSlaveTask(Task task, SlaveTask slaveTask) {
    slaveTasksLocker.EnterWriteLock();
    try {
      // UTC keeps the age computation in Checkpointing unaffected by local clock shifts such as DST.
      slaveTasks.Add(task.Id, Tuple.Create(slaveTask, DateTime.UtcNow));
      RegisterSlaveTaskEvents(slaveTask);
    }
    finally { slaveTasksLocker.ExitWriteLock(); }
  }

  // Stops tracking the task and unsubscribes from its notifications. A no-op for an unknown id.
  private void RemoveSlaveTask(Guid taskId, SlaveTask slaveTask) {
    slaveTasksLocker.EnterWriteLock();
    try {
      slaveTasks.Remove(taskId);
      DeregisterSlaveTaskEvents(slaveTask);
    }
    finally { slaveTasksLocker.ExitWriteLock(); }
  }
  #endregion

  #region SlaveTask Events
  private void RegisterSlaveTaskEvents(SlaveTask slaveTask) {
    slaveTask.TaskStarted += slaveTask_TaskStarted;
    slaveTask.TaskPaused += slaveTask_TaskPaused;
    slaveTask.TaskStopped += slaveTask_TaskStopped;
    slaveTask.TaskFailed += slaveTask_TaskFailed;
  }

  private void DeregisterSlaveTaskEvents(SlaveTask slaveTask) {
    slaveTask.TaskStarted -= slaveTask_TaskStarted;
    slaveTask.TaskPaused -= slaveTask_TaskPaused;
    slaveTask.TaskStopped -= slaveTask_TaskStopped;
    slaveTask.TaskFailed -= slaveTask_TaskFailed;
  }

  // Resolves the task a notification refers to and optionally stops tracking it.
  // Returns false when the id is no longer tracked (e.g. the task was aborted while the notification
  // was in flight); such late notifications are dropped instead of failing with KeyNotFoundException.
  private bool TryResolveSlaveTask(Guid taskId, bool remove, out SlaveTask slaveTask) {
    slaveTasksLocker.EnterUpgradeableReadLock();
    try {
      Tuple<SlaveTask, DateTime> entry;
      if (!slaveTasks.TryGetValue(taskId, out entry)) {
        slaveTask = null;
        return false;
      }
      slaveTask = entry.Item1;
      if (remove) RemoveSlaveTask(taskId, slaveTask);
      return true;
    }
    finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
  }

  // Fetches the task data of an already removed task and publishes it through the given raiser.
  // Any exception (from fetching the data or from a subscriber) is reported via TaskFailed instead.
  private void PublishTaskResult(SlaveTask slaveTask, Action<SlaveTask, TaskData> raise) {
    TaskData taskData = null;
    try {
      taskData = slaveTask.GetTaskData();
      SlaveStatusInfo.IncrementTasksFinished();
      raise(slaveTask, taskData);
    }
    catch (Exception ex) {
      SlaveStatusInfo.IncrementTasksFailed();
      OnTaskFailed(slaveTask, taskData, ex);
    }
  }

  private void slaveTask_TaskStarted(object sender, EventArgs<Guid> e) {
    SlaveTask slaveTask;
    if (!TryResolveSlaveTask(e.Value, false, out slaveTask)) return;

    SlaveStatusInfo.IncrementTasksStarted();
    OnTaskStarted(slaveTask);
  }

  private void slaveTask_TaskPaused(object sender, EventArgs<Guid> e) {
    SlaveTask slaveTask;
    if (!TryResolveSlaveTask(e.Value, true, out slaveTask)) return;

    PublishTaskResult(slaveTask, OnTaskPaused);
  }

  private void slaveTask_TaskStopped(object sender, EventArgs<Guid> e) {
    SlaveTask slaveTask;
    if (!TryResolveSlaveTask(e.Value, true, out slaveTask)) return;

    PublishTaskResult(slaveTask, OnTaskStopped);
  }

  private void slaveTask_TaskFailed(object sender, EventArgs<Guid, Exception> e) {
    SlaveTask slaveTask;
    if (!TryResolveSlaveTask(e.Value, true, out slaveTask)) return;

    TaskData taskData = null;
    try {
      taskData = slaveTask.GetTaskData();
    }
    catch { /* best effort: the failure is reported with taskData == null */ }
    SlaveStatusInfo.IncrementTasksFailed();
    OnTaskFailed(slaveTask, taskData, e.Value2);
  }
  #endregion

  #region EventHandler
  /// <summary>Raised after a tracked task reported that it started.</summary>
  public event EventHandler<EventArgs<SlaveTask>> TaskStarted;
  private void OnTaskStarted(SlaveTask slaveTask) {
    var handler = TaskStarted;
    if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
  }

  /// <summary>Raised with the task data after a task reported that it stopped and was removed.</summary>
  public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskStopped;
  private void OnTaskStopped(SlaveTask slaveTask, TaskData taskData) {
    var handler = TaskStopped;
    if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
  }

  /// <summary>Raised with the task data after a task reported that it paused and was removed.</summary>
  public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskPaused;
  private void OnTaskPaused(SlaveTask slaveTask, TaskData taskData) {
    var handler = TaskPaused;
    if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
  }

  /// <summary>Raised when a task failed; the task data may be null if it could not be retrieved.</summary>
  public event EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>> TaskFailed;
  private void OnTaskFailed(SlaveTask slaveTask, TaskData taskData, Exception exception) {
    var handler = TaskFailed;
    if (handler != null) handler(this, new EventArgs<Tuple<SlaveTask, TaskData, Exception>>(Tuple.Create(slaveTask, taskData, exception)));
  }

  /// <summary>Raised after a task was removed and its app domain disposed by AbortTask.</summary>
  public event EventHandler<EventArgs<SlaveTask>> TaskAborted;
  private void OnTaskAborted(SlaveTask slaveTask) {
    var handler = TaskAborted;
    if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
  }
  #endregion

  /// <summary>Returns the ExecutionTime of every tracked task, keyed by task id.</summary>
  public Dictionary<Guid, TimeSpan> GetExecutionTimes() {
    slaveTasksLocker.EnterReadLock();
    try {
      return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Item1.ExecutionTime);
    }
    finally { slaveTasksLocker.ExitReadLock(); }
  }
}
347}
Note: See TracBrowser for help on using the repository browser.