Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveJobDownloader.cs @ 5535

Last change on this file since 5535 was 5535, checked in by cneumuel, 14 years ago

#1233

  • merged changes in hive experiment manager from 3.3 (more reliable and efficient downloading of jobs)
File size: 3.4 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6using HeuristicLab.Services.Hive.Common.DataTransfer;
7
8namespace HeuristicLab.Clients.Hive {
9  public class HiveJobDownloader {
10    private IEnumerable<Guid> jobIds;
11    private List<Task<HiveJob>> tasks;
12    private bool abort = false;
13
14    public bool IsFinished {
15      get {
16        return tasks.TrueForAll(t => t.Status == TaskStatus.RanToCompletion ||
17                                     t.Status == TaskStatus.Faulted ||
18                                     t.Status == TaskStatus.Canceled);
19      }
20    }
21
22    public int FinishedCount {
23      get {
24        var faulted = tasks.Where(t => t.Status == TaskStatus.Faulted);
25        if (faulted.Count() > 0) {
26          abort = true;
27          throw faulted.First().Exception;
28        }
29        return tasks.Count(t => t.Status == TaskStatus.RanToCompletion ||
30                                t.Status == TaskStatus.Faulted ||
31                                t.Status == TaskStatus.Canceled);
32      }
33    }
34
35    public IDictionary<Guid, HiveJob> Results {
36      get {
37        var results = new Dictionary<Guid, HiveJob>();
38        foreach (var t in tasks) {
39          if (t.Status == TaskStatus.Faulted) {
40            throw t.Exception;
41          }
42          if(t.Result != null)
43            results.Add(t.Result.Job.Id, t.Result);
44        }
45        return results;
46      }
47    }
48
49    public HiveJobDownloader(IEnumerable<Guid> jobIds) {
50      this.jobIds = jobIds;
51    }
52
53    public void StartAsync() {
54      abort = false;
55      tasks = new List<Task<HiveJob>>();
56      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
57      foreach (Guid jobId in jobIds) {
58        tasks.Add(Task<JobData>.Factory.StartNew(
59          (x) => DownloadJob(x), jobId)
60          .ContinueWith((x) => DeserializeJob(x.Result)));
61      }
62    }
63
64    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
65      e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property
66    }
67
68    // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory
69    private Semaphore downloadSemaphore = new Semaphore(2, 2);
70    private Semaphore deserializeSemaphore = new Semaphore(2, 2);
71    protected JobData DownloadJob(object jobId) {
72      downloadSemaphore.WaitOne();
73      deserializeSemaphore.WaitOne();
74      JobData result;
75      try {
76        if (abort) return null;
77        result = ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));
78      }
79      finally {
80        downloadSemaphore.Release();
81      }
82      return result;
83    }
84
85    protected HiveJob DeserializeJob(JobData jobData) {
86      try {
87        Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));
88        if (abort || job == null || jobData == null) return null;
89        HiveJob hiveJob = new HiveJob(job, jobData, false);
90        jobData.Data = null; // reduce memory consumption.
91        hiveJob.OptimizerJob.Prepare(); // reduce memory consumption.
92        hiveJob.Job = job;
93        return hiveJob;
94      }
95      finally {
96        deserializeSemaphore.Release();
97      }
98    }
99  }
100}
Note: See TracBrowser for help on using the repository browser.