Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs @ 11082

Last change on this file since 11082 was 11082, checked in by ascheibe, 10 years ago

#2153

  • added a new method HandleStartStopPauseError in Executor to handle error conditions in the same way
  • added timeouts for semaphores so that failed tasks or tasks with endless loops don't block the slave
  • removed ExceptionOccured events from Executor/SlaveTask/TaskManager and use TaskFailed instead
  • removed another ExcpetionOccured event in HeartbeatManager that was never used
File size: 10.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2013 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28
29namespace HeuristicLab.Clients.Hive.SlaveCore {
30
31  /// <summary>
32  /// Holds a list of slave tasks and manages access to this list.
33  /// Forwards events from SlaveTask and forwards commands to SlaveTask.
34  /// </summary>
35  public class TaskManager {
36    private static ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
37    private Dictionary<Guid, SlaveTask> slaveTasks;
38    private ILog log;
39    private PluginManager pluginManager;
40
41    public int TaskCount {
42      get {
43        slaveTasksLocker.EnterReadLock();
44        try {
45          return slaveTasks.Count;
46        }
47        finally { slaveTasksLocker.ExitReadLock(); }
48      }
49    }
50
51    public Guid[] TaskIds {
52      get {
53        slaveTasksLocker.EnterReadLock();
54        try {
55          return slaveTasks.Keys.ToArray();
56        }
57        finally { slaveTasksLocker.ExitReadLock(); }
58      }
59    }
60
61    public TaskManager(PluginManager pluginCache, ILog log) {
62      this.pluginManager = pluginCache;
63      this.log = log;
64      this.slaveTasks = new Dictionary<Guid, SlaveTask>();
65    }
66
67    #region Task Control methods
68    public void StartTaskAsync(Task task, TaskData taskData) {
69      SlaveTask slaveTask = null;
70      slaveTasksLocker.EnterUpgradeableReadLock();
71      try {
72        if (slaveTasks.ContainsKey(task.Id)) {
73          SlaveStatusInfo.IncrementTasksFailed();
74          throw new TaskAlreadyRunningException(task.Id);
75        } else {
76          slaveTask = new SlaveTask(pluginManager, task.CoresNeeded, log);
77          AddSlaveTask(task, slaveTask);
78          SlaveStatusInfo.IncrementTasksFetched();
79        }
80      }
81      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
82
83      if (slaveTask != null) {
84        try {
85          slaveTask.StartJobAsync(task, taskData);
86        }
87        catch (Exception) {
88          RemoveSlaveTask(task.Id, slaveTask); // clean up and rethrow
89          slaveTask.DisposeAppDomain();
90          throw;
91        }
92      }
93    }
94
95    public void PauseTaskAsync(Guid taskId) {
96      slaveTasksLocker.EnterUpgradeableReadLock();
97      try {
98        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
99        SlaveTask slaveTask = slaveTasks[taskId];
100        slaveTask.PauseTask();
101      }
102      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
103    }
104
105    public void StopTaskAsync(Guid taskId) {
106      slaveTasksLocker.EnterUpgradeableReadLock();
107      try {
108        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
109        SlaveTask slaveTask = slaveTasks[taskId];
110        slaveTask.StopTask();
111      }
112      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
113    }
114
115    public void AbortTask(Guid taskId) {
116      SlaveTask slaveTask = null;
117      slaveTasksLocker.EnterUpgradeableReadLock();
118      try {
119        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
120        slaveTask = slaveTasks[taskId];
121        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
122        RemoveSlaveTask(taskId, slaveTask);
123      }
124      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
125      slaveTask.DisposeAppDomain();
126      SlaveStatusInfo.IncrementTasksAborted();
127      OnTaskAborted(slaveTask);
128    }
129
130    public void PauseAllTasksAsync() {
131      slaveTasksLocker.EnterUpgradeableReadLock();
132      try {
133        foreach (var slaveTask in slaveTasks.Values) {
134          slaveTask.PauseTask();
135        }
136      }
137      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
138    }
139
140    public void StopAllTasksAsync() {
141      slaveTasksLocker.EnterUpgradeableReadLock();
142      try {
143        foreach (var slaveTask in slaveTasks.Values) {
144          slaveTask.StopTask();
145        }
146      }
147      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
148    }
149
150    public void AbortAllTasks() {
151      slaveTasksLocker.EnterUpgradeableReadLock();
152      try {
153        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
154          AbortTask(slaveTask.TaskId);
155        }
156      }
157      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
158    }
159    #endregion
160
161    #region Add/Remove SlaveTask
162    private void AddSlaveTask(Task task, SlaveTask slaveTask) {
163      slaveTasksLocker.EnterWriteLock();
164      try {
165        slaveTasks.Add(task.Id, slaveTask);
166        RegisterSlaveTaskEvents(slaveTask);
167      }
168      finally { slaveTasksLocker.ExitWriteLock(); }
169    }
170
171    private void RemoveSlaveTask(Guid taskId, SlaveTask slaveTask) {
172      slaveTasksLocker.EnterWriteLock();
173      try {
174        slaveTasks.Remove(taskId);
175        DeregisterSlaveTaskEvents(slaveTask);
176      }
177      finally { slaveTasksLocker.ExitWriteLock(); }
178    }
179    #endregion
180
181    #region SlaveTask Events
182    private void RegisterSlaveTaskEvents(SlaveTask slaveTask) {
183      slaveTask.TaskStarted += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStarted);
184      slaveTask.TaskPaused += new EventHandler<EventArgs<Guid>>(slaveTask_TaskPaused);
185      slaveTask.TaskStopped += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
186      slaveTask.TaskFailed += new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
187    }
188
189    private void DeregisterSlaveTaskEvents(SlaveTask slaveTask) {
190      slaveTask.TaskStarted -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStarted);
191      slaveTask.TaskPaused -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskPaused);
192      slaveTask.TaskStopped -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
193      slaveTask.TaskFailed -= new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
194    }
195
196    private void slaveTask_TaskStarted(object sender, EventArgs<Guid> e) {
197      SlaveTask slaveTask;
198      slaveTasksLocker.EnterUpgradeableReadLock();
199      try {
200        slaveTask = slaveTasks[e.Value];
201      }
202      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
203
204      SlaveStatusInfo.IncrementTasksStarted();
205      OnTaskStarted(slaveTask);
206    }
207
208    private void slaveTask_TaskPaused(object sender, EventArgs<Guid> e) {
209      SlaveTask slaveTask;
210      slaveTasksLocker.EnterUpgradeableReadLock();
211      try {
212        slaveTask = slaveTasks[e.Value];
213        RemoveSlaveTask(e.Value, slaveTask);
214      }
215      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
216
217      TaskData taskData = null;
218      try {
219        taskData = slaveTask.GetTaskData();
220        SlaveStatusInfo.IncrementTasksFinished();
221        OnTaskPaused(slaveTask, taskData);
222      }
223      catch (Exception ex) {
224        RemoveSlaveTask(e.Value, slaveTask);
225        SlaveStatusInfo.IncrementTasksFailed();
226        OnTaskFailed(slaveTask, taskData, ex);
227      }
228    }
229
230    private void slaveTask_TaskStopped(object sender, EventArgs<Guid> e) {
231      SlaveTask slaveTask;
232      slaveTasksLocker.EnterUpgradeableReadLock();
233      try {
234        slaveTask = slaveTasks[e.Value];
235        RemoveSlaveTask(e.Value, slaveTask);
236      }
237      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
238
239      TaskData taskData = null;
240      try {
241        taskData = slaveTask.GetTaskData();
242        SlaveStatusInfo.IncrementTasksFinished();
243        OnTaskStopped(slaveTask, taskData);
244      }
245      catch (Exception ex) {
246        RemoveSlaveTask(e.Value, slaveTask);
247        SlaveStatusInfo.IncrementTasksFailed();
248        OnTaskFailed(slaveTask, taskData, ex);
249      }
250    }
251
252    private void slaveTask_TaskFailed(object sender, EventArgs<Guid, Exception> e) {
253      SlaveTask slaveTask;
254      slaveTasksLocker.EnterUpgradeableReadLock();
255      try {
256        slaveTask = slaveTasks[e.Value];
257        RemoveSlaveTask(e.Value, slaveTask);
258      }
259      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
260
261      TaskData taskData = null;
262      try {
263        taskData = slaveTask.GetTaskData();
264      }
265      catch { /* taskData will be null */ }
266      SlaveStatusInfo.IncrementTasksFailed();
267      OnTaskFailed(slaveTask, taskData, e.Value2);
268    }
269    #endregion
270
271    #region EventHandler
272    public event EventHandler<EventArgs<SlaveTask>> TaskStarted;
273    private void OnTaskStarted(SlaveTask slaveTask) {
274      var handler = TaskStarted;
275      if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
276    }
277
278    public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskStopped;
279    private void OnTaskStopped(SlaveTask slaveTask, TaskData taskData) {
280      var handler = TaskStopped;
281      if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
282    }
283
284    public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskPaused;
285    private void OnTaskPaused(SlaveTask slaveTask, TaskData taskData) {
286      var handler = TaskPaused;
287      if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
288    }
289
290    public event EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>> TaskFailed;
291    private void OnTaskFailed(SlaveTask slaveTask, TaskData taskData, Exception exception) {
292      var handler = TaskFailed;
293      if (handler != null) handler(this, new EventArgs<Tuple<SlaveTask, TaskData, Exception>>(new Tuple<SlaveTask, TaskData, Exception>(slaveTask, taskData, exception)));
294    }
295
296    public event EventHandler<EventArgs<SlaveTask>> TaskAborted;
297    private void OnTaskAborted(SlaveTask slaveTask) {
298      var handler = TaskAborted;
299      if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
300    }
301    #endregion
302
303    public Dictionary<Guid, TimeSpan> GetExecutionTimes() {
304      slaveTasksLocker.EnterReadLock();
305      try {
306        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.ExecutionTime);
307      }
308      finally { slaveTasksLocker.ExitReadLock(); }
309    }
310  }
311}
Note: See TracBrowser for help on using the repository browser.