Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/JobDownloader.cs @ 6168

Last change on this file since 6168 was 6168, checked in by cneumuel, 13 years ago

#1233

  • removed Job-dto objects from slave core (since it stores outdated objects)
  • added command textbox to HiveJobView
  • improved the way the control buttons behave in HiveJobView
  • improved job control (pause and stop is also possible when job is not currently calculating)
  • improved gantt chart view (last state log entry is also displayed)
  • unified code for downloading jobs between experiment manager and hive engine
File size: 2.6 KB
Line 
1using System;
2using System.Threading;
3using System.Threading.Tasks;
4using HeuristicLab.Hive;
5
6namespace HeuristicLab.Clients.Hive {
/// <summary>
/// Downloads and deserializes jobs. Two semaphores limit how many jobs are being downloaded
/// or deserialized at the same time, to avoid memory problems.
/// </summary>
/// <typeparam name="T">Type the downloaded job data is deserialized to.</typeparam>
public class JobDownloader<T> where T : class, IJob {
  // Checked before downloading and before deserializing; nothing in this class sets it to true.
  private bool abort = false;
  // use semaphores to ensure only few concurrent connections and few SerializedJob objects in memory
  private readonly Semaphore downloadSemaphore;
  private readonly Semaphore deserializeSemaphore;

  /// <summary>
  /// Registers the unobserved-task-exception handler once per closed generic type.
  /// The handler is static so that the static <see cref="TaskScheduler.UnobservedTaskException"/>
  /// event does not keep every JobDownloader instance alive.
  /// </summary>
  static JobDownloader() {
    TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
  }

  /// <summary>
  /// Creates a downloader with the given concurrency limits.
  /// </summary>
  /// <param name="concurrentDownloads">Maximum number of simultaneous GetJobData calls.</param>
  /// <param name="concurrentDeserializations">Maximum number of jobs that may be between start of download and end of deserialization.</param>
  public JobDownloader(int concurrentDownloads, int concurrentDeserializations) {
    downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads);
    deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations);
  }

  /// <summary>
  /// Starts a task chain that downloads the job data, deserializes it and finally invokes
  /// <paramref name="onFinishedAction"/>. The call returns immediately.
  /// </summary>
  /// <param name="jobId">Id of the job to download.</param>
  /// <param name="onFinishedAction">
  /// Callback receiving the job id, the deserialized job (null on failure or abort) and the
  /// exception of the task chain (null on success).
  /// </param>
  public void DownloadJob(Guid jobId, Action<Guid, T, Exception> onFinishedAction) {
    Task<JobData>.Factory.StartNew((x) => DownloadJob(x), jobId)
                                   .ContinueWith((x) => DeserializeJob(x.Result))
                                   .ContinueWith((x) => OnJobFinished(jobId, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously);
  }

  /// <summary>
  /// Reports the outcome of the download/deserialize chain to the caller.
  /// </summary>
  private void OnJobFinished(Guid jobId, Task<T> task, Action<Guid, T, Exception> onFinishedAction) {
    // Task.Result rethrows for a faulted task, which would prevent the callback from ever
    // seeing the exception. Only read Result when the task actually completed successfully.
    T result = task.Status == TaskStatus.RanToCompletion ? task.Result : null;
    onFinishedAction(jobId, result, task.Exception);
  }

  /// <summary>
  /// Downloads the serialized data of a job. On normal return (including the abort case that
  /// returns null) one slot of the deserialize semaphore is still held and must be released
  /// by a subsequent call to <see cref="DeserializeJob"/>.
  /// </summary>
  /// <param name="jobId">Boxed <see cref="Guid"/> of the job.</param>
  /// <returns>The job data, or null if the download was aborted.</returns>
  protected JobData DownloadJob(object jobId) {
    downloadSemaphore.WaitOne();
    try {
      deserializeSemaphore.WaitOne();
      try {
        if (abort) return null;
        return ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));
      }
      catch {
        // DeserializeJob is never reached when the download fails, so the slot
        // has to be given back here; otherwise it would leak permanently.
        deserializeSemaphore.Release();
        throw;
      }
    }
    finally {
      downloadSemaphore.Release();
    }
  }

  /// <summary>
  /// Deserializes previously downloaded job data and releases the deserialize slot that
  /// <see cref="DownloadJob(object)"/> acquired.
  /// </summary>
  /// <param name="jobData">Data returned by <see cref="DownloadJob(object)"/>; may be null.</param>
  /// <returns>The deserialized job, or null if aborted, if no data is available or if the job no longer exists on the server.</returns>
  protected T DeserializeJob(JobData jobData) {
    try {
      // Check for null before touching jobData.JobId; DownloadJob returns null on abort.
      if (abort || jobData == null) return null;
      Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));
      if (job == null) return null;
      var deserializedJob = PersistenceUtil.Deserialize<T>(jobData.Data);
      jobData.Data = null; // reduce memory consumption.
      return deserializedJob;
    }
    finally {
      deserializeSemaphore.Release();
    }
  }

  /// <summary>
  /// Marks unobserved task exceptions as observed so that a crashed task does not crash the process.
  /// </summary>
  private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
    e.SetObserved();
  }
}
63}
Note: See TracBrowser for help on using the repository browser.