Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/ConcurrentJobDownloader.cs @ 6198

Last change on this file since 6198 was 6198, checked in by cneumuel, 13 years ago

#1233

  • small fixes for HiveEngine
File size: 2.7 KB
Line 
1using System;
2using System.Threading;
3using System.Threading.Tasks;
4using HeuristicLab.Hive;
5
namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Downloads and deserializes jobs. It limits how many jobs are being downloaded or deserialized
  /// at the same time in order to avoid memory problems.
  /// </summary>
  /// <typeparam name="T">Type the downloaded job data is deserialized into.</typeparam>
  public class ConcurrentJobDownloader<T> where T : class, IJob {
    // Checked before the download and before the deserialization; when set, both steps return null.
    private bool abort = false;
    // Semaphores ensure only few concurrent connections and few serialized JobData objects in memory.
    private readonly Semaphore downloadSemaphore;
    private readonly Semaphore deserializeSemaphore;

    /// <summary>
    /// Registers the process-wide safety net for unobserved task exceptions exactly once per closed
    /// generic type. A static handler is used so that the static event does not keep downloader
    /// instances alive and no additional handler is added for every instance that is created.
    /// </summary>
    static ConcurrentJobDownloader() {
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
    }

    /// <summary>
    /// Creates a downloader with the given concurrency limits.
    /// </summary>
    /// <param name="concurrentDownloads">Maximum number of simultaneous downloads.</param>
    /// <param name="concurrentDeserializations">
    /// Maximum number of jobs that may be between "download started" and "deserialization finished"
    /// at the same time.
    /// </param>
    public ConcurrentJobDownloader(int concurrentDownloads, int concurrentDeserializations) {
      downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
      deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
    }

    /// <summary>
    /// Starts downloading and deserializing the data of <paramref name="job"/> on a task and
    /// returns immediately.
    /// </summary>
    /// <param name="job">The job whose data is fetched (looked up by its Id).</param>
    /// <param name="onFinishedAction">
    /// Invoked once the pipeline has finished. On success the second argument is the deserialized
    /// job (or null if the download was aborted or the job no longer exists) and the third is null.
    /// On failure the second argument is null and the third is the task's AggregateException.
    /// </param>
    public void DownloadJob(Job job, Action<Job, T, Exception> onFinishedAction) {
      Task<JobData>.Factory.StartNew((x) => DownloadJob(x), job.Id)
                                     .ContinueWith((x) => DeserializeJob(x.Result))
                                     .ContinueWith((x) => OnJobFinished(job, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously);
    }

    /// <summary>
    /// Forwards the outcome of the download/deserialize pipeline to the caller's callback.
    /// </summary>
    private void OnJobFinished(Job job, Task<T> task, Action<Job, T, Exception> onFinishedAction) {
      // Reading Task.Result on a faulted or canceled task throws, which would prevent the callback
      // from ever being invoked. Inspect the task state first and only read Result on success.
      Exception error = task.Exception; // null unless the task faulted; reading it marks it observed
      T result = null;
      if (error == null && !task.IsCanceled) {
        result = task.Result;
      }
      onFinishedAction(job, result, error);
    }

    /// <summary>
    /// Downloads the serialized data of the job with the given id from the Hive service.
    /// </summary>
    /// <param name="jobId">The job id; must be a boxed <see cref="Guid"/>.</param>
    /// <returns>The downloaded data, or null if the downloader has been aborted.</returns>
    /// <remarks>
    /// Acquires a download slot and a deserialize slot. The download slot is always released here.
    /// The deserialize slot is released by <see cref="DeserializeJob"/> when this method returns
    /// normally (including null), and released here when this method throws, because the
    /// continuation that calls <see cref="DeserializeJob"/> is never reached in that case.
    /// </remarks>
    protected JobData DownloadJob(object jobId) {
      downloadSemaphore.WaitOne();
      deserializeSemaphore.WaitOne();
      try {
        if (abort) return null;
        return ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));
      }
      catch {
        // The deserialize step will not run for a failed download; give the slot back now,
        // otherwise every failed download permanently reduces the available slots.
        deserializeSemaphore.Release();
        throw;
      }
      finally {
        downloadSemaphore.Release();
      }
    }

    /// <summary>
    /// Deserializes previously downloaded job data and releases the deserialize slot that was
    /// acquired by the download step.
    /// </summary>
    /// <param name="jobData">The downloaded data; may be null.</param>
    /// <returns>
    /// The deserialized job, or null if the downloader has been aborted, <paramref name="jobData"/>
    /// is null, or the service no longer returns a job for the id.
    /// </returns>
    protected T DeserializeJob(JobData jobData) {
      try {
        if (abort || jobData == null) return null;
        Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));
        if (job == null) return null;
        var deserializedJob = PersistenceUtil.Deserialize<T>(jobData.Data);
        jobData.Data = null; // reduce memory consumption.
        return deserializedJob;
      }
      finally {
        deserializeSemaphore.Release();
      }
    }

    /// <summary>
    /// Marks unobserved task exceptions as observed so that a failing task (for example a
    /// throwing callback) does not crash the process. Note that this handler is process-wide.
    /// </summary>
    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.