#region License Information
/* HeuristicLab
* Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Common;
using HeuristicLab.Hive;
namespace HeuristicLab.Clients.Hive {
/// <summary>
/// Downloads and deserializes jobs. It limits how many jobs are downloaded or deserialized
/// at the same time in order to avoid memory problems.
/// </summary>
/// <typeparam name="T">Type the downloaded task data is deserialized into.</typeparam>
/// <remarks>
/// NOTE(review): inside this namespace the non-generic name <c>Task</c> appears to be the Hive task
/// entity returned by the service's <c>GetTask</c>, while the generic <c>Task&lt;TResult&gt;</c> is the
/// TPL task from System.Threading.Tasks - confirm against the Hive service contract.
/// </remarks>
public class ConcurrentTaskDownloader<T> where T : class, ITask {
  private bool abort = false;

  // semaphores ensure only a few concurrent connections and few serialized job objects in memory
  private readonly Semaphore downloadSemaphore;
  private readonly Semaphore deserializeSemaphore;

  /// <summary>
  /// Creates a downloader with the given concurrency limits.
  /// </summary>
  /// <param name="concurrentDownloads">Maximum number of task data downloads running at the same time.</param>
  /// <param name="concurrentDeserializations">Maximum number of deserializations running at the same time.</param>
  public ConcurrentTaskDownloader(int concurrentDownloads, int concurrentDeserializations) {
    downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
    deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
    // NOTE(review): TaskScheduler.UnobservedTaskException is a static event and this handler is never
    // removed, so the event keeps a reference to every instance ever created. Consider unsubscribing
    // (and disposing the semaphores) once the downloader is no longer needed.
    TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
  }

  /// <summary>
  /// Asynchronously downloads and deserializes the data of <paramref name="t"/>.
  /// </summary>
  /// <param name="t">The task whose data should be downloaded.</param>
  /// <param name="onFinishedAction">
  /// Invoked with the task and its deserialized data on success, or with the task and <c>null</c>
  /// if the download or deserialization failed or produced no data.
  /// </param>
  public void DownloadTaskData(Task t, Action<Task, T> onFinishedAction) {
    Task<Tuple<Task, T>> task = Task<Tuple<Task, TaskData>>.Factory.StartNew(DownloadTaskData, t)
      .ContinueWith((y) => DeserializeTask(y.Result));

    task.ContinueWith((x) => OnTaskFinished(x, t, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
    task.ContinueWith((x) => OnTaskFailed(x, t, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
  }

  /// <summary>
  /// Asynchronously downloads the task with the given id, then downloads and deserializes its data.
  /// </summary>
  /// <param name="taskId">Id of the task to download.</param>
  /// <param name="onFinishedAction">
  /// Invoked with the task and its deserialized data on success. On failure the second argument is
  /// <c>null</c>; the first argument is the downloaded task if that step succeeded, otherwise <c>null</c>.
  /// </param>
  public void DownloadTaskDataAndTask(Guid taskId, Action<Task, T> onFinishedAction) {
    // keep a handle on the first step so the failure path can still report the task if it was fetched
    Task<Task> download = Task<Task>.Factory.StartNew(DownloadTask, taskId);
    Task<Tuple<Task, T>> task = download
      .ContinueWith((x) => DownloadTaskData(x.Result))
      .ContinueWith((y) => DeserializeTask(y.Result));

    task.ContinueWith((x) => OnTaskFinished(x, GetDownloadedTask(download), onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
    task.ContinueWith((x) => OnTaskFailed(x, GetDownloadedTask(download), onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
  }

  // Returns the result of the completed download step, or null if that step did not succeed.
  private static Task GetDownloadedTask(Task<Task> download) {
    return download.Status == TaskStatus.RanToCompletion ? download.Result : null;
  }

  // Success continuation: forwards the result to the callback.
  private void OnTaskFinished(Task<Tuple<Task, T>> task, Task hiveTask, Action<Task, T> onFinishedAction) {
    Tuple<Task, T> result = task.Result;
    if (result == null) {
      // DeserializeTask returns null when aborted or when no data is available
      onFinishedAction(hiveTask, null);
    } else {
      onFinishedAction(result.Item1, result.Item2);
    }
  }

  // Failure continuation: reports the exception and still notifies the caller.
  private void OnTaskFailed(Task<Tuple<Task, T>> task, Task hiveTask, Action<Task, T> onFinishedAction) {
    // Reading Exception marks it as observed. Result must NOT be read here: on a faulted task it
    // rethrows the AggregateException, which would prevent the callback from ever being invoked.
    AggregateException exception = task.Exception.Flatten();
    OnExceptionOccured(exception);
    onFinishedAction(hiveTask, null);
  }

  // Fetches the task with the given id (boxed Guid) from the Hive service.
  private Task DownloadTask(object taskId) {
    return HiveServiceLocator.Instance.CallHiveService<Task>(s => s.GetTask((Guid)taskId));
  }

  /// <summary>
  /// Overload taking the task as <c>object</c> so it can be used as a TPL state delegate.
  /// </summary>
  /// <param name="taskId">The task to download data for; despite the name it is cast to a task, not an id.</param>
  protected Tuple<Task, TaskData> DownloadTaskData(object taskId) {
    return DownloadTaskData((Task)taskId);
  }

  /// <summary>
  /// Downloads the data of <paramref name="task"/> while holding a slot of the download semaphore.
  /// </summary>
  /// <returns>The task together with its data, or <c>null</c> if the downloader was aborted.</returns>
  protected Tuple<Task, TaskData> DownloadTaskData(Task task) {
    downloadSemaphore.WaitOne();
    TaskData result;
    try {
      if (abort) return null;
      result = HiveServiceLocator.Instance.CallHiveService<TaskData>(s => s.GetTaskData(task.Id));
    } finally {
      downloadSemaphore.Release();
    }
    return new Tuple<Task, TaskData>(task, result);
  }

  /// <summary>
  /// Deserializes downloaded task data while holding a slot of the deserialization semaphore.
  /// </summary>
  /// <returns>
  /// The task together with the deserialized data, or <c>null</c> if the downloader was aborted or
  /// the task or its data is missing.
  /// </returns>
  protected Tuple<Task, T> DeserializeTask(Tuple<Task, TaskData> taskData) {
    deserializeSemaphore.WaitOne();
    try {
      // taskData itself is null when the download step returned early
      if (abort || taskData == null || taskData.Item2 == null || taskData.Item1 == null) return null;
      var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Item2.Data);
      taskData.Item2.Data = null; // reduce memory consumption.
      return new Tuple<Task, T>(taskData.Item1, deserializedJob);
    } finally {
      deserializeSemaphore.Release();
    }
  }

  private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
    e.SetObserved(); // mark as observed so the unobserved exception is not escalated; it is reported through ExceptionOccured instead
    OnExceptionOccured(new HiveException("Unobserved Exception in ConcurrentTaskDownloader", e.Exception));
  }

  /// <summary>
  /// Raised when a download or deserialization fails, or when an unobserved task exception is reported.
  /// </summary>
  public event EventHandler<EventArgs<Exception>> ExceptionOccured;
  private void OnExceptionOccured(Exception exception) {
    var handler = ExceptionOccured;
    if (handler != null) handler(this, new EventArgs<Exception>(exception));
  }
}
}