Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.ExperimentManager/3.3/HiveJobDownloader.cs @ 5329

Last change on this file since 5329 was 5329, checked in by cneumuel, 13 years ago

#1260

  • robustified HiveEngine and HiveJobDownloader (handling of unobserved exceptions from tasks)
File size: 3.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Hive.ExperimentManager.Jobs;
6using System.Threading;
7using System.Threading.Tasks;
8using HeuristicLab.Hive.Contracts.BusinessObjects;
9using HeuristicLab.Hive.Contracts.Interfaces;
10using HeuristicLab.Clients.Common;
11
namespace HeuristicLab.Hive.ExperimentManager {
  /// <summary>
  /// Downloads the last serialized result of a set of jobs in background tasks and
  /// deserializes each one into a <see cref="HiveJob"/>.
  /// </summary>
  /// <remarks>
  /// Call <see cref="StartAsync"/> first; <see cref="IsFinished"/>, <see cref="FinishedCount"/>
  /// and <see cref="Results"/> read the task list that <see cref="StartAsync"/> creates.
  /// </remarks>
  public class HiveJobDownloader {
    private readonly IEnumerable<Guid> jobIds;
    private List<Task<HiveJob>> tasks;

    // Written by the thread that polls FinishedCount and read by the worker tasks,
    // hence volatile so that workers see the update.
    private volatile bool abort = false;

    /// <summary>
    /// Gets whether every task has reached a final state
    /// (ran to completion, faulted or canceled).
    /// </summary>
    public bool IsFinished {
      get {
        // Task.IsCompleted is true exactly for RanToCompletion, Faulted and Canceled.
        return tasks.TrueForAll(t => t.IsCompleted);
      }
    }

    /// <summary>
    /// Gets the number of tasks that have reached a final state.
    /// </summary>
    /// <exception cref="AggregateException">
    /// At least one task faulted. The exception of the first faulted task is thrown and
    /// the abort flag is set so that tasks which have not done their work yet skip it.
    /// </exception>
    public int FinishedCount {
      get {
        Task<HiveJob> faulted = tasks.FirstOrDefault(t => t.Status == TaskStatus.Faulted);
        if (faulted != null) {
          abort = true;
          throw faulted.Exception;
        }
        return tasks.Count(t => t.IsCompleted);
      }
    }

    /// <summary>
    /// Gets the downloaded jobs keyed by the id of their <c>JobDto</c>.
    /// Tasks that produced <c>null</c> (aborted downloads) are left out.
    /// </summary>
    /// <remarks>
    /// Reading <c>Task.Result</c> blocks until the respective task has finished, so this
    /// getter waits for outstanding downloads.
    /// </remarks>
    /// <exception cref="AggregateException">A task faulted; its exception is thrown.</exception>
    public IDictionary<Guid, HiveJob> Results {
      get {
        var results = new Dictionary<Guid, HiveJob>();
        foreach (var t in tasks) {
          if (t.Status == TaskStatus.Faulted) {
            throw t.Exception;
          }
          HiveJob job = t.Result;
          if (job != null) {
            results.Add(job.JobDto.Id, job);
          }
        }
        return results;
      }
    }

    /// <summary>
    /// Creates a downloader for the given job ids. Nothing is downloaded until
    /// <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="jobIds">Ids of the jobs whose results should be fetched.</param>
    public HiveJobDownloader(IEnumerable<Guid> jobIds) {
      this.jobIds = jobIds;
    }

    /// <summary>
    /// Resets the abort flag and starts one download task per job id, each followed by a
    /// continuation that deserializes the downloaded data.
    /// </summary>
    public void StartAsync() {
      abort = false;
      tasks = new List<Task<HiveJob>>();

      // The handler is static, so remove-then-add keeps a single subscription on the
      // static event regardless of how often StartAsync is called, and the event does
      // not hold a reference to this instance.
      TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
      TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;

      foreach (Guid jobId in jobIds) {
        tasks.Add(Task<SerializedJob>.Factory.StartNew(
          (x) => DownloadJob(x), jobId)
          .ContinueWith((x) => DeserializeJob(x.Result)));
      }
    }

    /// <summary>
    /// Marks unobserved task exceptions as observed.
    /// </summary>
    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property
    }

    // use semaphores to ensure only few concurrent connections and few SerializedJob objects in memory
    private readonly Semaphore downloadSemaphore = new Semaphore(2, 2);
    private readonly Semaphore deserializeSemaphore = new Semaphore(2, 2);

    /// <summary>
    /// Fetches the last serialized result of a job from the service.
    /// </summary>
    /// <param name="jobId">The job id; must be a boxed <see cref="Guid"/>.</param>
    /// <returns>The serialized job, or <c>null</c> if the download was aborted.</returns>
    /// <remarks>
    /// On a normal return (including the aborted case) a slot of the deserialize semaphore
    /// stays acquired and is released by <see cref="DeserializeJob"/>. If this method throws,
    /// the slot is released here, because the continuation then fails on reading the
    /// antecedent's result and never reaches <see cref="DeserializeJob"/>.
    /// </remarks>
    protected SerializedJob DownloadJob(object jobId) {
      downloadSemaphore.WaitOne();
      deserializeSemaphore.WaitOne();
      bool handedOver = false;
      try {
        SerializedJob result = null;
        if (!abort) {
          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
            result = service.Obj.GetLastSerializedResult((Guid)jobId).Obj;
          }
        }
        handedOver = true;
        return result;
      }
      finally {
        downloadSemaphore.Release();
        if (!handedOver) {
          // Download failed: nobody else will give this slot back.
          deserializeSemaphore.Release();
        }
      }
    }

    /// <summary>
    /// Turns downloaded data into a <see cref="HiveJob"/> and releases the deserialize
    /// semaphore slot acquired in <see cref="DownloadJob"/>.
    /// </summary>
    /// <param name="serializedJob">The downloaded data; may be <c>null</c>.</param>
    /// <returns>
    /// The deserialized job, or <c>null</c> if the download was aborted or
    /// <paramref name="serializedJob"/> is <c>null</c>.
    /// </returns>
    protected HiveJob DeserializeJob(SerializedJob serializedJob) {
      try {
        if (abort || serializedJob == null) return null;
        HiveJob job = new HiveJob(serializedJob, false);
        serializedJob.SerializedJobData = null; // reduce memory consumption.
        job.Job.Prepare(); // reduce memory consumption.
        job.JobDto = serializedJob.JobInfo;
        return job;
      }
      finally {
        deserializeSemaphore.Release();
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.