Free cookie consent management tool by TermsFeed Policy Generator

source: branches/1888_OaaS/HeuristicLab.Clients.Hive/3.3/ConcurrentTaskDownloader.cs @ 17958

Last change on this file since 17958 was 9508, checked in by fschoepp, 12 years ago

#1888:
HL:

  • Web projects requires different users to interact with hive. The singleton HiveServiceLocator.Instance doesn't allow different users at the same time, resulting in serialization during access of HiveClient methods.

The following changes have been introduced in favor of a parallel use of the HL libs:

  • HiveClient, TaskDownloader and ConcurrentTaskDownloader may now use a different IHiveServiceLocator than HiveServiceLocator.Instance (all methods have appropriate overloads now).
  • The default instance is still HiveServiceLocator.Instance.

Automated Scaling of Instances:

  • Added Scaler project to solution which represents a WorkerRole that scales the slave instances based on the global cpu utilization of all slaves.
  • Scaler is based on WASABi, rules can be adjusted in rulesstore.xml. Basic rule is: if < 45% global cpu utilization => remove an instance; if > 65% cpu => add an instance. Minimum boundary is 1 and maximum boundary is 8 slave instances.
  • Adjusted Slave project to automatically register itself to a SlaveGroup during WebRole startup (can be adjusted in service configuration).

Web-Frontend:

  • Added basic error messages to the dialogs when an ajax call fails.
  • Removed Styling.js from scripts.
File size: 5.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Threading;
24using System.Threading.Tasks;
25using HeuristicLab.Common;
26using HeuristicLab.Hive;
27
28namespace HeuristicLab.Clients.Hive {
29  /// <summary>
30  /// Downloads and deserializes jobs. It avoids too many jobs beeing downloaded or deserialized at the same time to avoid memory problems
31  /// </summary>
32  public class ConcurrentTaskDownloader<T> : IDisposable where T : class, ITask {
33    private bool abort = false;
34    // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory
35    private Semaphore downloadSemaphore;
36    private Semaphore deserializeSemaphore;
37    private IHiveServiceLocator locator;
38
39    public ConcurrentTaskDownloader(int concurrentDownloads, int concurrentDeserializations) : this(concurrentDownloads, concurrentDeserializations, HiveServiceLocator.Instance) {     
40    }
41
42    public ConcurrentTaskDownloader(int concurrentDownloads, int concurrentDeserializations, IHiveServiceLocator locator) {
43      downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
44      this.locator = locator;
45      deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
46    }
47
48    public void DownloadTaskData(Task t, Action<Task, T> onFinishedAction) {
49      Task<Tuple<Task, T>> task = Task<Tuple<Task, TaskData>>.Factory.StartNew(DownloadTaskData, t)
50                                     .ContinueWith((y) => DeserializeTask(y.Result));
51
52      task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
53      task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
54    }
55
56    public void DownloadTaskDataAndTask(Guid taskId, Action<Task, T> onFinishedAction) {
57      Task<Tuple<Task, T>> task = Task<Task>.Factory.StartNew(DownloadTask, taskId)
58                                     .ContinueWith((x) => DownloadTaskData(x.Result))
59                                     .ContinueWith((y) => DeserializeTask(y.Result));
60
61      task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
62      task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
63    }
64
65    private void OnTaskFinished(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction) {
66      onFinishedAction(task.Result.Item1, task.Result.Item2);
67    }
68    private void OnTaskFailed(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction) {
69      task.Exception.Flatten().Handle((e) => { return true; });
70      OnExceptionOccured(task.Exception.Flatten());
71      onFinishedAction(task.Result.Item1, null);
72    }
73
74    private Task DownloadTask(object taskId) {
75      Task t = null;
76      HiveClient.TryAndRepeat(() => {
77        t = locator.CallHiveService(s => s.GetTask((Guid)taskId));
78      }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task.");
79      return t;
80    }
81
82    protected Tuple<Task, TaskData> DownloadTaskData(object taskId) {
83      return DownloadTaskData((Task)taskId);
84    }
85
86    protected Tuple<Task, TaskData> DownloadTaskData(Task task) {
87      downloadSemaphore.WaitOne();
88      TaskData result = null;
89      try {
90        if (abort) return null;
91        HiveClient.TryAndRepeat(() => {
92          result = locator.CallHiveService(s => s.GetTaskData(task.Id));
93        }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task data.");
94      }
95      finally {
96        downloadSemaphore.Release();
97      }
98      return new Tuple<Task, TaskData>(task, result);
99    }
100
101    protected Tuple<Task, T> DeserializeTask(Tuple<Task, TaskData> taskData) {
102      deserializeSemaphore.WaitOne();
103      try {
104        if (abort || taskData.Item2 == null || taskData.Item1 == null) return null;
105        var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Item2.Data);
106        taskData.Item2.Data = null; // reduce memory consumption.
107        return new Tuple<Task, T>(taskData.Item1, deserializedJob);
108      }
109      finally {
110        deserializeSemaphore.Release();
111      }
112    }
113
114    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
115    private void OnExceptionOccured(Exception exception) {
116      var handler = ExceptionOccured;
117      if (handler != null) handler(this, new EventArgs<Exception>(exception));
118    }
119
120    #region IDisposable Members
121    public void Dispose() {
122      deserializeSemaphore.Dispose();
123      downloadSemaphore.Dispose();
124    }
125    #endregion
126  }
127}
Note: See TracBrowser for help on using the repository browser.