Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveJobDownloader.cs @ 5793

Last change on this file since 5793 was 5793, checked in by cneumuel, 13 years ago

#1233

  • implemented correct downloading of paused jobs. It's now also possible to change parameters and resume an algorithm
  • removed Prepare() calls in ExperimentManager and in slave, as it prevents correct resuming of paused jobs
  • made events in ItemTreeView be invoked in the correct thread
  • reduced log output in ExperimentManager
File size: 3.3 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6
namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Downloads and deserializes a set of Hive jobs in the background, one task chain
  /// (download followed by deserialize) per job id. Progress is polled through
  /// <see cref="IsFinished"/> and <see cref="FinishedCount"/>; the outcome is read
  /// from <see cref="Results"/>.
  /// </summary>
  public class HiveJobDownloader {
    private readonly IEnumerable<Guid> jobIds;

    // Starts out empty (instead of null) so the polling properties can be read
    // safely before StartAsync has been called.
    private List<Task<HiveJob>> tasks = new List<Task<HiveJob>>();

    // Written by the polling thread (FinishedCount) and read by the worker tasks,
    // hence volatile.
    private volatile bool abort = false;

    /// <summary>
    /// True when every started task has reached a final state
    /// (ran to completion, faulted or canceled).
    /// </summary>
    public bool IsFinished {
      get {
        // Task.IsCompleted covers RanToCompletion, Faulted and Canceled.
        return tasks.TrueForAll(t => t.IsCompleted);
      }
    }

    /// <summary>
    /// Number of tasks that have reached a final state.
    /// </summary>
    /// <exception cref="AggregateException">
    /// Thrown with the exception of the first faulted task. Before throwing, the
    /// abort flag is set so that tasks which have not yet done their work return
    /// null instead of contacting the service.
    /// </exception>
    public int FinishedCount {
      get {
        // Single pass; avoids enumerating the query once for the check and again for First().
        var faulted = tasks.FirstOrDefault(t => t.Status == TaskStatus.Faulted);
        if (faulted != null) {
          abort = true;
          throw faulted.Exception;
        }
        return tasks.Count(t => t.IsCompleted);
      }
    }

    /// <summary>
    /// The downloaded jobs keyed by job id. Tasks that produced null (aborted, or
    /// the job was not found) are skipped.
    /// Note: reading Task.Result blocks until the respective task has finished.
    /// </summary>
    /// <exception cref="AggregateException">
    /// Thrown with the exception of the first faulted task encountered.
    /// </exception>
    public IDictionary<Guid, HiveJob> Results {
      get {
        var results = new Dictionary<Guid, HiveJob>();
        foreach (var t in tasks) {
          if (t.Status == TaskStatus.Faulted) {
            throw t.Exception;
          }
          HiveJob hiveJob = t.Result;
          if (hiveJob != null)
            results.Add(hiveJob.Job.Id, hiveJob);
        }
        return results;
      }
    }

    /// <summary>
    /// Creates a downloader for the given job ids. Nothing is started until
    /// <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="jobIds">Ids of the jobs to download.</param>
    /// <exception cref="ArgumentNullException">jobIds is null.</exception>
    public HiveJobDownloader(IEnumerable<Guid> jobIds) {
      if (jobIds == null) throw new ArgumentNullException("jobIds");
      this.jobIds = jobIds;
    }

    /// <summary>
    /// Resets the abort flag and starts one download/deserialize task chain per job id.
    /// Returns immediately; poll <see cref="IsFinished"/> or <see cref="FinishedCount"/>.
    /// </summary>
    public void StartAsync() {
      abort = false;
      tasks = new List<Task<HiveJob>>();

      // Remove a previous subscription first so that repeated StartAsync calls do not
      // stack up handlers on the static event.
      TaskScheduler.UnobservedTaskException -= new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);

      foreach (Guid jobId in jobIds) {
        // If the download task faulted, reading x.Result rethrows inside the continuation,
        // so the continuation faults as well and DeserializeJob is never invoked.
        tasks.Add(Task<JobData>.Factory.StartNew(
          (x) => DownloadJob(x), jobId)
          .ContinueWith((x) => DeserializeJob(x.Result)));
      }
    }

    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property
    }

    // use semaphores to ensure only few concurrent connections and few SerializedJob objects in memory.
    // A deserialize slot is acquired in DownloadJob and released in DeserializeJob, so it is
    // held for as long as the downloaded JobData is alive.
    private readonly Semaphore downloadSemaphore = new Semaphore(2, 2);
    private readonly Semaphore deserializeSemaphore = new Semaphore(2, 2);

    /// <summary>
    /// Downloads the job data for the given job id.
    /// On normal return (including the aborted case, which returns null) the acquired
    /// deserialize slot stays held and must be released by a following call to
    /// <see cref="DeserializeJob"/>. If the download throws, the slot is released here,
    /// because the continuation will not reach DeserializeJob in that case.
    /// </summary>
    /// <param name="jobId">The job id; must be a boxed <see cref="Guid"/>.</param>
    /// <returns>The downloaded job data, or null if the download was aborted.</returns>
    protected JobData DownloadJob(object jobId) {
      downloadSemaphore.WaitOne();
      deserializeSemaphore.WaitOne();
      try {
        if (abort) return null;
        return ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));
      }
      catch {
        // Nobody will call DeserializeJob for a failed download; without this release the
        // slot would be lost and later downloads would block forever.
        deserializeSemaphore.Release();
        throw;
      }
      finally {
        downloadSemaphore.Release();
      }
    }

    /// <summary>
    /// Builds a <see cref="HiveJob"/> from downloaded job data and releases the deserialize
    /// slot that was acquired in <see cref="DownloadJob"/>.
    /// </summary>
    /// <param name="jobData">Result of DownloadJob; may be null if the download was aborted.</param>
    /// <returns>
    /// The hive job, or null if the download was aborted, jobData is null, or the service
    /// returned no job for the id.
    /// </returns>
    protected HiveJob DeserializeJob(JobData jobData) {
      try {
        // Check before touching jobData: an aborted download hands over null.
        if (abort || jobData == null) return null;
        Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));
        if (abort || job == null) return null;
        HiveJob hiveJob = new HiveJob(job, jobData, false);
        jobData.Data = null; // reduce memory consumption.
        hiveJob.Job = job;
        return hiveJob;
      }
      finally {
        deserializeSemaphore.Release();
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.