#region License Information /* HeuristicLab * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Threading; using System.Threading.Tasks; using HeuristicLab.Common; using HeuristicLab.Hive; namespace HeuristicLab.Clients.Hive { /// /// Downloads and deserializes jobs. It avoids too many jobs beeing downloaded or deserialized at the same time to avoid memory problems /// public class ConcurrentTaskDownloader where T : class, ITask { private bool abort = false; // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory private Semaphore downloadSemaphore; private Semaphore deserializeSemaphore; public ConcurrentTaskDownloader(int concurrentDownloads, int concurrentDeserializations) { downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads); deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations); TaskScheduler.UnobservedTaskException += new EventHandler(TaskScheduler_UnobservedTaskException); } public void DownloadTaskData(Task t, Action onFinishedAction) { Task> task = Task>.Factory.StartNew(DownloadTaskData, t) .ContinueWith((y) => DeserializeTask(y.Result)); task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion); task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted); } public void DownloadTaskDataAndTask(Guid taskId, Action onFinishedAction) { Task> task = Task.Factory.StartNew(DownloadTask, taskId) .ContinueWith((x) => DownloadTaskData(x.Result)) .ContinueWith((y) => DeserializeTask(y.Result)); task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion); task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted); } private void OnTaskFinished(Task> task, Action onFinishedAction) { onFinishedAction(task.Result.Item1, task.Result.Item2); } private void OnTaskFailed(Task> task, Action onFinishedAction) { task.Exception.Flatten().Handle((e) => { return true; }); OnExceptionOccured(task.Exception.Flatten()); onFinishedAction(task.Result.Item1, null); } private Task DownloadTask(object taskId) { return HiveServiceLocator.Instance.CallHiveService(s => s.GetTask((Guid)taskId)); } protected Tuple DownloadTaskData(object taskId) { return DownloadTaskData((Task)taskId); } protected Tuple DownloadTaskData(Task task) { downloadSemaphore.WaitOne(); TaskData result; try { if (abort) return null; result = HiveServiceLocator.Instance.CallHiveService(s => s.GetTaskData(task.Id)); } finally { downloadSemaphore.Release(); } return new Tuple(task, result); } protected Tuple DeserializeTask(Tuple taskData) { deserializeSemaphore.WaitOne(); try { if (abort || taskData.Item2 == null || taskData.Item1 == null) return null; var deserializedJob = PersistenceUtil.Deserialize(taskData.Item2.Data); taskData.Item2.Data = null; // reduce memory consumption. return new Tuple(taskData.Item1, deserializedJob); } finally { deserializeSemaphore.Release(); } } private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property OnExceptionOccured(new HiveException("Unobserved Exception in ConcurrentTaskDownloader", e.Exception)); } public event EventHandler> ExceptionOccured; private void OnExceptionOccured(Exception exception) { var handler = ExceptionOccured; if (handler != null) handler(this, new EventArgs(exception)); } } }