#region License Information
/* HeuristicLab
* Copyright (C) 2002-2018 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see .
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Clients.Hive.SlaveCore.Properties;
using HeuristicLab.Common;
using HeuristicLab.Core;
namespace HeuristicLab.Clients.Hive.SlaveCore {
///
/// Holds a list of slave tasks and manages access to this list.
/// Forwards events from SlaveTask and forwards commands to SlaveTask.
/// Periodically sends task data to the server to avoid loss of progress when the slave crashes.
///
public class TaskManager {
private static readonly ReaderWriterLockSlim slaveTasksLocker = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly Dictionary> slaveTasks;
private readonly ILog log;
private readonly PluginManager pluginManager;
private readonly CancellationTokenSource cts;
private readonly CancellationToken ct;
private readonly AutoResetEvent waitHandle;
private readonly TimeSpan checkpointInterval;
private readonly TimeSpan checkpointCheckInterval;
public int TaskCount {
get {
slaveTasksLocker.EnterReadLock();
try {
return slaveTasks.Count;
}
finally { slaveTasksLocker.ExitReadLock(); }
}
}
public Guid[] TaskIds {
get {
slaveTasksLocker.EnterReadLock();
try {
return slaveTasks.Keys.ToArray();
}
finally { slaveTasksLocker.ExitReadLock(); }
}
}
public TaskManager(PluginManager pluginCache, ILog log) {
this.pluginManager = pluginCache;
this.log = log;
this.slaveTasks = new Dictionary>();
cts = new CancellationTokenSource();
ct = cts.Token;
waitHandle = new AutoResetEvent(true);
checkpointInterval = Settings.Default.CheckpointInterval;
checkpointCheckInterval = Settings.Default.CheckpointCheckInterval;
System.Threading.Tasks.Task.Factory.StartNew(Checkpointing, ct);
}
#region Checkpointing
private void Checkpointing() {
while (!ct.IsCancellationRequested) {
slaveTasksLocker.EnterUpgradeableReadLock();
try {
foreach (var entry in slaveTasks) {
if (DateTime.Now - entry.Value.Item2 > checkpointInterval)
PauseTaskAsync(entry.Key);
}
} finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
waitHandle.WaitOne(checkpointCheckInterval);
}
}
public void StopCheckpointing() {
cts.Cancel();
waitHandle.Set();
waitHandle.Close();
}
#endregion
#region Task Control methods
public void StartTaskAsync(Task task, TaskData taskData) {
SlaveTask slaveTask = null;
slaveTasksLocker.EnterUpgradeableReadLock();
try {
if (slaveTasks.ContainsKey(task.Id)) {
SlaveStatusInfo.IncrementTasksFailed();
throw new TaskAlreadyRunningException(task.Id);
} else {
slaveTask = new SlaveTask(pluginManager, task.CoresNeeded, log);
AddSlaveTask(task, slaveTask);
SlaveStatusInfo.IncrementTasksFetched();
}
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
if (slaveTask != null) {
try {
slaveTask.StartJobAsync(task, taskData);
}
catch (Exception) {
RemoveSlaveTask(task.Id, slaveTask); // clean up and rethrow
slaveTask.DisposeAppDomain();
throw;
}
}
}
public void PauseTaskAsync(Guid taskId) {
slaveTasksLocker.EnterUpgradeableReadLock();
try {
if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
SlaveTask slaveTask = slaveTasks[taskId].Item1;
slaveTask.PauseTask();
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
}
public void StopTaskAsync(Guid taskId) {
slaveTasksLocker.EnterUpgradeableReadLock();
try {
if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
SlaveTask slaveTask = slaveTasks[taskId].Item1;
slaveTask.StopTask();
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
}
public void AbortTask(Guid taskId) {
SlaveTask slaveTask = null;
slaveTasksLocker.EnterUpgradeableReadLock();
try {
if (!slaveTasks.ContainsKey(taskId)) throw new TaskNotRunningException(taskId);
slaveTask = slaveTasks[taskId].Item1;
if (!slaveTask.IsPrepared) throw new AppDomainNotCreatedException();
RemoveSlaveTask(taskId, slaveTask);
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
slaveTask.DisposeAppDomain();
SlaveStatusInfo.IncrementTasksAborted();
OnTaskAborted(slaveTask);
}
public void PauseAllTasksAsync() {
slaveTasksLocker.EnterUpgradeableReadLock();
try {
foreach (var slaveTask in slaveTasks.Values) {
slaveTask.Item1.PauseTask();
}
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
}
public void StopAllTasksAsync() {
slaveTasksLocker.EnterUpgradeableReadLock();
try {
foreach (var slaveTask in slaveTasks.Values) {
slaveTask.Item1.StopTask();
}
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
}
public void AbortAllTasks() {
slaveTasksLocker.EnterUpgradeableReadLock();
try {
foreach (var slaveTask in slaveTasks.Values.ToArray()) {
AbortTask(slaveTask.Item1.TaskId);
}
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
}
#endregion
#region Add/Remove SlaveTask
private void AddSlaveTask(Task task, SlaveTask slaveTask) {
slaveTasksLocker.EnterWriteLock();
try {
slaveTasks.Add(task.Id, Tuple.Create(slaveTask, DateTime.Now));
RegisterSlaveTaskEvents(slaveTask);
}
finally { slaveTasksLocker.ExitWriteLock(); }
}
private void RemoveSlaveTask(Guid taskId, SlaveTask slaveTask) {
slaveTasksLocker.EnterWriteLock();
try {
slaveTasks.Remove(taskId);
DeregisterSlaveTaskEvents(slaveTask);
}
finally { slaveTasksLocker.ExitWriteLock(); }
}
#endregion
#region SlaveTask Events
private void RegisterSlaveTaskEvents(SlaveTask slaveTask) {
slaveTask.TaskStarted += new EventHandler>(slaveTask_TaskStarted);
slaveTask.TaskPaused += new EventHandler>(slaveTask_TaskPaused);
slaveTask.TaskStopped += new EventHandler>(slaveTask_TaskStopped);
slaveTask.TaskFailed += new EventHandler>(slaveTask_TaskFailed);
}
private void DeregisterSlaveTaskEvents(SlaveTask slaveTask) {
slaveTask.TaskStarted -= new EventHandler>(slaveTask_TaskStarted);
slaveTask.TaskPaused -= new EventHandler>(slaveTask_TaskPaused);
slaveTask.TaskStopped -= new EventHandler>(slaveTask_TaskStopped);
slaveTask.TaskFailed -= new EventHandler>(slaveTask_TaskFailed);
}
private void slaveTask_TaskStarted(object sender, EventArgs e) {
SlaveTask slaveTask;
slaveTasksLocker.EnterUpgradeableReadLock();
try {
slaveTask = slaveTasks[e.Value].Item1;
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
SlaveStatusInfo.IncrementTasksStarted();
OnTaskStarted(slaveTask);
}
private void slaveTask_TaskPaused(object sender, EventArgs e) {
SlaveTask slaveTask;
slaveTasksLocker.EnterUpgradeableReadLock();
try {
slaveTask = slaveTasks[e.Value].Item1;
RemoveSlaveTask(e.Value, slaveTask);
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
TaskData taskData = null;
try {
taskData = slaveTask.GetTaskData();
SlaveStatusInfo.IncrementTasksFinished();
OnTaskPaused(slaveTask, taskData);
}
catch (Exception ex) {
RemoveSlaveTask(e.Value, slaveTask);
SlaveStatusInfo.IncrementTasksFailed();
OnTaskFailed(slaveTask, taskData, ex);
}
}
private void slaveTask_TaskStopped(object sender, EventArgs e) {
SlaveTask slaveTask;
slaveTasksLocker.EnterUpgradeableReadLock();
try {
slaveTask = slaveTasks[e.Value].Item1;
RemoveSlaveTask(e.Value, slaveTask);
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
TaskData taskData = null;
try {
taskData = slaveTask.GetTaskData();
SlaveStatusInfo.IncrementTasksFinished();
OnTaskStopped(slaveTask, taskData);
}
catch (Exception ex) {
RemoveSlaveTask(e.Value, slaveTask);
SlaveStatusInfo.IncrementTasksFailed();
OnTaskFailed(slaveTask, taskData, ex);
}
}
private void slaveTask_TaskFailed(object sender, EventArgs e) {
SlaveTask slaveTask;
slaveTasksLocker.EnterUpgradeableReadLock();
try {
slaveTask = slaveTasks[e.Value].Item1;
RemoveSlaveTask(e.Value, slaveTask);
}
finally { slaveTasksLocker.ExitUpgradeableReadLock(); }
TaskData taskData = null;
try {
taskData = slaveTask.GetTaskData();
}
catch { /* taskData will be null */ }
SlaveStatusInfo.IncrementTasksFailed();
OnTaskFailed(slaveTask, taskData, e.Value2);
}
#endregion
#region EventHandler
public event EventHandler> TaskStarted;
private void OnTaskStarted(SlaveTask slaveTask) {
var handler = TaskStarted;
if (handler != null) handler(this, new EventArgs(slaveTask));
}
public event EventHandler> TaskStopped;
private void OnTaskStopped(SlaveTask slaveTask, TaskData taskData) {
var handler = TaskStopped;
if (handler != null) handler(this, new EventArgs(slaveTask, taskData));
}
public event EventHandler> TaskPaused;
private void OnTaskPaused(SlaveTask slaveTask, TaskData taskData) {
var handler = TaskPaused;
if (handler != null) handler(this, new EventArgs(slaveTask, taskData));
}
public event EventHandler>> TaskFailed;
private void OnTaskFailed(SlaveTask slaveTask, TaskData taskData, Exception exception) {
var handler = TaskFailed;
if (handler != null) handler(this, new EventArgs>(new Tuple(slaveTask, taskData, exception)));
}
public event EventHandler> TaskAborted;
private void OnTaskAborted(SlaveTask slaveTask) {
var handler = TaskAborted;
if (handler != null) handler(this, new EventArgs(slaveTask));
}
#endregion
public Dictionary GetExecutionTimes() {
slaveTasksLocker.EnterReadLock();
try {
return slaveTasks.ToDictionary(x => x.Key, x => x.Value.Item1.ExecutionTime);
}
finally { slaveTasksLocker.ExitReadLock(); }
}
}
}