#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2013 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Common;
using HeuristicLab.Hive;

namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Downloads and deserializes jobs. It avoids too many jobs being downloaded or deserialized
  /// at the same time to avoid memory problems.
  /// </summary>
  /// <typeparam name="T">Type the downloaded task data is deserialized to.</typeparam>
  public class ConcurrentTaskDownloader<T> : IDisposable where T : class, ITask {
    // Checked before each download and each deserialization; nothing in this class sets it to true.
    private bool abort = false;

    // use semaphores to ensure only few concurrent connections and few SerializedJob objects in memory
    private readonly Semaphore downloadSemaphore;
    private readonly Semaphore deserializeSemaphore;

    /// <summary>
    /// Creates a downloader with separate limits for the download and the deserialization stage.
    /// </summary>
    /// <param name="concurrentDownloads">Initial and maximum count of the download semaphore.</param>
    /// <param name="concurrentDeserializations">Initial and maximum count of the deserialization semaphore.</param>
    public ConcurrentTaskDownloader(int concurrentDownloads, int concurrentDeserializations) {
      downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
      deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
    }

    /// <summary>
    /// Asynchronously downloads and deserializes the data of an already known task.
    /// </summary>
    /// <param name="t">The task whose data is downloaded.</param>
    /// <param name="onFinishedAction">
    /// Invoked with the task and the deserialized item on success, or with the task and
    /// <c>null</c> if the data could not be obtained or an exception occurred.
    /// </param>
    public void DownloadTaskData(Task t, Action<Task, T> onFinishedAction) {
      Task<Tuple<Task, T>> task = Task<Tuple<Task, TaskData>>.Factory.StartNew(DownloadTaskData, t)
        .ContinueWith((y) => DeserializeTask(y.Result));

      task.ContinueWith((x) => OnTaskFinished(x, t, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
      task.ContinueWith((x) => OnTaskFailed(x, t, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Asynchronously downloads the task with the given id, then downloads and deserializes its data.
    /// </summary>
    /// <param name="taskId">Id of the task to download.</param>
    /// <param name="onFinishedAction">
    /// Invoked with the task and the deserialized item on success. On failure the item is
    /// <c>null</c>, and the task is <c>null</c> as well if the task itself could not be downloaded.
    /// </param>
    public void DownloadTaskDataAndTask(Guid taskId, Action<Task, T> onFinishedAction) {
      // Remembered outside the pipeline so the callbacks can still report the task when a later
      // stage fails; the lambdas below read this variable when they run, not when they are created.
      Task hiveTask = null;

      Task<Tuple<Task, T>> task = Task<Task>.Factory.StartNew(DownloadTask, taskId)
        .ContinueWith((x) => {
          hiveTask = x.Result;
          return DownloadTaskData(hiveTask);
        })
        .ContinueWith((y) => DeserializeTask(y.Result));

      task.ContinueWith((x) => OnTaskFinished(x, hiveTask, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
      task.ContinueWith((x) => OnTaskFailed(x, hiveTask, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
    }

    // Reports a pipeline that ran to completion. DeserializeTask may have returned null,
    // in which case only the known task (possibly null) is reported.
    private void OnTaskFinished(Task<Tuple<Task, T>> task, Task hiveTask, Action<Task, T> onFinishedAction) {
      Tuple<Task, T> result = task.Result;
      if (result == null) {
        onFinishedAction(hiveTask, null);
      } else {
        onFinishedAction(result.Item1, result.Item2);
      }
    }

    // Reports a faulted pipeline. task.Result must not be touched here: reading Result of a
    // faulted task rethrows its exception, which would prevent the callback from being invoked.
    private void OnTaskFailed(Task<Tuple<Task, T>> task, Task hiveTask, Action<Task, T> onFinishedAction) {
      AggregateException exception = task.Exception.Flatten();
      exception.Handle((e) => { return true; });
      OnExceptionOccured(exception);
      onFinishedAction(hiveTask, null);
    }

    // Fetches the task with the given id (a boxed Guid) from the Hive service.
    private Task DownloadTask(object taskId) {
      Task t = null;
      HiveClient.TryAndRepeat(() => {
        t = HiveServiceLocator.Instance.CallHiveService<Task>(s => s.GetTask((Guid)taskId));
      }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task.");
      return t;
    }

    /// <summary>
    /// Overload taking the task as <c>object</c> so that the method can be used as a task
    /// state delegate; casts the argument to <c>Task</c> and forwards it.
    /// </summary>
    protected Tuple<Task, TaskData> DownloadTaskData(object taskId) {
      return DownloadTaskData((Task)taskId);
    }

    /// <summary>
    /// Downloads the data of the given task while holding the download semaphore.
    /// </summary>
    /// <returns>The task paired with its data, or <c>null</c> if the abort flag is set.</returns>
    protected Tuple<Task, TaskData> DownloadTaskData(Task task) {
      downloadSemaphore.WaitOne();
      TaskData result = null;
      try {
        if (abort) return null;
        HiveClient.TryAndRepeat(() => {
          result = HiveServiceLocator.Instance.CallHiveService<TaskData>(s => s.GetTaskData(task.Id));
        }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task data.");
      }
      finally {
        downloadSemaphore.Release();
      }
      return new Tuple<Task, TaskData>(task, result);
    }

    /// <summary>
    /// Deserializes downloaded task data while holding the deserialization semaphore.
    /// </summary>
    /// <returns>
    /// The task paired with the deserialized item, or <c>null</c> if the abort flag is set or
    /// <paramref name="taskData"/> or one of its items is <c>null</c>.
    /// </returns>
    protected Tuple<Task, T> DeserializeTask(Tuple<Task, TaskData> taskData) {
      deserializeSemaphore.WaitOne();
      try {
        // taskData itself is null when DownloadTaskData returned null
        if (abort || taskData == null || taskData.Item2 == null || taskData.Item1 == null) return null;
        var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Item2.Data);
        taskData.Item2.Data = null; // reduce memory consumption.
        return new Tuple<Task, T>(taskData.Item1, deserializedJob);
      }
      finally {
        deserializeSemaphore.Release();
      }
    }

    /// <summary>
    /// Raised with the flattened exception when the download/deserialization pipeline faults.
    /// </summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception exception) {
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(exception));
    }

    #region IDisposable Members
    /// <summary>
    /// Disposes both semaphores.
    /// </summary>
    public void Dispose() {
      deserializeSemaphore.Dispose();
      downloadSemaphore.Dispose();
    }
    #endregion
  }
}