Free cookie consent management tool by TermsFeed Policy Generator

source: branches/DataPreprocessing/HeuristicLab.Clients.Hive.Slave/3.3/Manager/TaskManager.cs @ 10187

Last change on this file since 10187 was 9456, checked in by swagner, 12 years ago

Updated copyright year and added some missing license headers (#1889)

File size: 11.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2013 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28
29namespace HeuristicLab.Clients.Hive.SlaveCore {
30
31  /// <summary>
32  /// Holds a list of slave tasks and manages access to this list.
33  /// Forwards events from SlaveTask and forwards commands to SlaveTask.
34  /// </summary>
35  public class TaskManager {
36    private static ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
37    private Dictionary<Guid, SlaveTask> slaveTasks;
38    private ILog log;
39    private PluginManager pluginManager;
40
41    public int TaskCount {
42      get {
43        slaveTasksLocker.EnterReadLock();
44        try {
45          return slaveTasks.Count;
46        }
47        finally { slaveTasksLocker.ExitReadLock(); }
48      }
49    }
50
51    public Guid[] TaskIds {
52      get {
53        slaveTasksLocker.EnterReadLock();
54        try {
55          return slaveTasks.Keys.ToArray();
56        }
57        finally { slaveTasksLocker.ExitReadLock(); }
58      }
59    }
60
61    public TaskManager(PluginManager pluginCache, ILog log) {
62      this.pluginManager = pluginCache;
63      this.log = log;
64      this.slaveTasks = new Dictionary<Guid, SlaveTask>();
65    }
66
67    #region Task Control methods
68    public void StartTaskAsync(Task task, TaskData taskData) {
69      SlaveTask slaveTask = null;
70      slaveTasksLocker.EnterUpgradeableReadLock();
71      try {
72        if (slaveTasks.ContainsKey(task.Id)) {
73          SlaveStatusInfo.IncrementExceptionOccured();
74          throw new TaskAlreadyRunningException(task.Id);
75        } else {
76          slaveTask = new SlaveTask(pluginManager, task.CoresNeeded, log);
77          AddSlaveTask(task, slaveTask);
78          SlaveStatusInfo.IncrementTasksFetched();
79        }
80      }
81      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
82
83      if (slaveTask != null) {
84        try {
85          slaveTask.StartJobAsync(task, taskData);
86        }
87        catch (Exception) {
88          RemoveSlaveTask(task.Id, slaveTask); // clean up and rethrow
89          slaveTask.DisposeAppDomain();
90          throw;
91        }
92      }
93    }
94
95    public void PauseTaskAsync(Guid taskId) {
96      slaveTasksLocker.EnterUpgradeableReadLock();
97      try {
98        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
99        SlaveTask slaveTask = slaveTasks[taskId];
100        slaveTask.PauseTask();
101      }
102      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
103    }
104
105    public void StopTaskAsync(Guid taskId) {
106      slaveTasksLocker.EnterUpgradeableReadLock();
107      try {
108        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
109        SlaveTask slaveTask = slaveTasks[taskId];
110        slaveTask.StopTask();
111      }
112      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
113    }
114
115    public void AbortTask(Guid taskId) {
116      SlaveTask slaveTask = null;
117      slaveTasksLocker.EnterUpgradeableReadLock();
118      try {
119        if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
120        slaveTask = slaveTasks[taskId];
121        if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
122        RemoveSlaveTask(taskId, slaveTask);
123      }
124      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
125      slaveTask.DisposeAppDomain();
126      SlaveStatusInfo.IncrementTasksAborted();
127      OnTaskAborted(slaveTask);
128    }
129
130    public void PauseAllTasksAsync() {
131      slaveTasksLocker.EnterUpgradeableReadLock();
132      try {
133        foreach (var slaveTask in slaveTasks.Values) {
134          slaveTask.PauseTask();
135        }
136      }
137      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
138    }
139
140    public void StopAllTasksAsync() {
141      slaveTasksLocker.EnterUpgradeableReadLock();
142      try {
143        foreach (var slaveTask in slaveTasks.Values) {
144          slaveTask.StopTask();
145        }
146      }
147      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
148    }
149
150    public void AbortAllTasks() {
151      slaveTasksLocker.EnterUpgradeableReadLock();
152      try {
153        foreach (var slaveTask in slaveTasks.Values.ToArray()) {
154          AbortTask(slaveTask.TaskId);
155        }
156      }
157      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
158    }
159    #endregion
160
161    #region Add/Remove SlaveTask
162    private void AddSlaveTask(Task task, SlaveTask slaveTask) {
163      slaveTasksLocker.EnterWriteLock();
164      try {
165        slaveTasks.Add(task.Id, slaveTask);
166        RegisterSlaveTaskEvents(slaveTask);
167      }
168      finally { slaveTasksLocker.ExitWriteLock(); }
169    }
170
171    private void RemoveSlaveTask(Guid taskId, SlaveTask slaveTask) {
172      slaveTasksLocker.EnterWriteLock();
173      try {
174        slaveTasks.Remove(taskId);
175        DeregisterSlaveTaskEvents(slaveTask);
176      }
177      finally { slaveTasksLocker.ExitWriteLock(); }
178    }
179    #endregion
180
181    #region SlaveTask Events
182    private void RegisterSlaveTaskEvents(SlaveTask slaveTask) {
183      slaveTask.TaskStarted += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStarted);
184      slaveTask.TaskPaused += new EventHandler<EventArgs<Guid>>(slaveTask_TaskPaused);
185      slaveTask.TaskStopped += new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
186      slaveTask.TaskFailed += new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
187      slaveTask.ExceptionOccured += new EventHandler<EventArgs<Guid, Exception>>(slaveTask_ExceptionOccured);
188    }
189
190    private void DeregisterSlaveTaskEvents(SlaveTask slaveTask) {
191      slaveTask.TaskStarted -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStarted);
192      slaveTask.TaskPaused -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskPaused);
193      slaveTask.TaskStopped -= new EventHandler<EventArgs<Guid>>(slaveTask_TaskStopped);
194      slaveTask.TaskFailed -= new EventHandler<EventArgs<Guid, Exception>>(slaveTask_TaskFailed);
195      slaveTask.ExceptionOccured -= new EventHandler<EventArgs<Guid, Exception>>(slaveTask_ExceptionOccured);
196    }
197
198    private void slaveTask_TaskStarted(object sender, EventArgs<Guid> e) {
199      SlaveTask slaveTask;
200      slaveTasksLocker.EnterUpgradeableReadLock();
201      try {
202        slaveTask = slaveTasks[e.Value];
203      }
204      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
205
206      SlaveStatusInfo.IncrementTasksStarted();
207      OnTaskStarted(slaveTask);
208    }
209
210    private void slaveTask_TaskPaused(object sender, EventArgs<Guid> e) {
211      SlaveTask slaveTask;
212      slaveTasksLocker.EnterUpgradeableReadLock();
213      try {
214        slaveTask = slaveTasks[e.Value];
215        RemoveSlaveTask(e.Value, slaveTask);
216      }
217      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
218
219      TaskData taskData = null;
220      try {
221        taskData = slaveTask.GetTaskData();
222        if (taskData == null) throw new SerializationException();
223        SlaveStatusInfo.IncrementTasksFinished();
224        OnTaskPaused(slaveTask, taskData);
225      }
226      catch (Exception ex) {
227        RemoveSlaveTask(e.Value, slaveTask);
228        SlaveStatusInfo.IncrementTasksFailed();
229        OnTaskFailed(slaveTask, taskData, ex);
230      }
231    }
232
233    private void slaveTask_TaskStopped(object sender, EventArgs<Guid> e) {
234      SlaveTask slaveTask;
235      slaveTasksLocker.EnterUpgradeableReadLock();
236      try {
237        slaveTask = slaveTasks[e.Value];
238        RemoveSlaveTask(e.Value, slaveTask);
239      }
240      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
241
242      TaskData taskData = null;
243      try {
244        taskData = slaveTask.GetTaskData();
245        if (taskData == null) throw new SerializationException();
246        SlaveStatusInfo.IncrementTasksFinished();
247        OnTaskStopped(slaveTask, taskData);
248      }
249      catch (Exception ex) {
250        RemoveSlaveTask(e.Value, slaveTask);
251        SlaveStatusInfo.IncrementTasksFailed();
252        OnTaskFailed(slaveTask, taskData, ex);
253      }
254    }
255
256    private void slaveTask_TaskFailed(object sender, EventArgs<Guid, Exception> e) {
257      SlaveTask slaveTask;
258      slaveTasksLocker.EnterUpgradeableReadLock();
259      try {
260        slaveTask = slaveTasks[e.Value];
261        RemoveSlaveTask(e.Value, slaveTask);
262      }
263      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
264
265      TaskData taskData = null;
266      try {
267        taskData = slaveTask.GetTaskData();
268        if (taskData == null) throw new SerializationException();
269      }
270      catch { /* taskData will be null */ }
271      SlaveStatusInfo.IncrementTasksFailed();
272      OnTaskFailed(slaveTask, taskData, e.Value2);
273    }
274
275    private void slaveTask_ExceptionOccured(object sender, EventArgs<Guid, Exception> e) {
276      SlaveTask slaveTask;
277      slaveTasksLocker.EnterUpgradeableReadLock();
278      try {
279        slaveTask = slaveTasks[e.Value];
280        RemoveSlaveTask(e.Value, slaveTask);
281      }
282      finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
283
284      SlaveStatusInfo.IncrementExceptionOccured();
285      OnExceptionOccured(slaveTask, e.Value2);
286    }
287    #endregion
288
289    #region EventHandler
290    public event EventHandler<EventArgs<SlaveTask>> TaskStarted;
291    private void OnTaskStarted(SlaveTask slaveTask) {
292      var handler = TaskStarted;
293      if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
294    }
295
296    public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskStopped;
297    private void OnTaskStopped(SlaveTask slaveTask, TaskData taskData) {
298      var handler = TaskStopped;
299      if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
300    }
301
302    public event EventHandler<EventArgs<SlaveTask, TaskData>> TaskPaused;
303    private void OnTaskPaused(SlaveTask slaveTask, TaskData taskData) {
304      var handler = TaskPaused;
305      if (handler != null) handler(this, new EventArgs<SlaveTask, TaskData>(slaveTask, taskData));
306    }
307
308    public event EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>> TaskFailed;
309    private void OnTaskFailed(SlaveTask slaveTask, TaskData taskData, Exception exception) {
310      var handler = TaskFailed;
311      if (handler != null) handler(this, new EventArgs<Tuple<SlaveTask, TaskData, Exception>>(new Tuple<SlaveTask, TaskData, Exception>(slaveTask, taskData, exception)));
312    }
313
314    public event EventHandler<EventArgs<SlaveTask, Exception>> ExceptionOccured;
315    private void OnExceptionOccured(SlaveTask slaveTask, Exception exception) {
316      var handler = ExceptionOccured;
317      if (handler != null) handler(this, new EventArgs<SlaveTask, Exception>(slaveTask, exception));
318    }
319
320    public event EventHandler<EventArgs<SlaveTask>> TaskAborted;
321    private void OnTaskAborted(SlaveTask slaveTask) {
322      var handler = TaskAborted;
323      if (handler != null) handler(this, new EventArgs<SlaveTask>(slaveTask));
324    }
325    #endregion
326
327    public Dictionary<Guid, TimeSpan> GetExecutionTimes() {
328      slaveTasksLocker.EnterReadLock();
329      try {
330        return slaveTasks.ToDictionary(x => x.Key, x => x.Value.ExecutionTime);
331      }
332      finally { slaveTasksLocker.ExitReadLock(); }
333    }
334  }
335}
Note: See TracBrowser for help on using the repository browser.