Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.WebJobManager/Services/Imports/ConcurrentTaskDownloaderWeb.cs @ 15027

Last change on this file since 15027 was 13739, checked in by jlodewyc, 9 years ago

#2582 Overhaul Login service, done

File size: 5.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Threading;
24using System.Threading.Tasks;
25using HeuristicLab.Common;
26using HeuristicLab.Hive;
27
28namespace HeuristicLab.Clients.Hive.WebJobManager.Services.Imports
29{
30
31    //
32    /// <summary>
33    /// Rewritten to use HiveServiceLocatorWeb and HiveClientWeb
34    /// </summary>
35    public class ConcurrentTaskDownloaderWeb<T> : IDisposable where T : class, ITask
36    {
37        private bool abort = false;
38        // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory
39        private Semaphore downloadSemaphore;
40        private Semaphore deserializeSemaphore;
41        private HiveServiceLocatorWeb serviceLocator;
42        public ConcurrentTaskDownloaderWeb(int concurrentDownloads, int concurrentDeserializations, HiveServiceLocatorWeb hiv)
43        {
44            downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
45            deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
46            serviceLocator = hiv;
47        }
48
49        public void DownloadTaskData(Task t, Action<Task, T> onFinishedAction)
50        {
51            Task<Tuple<Task, T>> task = Task<Tuple<Task, TaskData>>.Factory.StartNew(DownloadTaskData, t)
52                                           .ContinueWith((y) => DeserializeTask(y.Result));
53
54            task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
55            task.ContinueWith((x) => OnTaskFailed(x), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
56        }
57
58        public void DownloadTaskDataAndTask(Guid taskId, Action<Task, T> onFinishedAction)
59        {
60            Task<Tuple<Task, T>> task = Task<Task>.Factory.StartNew(DownloadTask, taskId)
61                                           .ContinueWith((x) => DownloadTaskData(x.Result))
62                                           .ContinueWith((y) => DeserializeTask(y.Result));
63
64            task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
65            task.ContinueWith((x) => OnTaskFailed(x), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
66        }
67
68        private void OnTaskFinished(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction)
69        {
70            onFinishedAction(task.Result.Item1, task.Result.Item2);
71        }
72        private void OnTaskFailed(Task<Tuple<Task, T>> task)
73        {
74            task.Exception.Flatten().Handle((e) => { return true; });
75            OnExceptionOccured(task.Exception.Flatten());
76        }
77
78        private Task DownloadTask(object taskId)
79        {
80            Task t = null;
81            HiveClientWeb.TryAndRepeat(() =>
82            {
83                t = serviceLocator.CallHiveService(s => s.GetTask((Guid)taskId));
84            }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task.");
85            return t;
86        }
87
88        protected Tuple<Task, TaskData> DownloadTaskData(object taskId)
89        {
90            return DownloadTaskData((Task)taskId);
91        }
92
93        protected Tuple<Task, TaskData> DownloadTaskData(Task task)
94        {
95            downloadSemaphore.WaitOne();
96            TaskData result = null;
97            try
98            {
99                if (abort) return null;
100                HiveClientWeb.TryAndRepeat(() =>
101                {
102                    result = serviceLocator.CallHiveService(s => s.GetTaskData(task.Id));
103                }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task data.");
104            }
105            finally
106            {
107                downloadSemaphore.Release();
108            }
109            return new Tuple<Task, TaskData>(task, result);
110        }
111
112        protected Tuple<Task, T> DeserializeTask(Tuple<Task, TaskData> taskData)
113        {
114            deserializeSemaphore.WaitOne();
115            try
116            {
117                if (abort || taskData.Item2 == null || taskData.Item1 == null) return null;
118                var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Item2.Data);
119                taskData.Item2.Data = null; // reduce memory consumption.
120                return new Tuple<Task, T>(taskData.Item1, deserializedJob);
121            }
122            finally
123            {
124                deserializeSemaphore.Release();
125            }
126        }
127
128        public event EventHandler<EventArgs<Exception>> ExceptionOccured;
129        private void OnExceptionOccured(Exception exception)
130        {
131            var handler = ExceptionOccured;
132            if (handler != null) handler(this, new EventArgs<Exception>(exception));
133        }
134
135        #region IDisposable Members
136        public void Dispose()
137        {
138            deserializeSemaphore.Dispose();
139            downloadSemaphore.Dispose();
140        }
141        #endregion
142    }
143}
Note: See TracBrowser for help on using the repository browser.