1 | using System;
|
---|
2 | using System.Threading;
|
---|
3 | using System.Threading.Tasks;
|
---|
4 | using HeuristicLab.Hive;
|
---|
5 |
|
---|
namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Downloads and deserializes jobs. It limits how many jobs are downloaded or deserialized
  /// at the same time in order to avoid memory problems.
  /// </summary>
  /// <typeparam name="T">Type the downloaded job data is deserialized to.</typeparam>
  public class JobDownloader<T> where T : class, IJob {
    // Checked before each expensive step; nothing in this class ever sets it to true.
    private bool abort = false;
    // Semaphores ensure only few concurrent connections and few serialized job payloads in memory.
    private readonly Semaphore downloadSemaphore;
    private readonly Semaphore deserializeSemaphore;

    /// <summary>
    /// Registers the unobserved-task-exception handler once per closed generic type.
    /// Subscribing per instance would root every downloader through the static event
    /// and pile up redundant handlers.
    /// </summary>
    static JobDownloader() {
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
    }

    /// <summary>
    /// Creates a downloader with the given concurrency limits.
    /// </summary>
    /// <param name="concurrentDownloads">Maximum number of job downloads running at the same time.</param>
    /// <param name="concurrentDeserializations">Maximum number of downloaded-but-not-yet-deserialized jobs.</param>
    public JobDownloader(int concurrentDownloads, int concurrentDeserializations) {
      downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
      deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
    }

    /// <summary>
    /// Starts downloading and deserializing the job with the given id on background tasks.
    /// </summary>
    /// <param name="jobId">Id of the job to fetch.</param>
    /// <param name="onFinishedAction">
    /// Invoked exactly once when the pipeline ends. On success it receives the deserialized job
    /// (which may be null) and a null exception; on failure it receives a null job and the
    /// flattened <see cref="AggregateException"/>.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="onFinishedAction"/> is null.</exception>
    public void DownloadJob(Guid jobId, Action<Guid, T, Exception> onFinishedAction) {
      if (onFinishedAction == null) throw new ArgumentNullException("onFinishedAction");

      Task<JobData>.Factory.StartNew((x) => DownloadJob(x), jobId)
        .ContinueWith((x) => DeserializeJob(x.Result))
        .ContinueWith((x) => OnJobFinished(jobId, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously);
    }

    /// <summary>
    /// Reports the outcome of the download/deserialize pipeline to the caller.
    /// </summary>
    private void OnJobFinished(Guid jobId, Task<T> task, Action<Guid, T, Exception> onFinishedAction) {
      // Task.Result rethrows when the task did not run to completion, so it must only be read
      // on success; otherwise the callback would never hear about the failure.
      if (task.Status == TaskStatus.RanToCompletion) {
        onFinishedAction(jobId, task.Result, null);
      } else {
        // Reading task.Exception also marks the exception as observed.
        Exception error = task.Exception != null ? task.Exception.Flatten() : null;
        onFinishedAction(jobId, null, error);
      }
    }

    /// <summary>
    /// Downloads the serialized data of a job while holding a download slot.
    /// On a normal return (including the abort case, which returns null) a deserialization slot
    /// is still held and is released by <see cref="DeserializeJob"/>.
    /// If the download throws, the deserialization slot is released here because
    /// <see cref="DeserializeJob"/> will not be reached.
    /// </summary>
    /// <param name="jobId">The job id, boxed as <see cref="Guid"/>.</param>
    /// <returns>The job data, or null if the downloader is aborting.</returns>
    protected JobData DownloadJob(object jobId) {
      downloadSemaphore.WaitOne();
      try {
        deserializeSemaphore.WaitOne();
        try {
          if (abort) return null;
          JobData result = ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));
          return result;
        }
        catch {
          // Hand the slot back so a failed download cannot permanently shrink the pool.
          deserializeSemaphore.Release();
          throw;
        }
      }
      finally {
        downloadSemaphore.Release();
      }
    }

    /// <summary>
    /// Deserializes previously downloaded job data and releases the deserialization slot
    /// that was acquired by <see cref="DownloadJob(object)"/>.
    /// </summary>
    /// <param name="jobData">Downloaded job data; may be null.</param>
    /// <returns>
    /// The deserialized job, or null if the downloader is aborting, <paramref name="jobData"/>
    /// is null, or the service returns no job for the id.
    /// </returns>
    protected T DeserializeJob(JobData jobData) {
      try {
        // Guard before touching jobData: the abort path of the download step yields null.
        if (abort || jobData == null) return null;
        // The returned Job is only tested for null here.
        Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));
        if (abort || job == null) return null;
        var deserializedJob = PersistenceUtil.Deserialize<T>(jobData.Data);
        jobData.Data = null; // reduce memory consumption.
        return deserializedJob;
      }
      finally {
        deserializeSemaphore.Release();
      }
    }

    /// <summary>
    /// Marks unobserved task exceptions as observed so that a failing task cannot crash the process.
    /// Pipeline failures are reported to the caller through the finished callback.
    /// </summary>
    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved();
    }
  }
}
|
---|