Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveJobDownloader.cs @ 6033

Last change on this file since 6033 was 6033, checked in by cneumuel, 13 years ago

#1233

  • created base class for jobs (ItemJob) from which OperatorJobs and EngineJobs derive
  • created special view for OptimizerJobs which derives from a more general view
  • removed logic from domain class HiveExperiment and moved it into RefreshableHiveExperiment
  • improved ItemTreeView
  • corrected plugin dependencies
  • fixed bug in database trigger when deleting HiveExperiments
  • added delete cascade for Plugin and PluginData
  • lots of fixes
File size: 3.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6using HeuristicLab.Clients.Hive.ExperimentManager;
7using HeuristicLab.Clients.Hive.Jobs;
8using HeuristicLab.Hive;
9
namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Downloads the job data for a set of job ids from the Hive service on background
  /// tasks and deserializes each download into a <see cref="HiveJob"/>.
  /// </summary>
  /// <remarks>
  /// Call <see cref="StartAsync"/> first; <see cref="IsFinished"/>, <see cref="FinishedCount"/>
  /// and <see cref="Results"/> read the task list that <see cref="StartAsync"/> creates and
  /// throw if it has not been created yet.
  /// </remarks>
  public class HiveJobDownloader {
    private IEnumerable<Guid> jobIds;
    private List<Task<HiveJob>> tasks;

    // volatile: set by the thread that polls FinishedCount / calls StartAsync and
    // read by the worker tasks in DownloadJob and DeserializeJob
    private volatile bool abort = false;

    // use semaphores to ensure only few concurrent connections and few serialized JobData objects in memory
    private Semaphore downloadSemaphore = new Semaphore(2, 2);
    private Semaphore deserializeSemaphore = new Semaphore(2, 2);

    /// <summary>
    /// True when every started task has run to completion, faulted or been canceled.
    /// </summary>
    public bool IsFinished {
      get {
        // Task.IsCompleted is true exactly for RanToCompletion, Faulted and Canceled
        return tasks.TrueForAll(t => t.IsCompleted);
      }
    }

    /// <summary>
    /// Number of tasks that have reached a final state.
    /// </summary>
    /// <exception cref="AggregateException">
    /// Thrown with the exception of the first faulted task, if any task has faulted.
    /// In that case the abort flag is set so tasks that have not done their work yet return null.
    /// </exception>
    public int FinishedCount {
      get {
        var faulted = tasks.FirstOrDefault(t => t.IsFaulted);
        if (faulted != null) {
          abort = true;
          throw faulted.Exception;
        }
        return tasks.Count(t => t.IsCompleted);
      }
    }

    /// <summary>
    /// The deserialized jobs keyed by job id. Reading a task's result blocks until that
    /// task has completed. Tasks that produced null (aborted or job not found) are skipped.
    /// </summary>
    /// <exception cref="AggregateException">Thrown with the exception of the first faulted task.</exception>
    public IDictionary<Guid, HiveJob> Results {
      get {
        var results = new Dictionary<Guid, HiveJob>();
        foreach (var t in tasks) {
          if (t.IsFaulted) {
            throw t.Exception;
          }
          HiveJob hiveJob = t.Result;
          if (hiveJob != null)
            results.Add(hiveJob.Job.Id, hiveJob);
        }
        return results;
      }
    }

    /// <summary>
    /// Creates a downloader for the given job ids. Nothing is downloaded until
    /// <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="jobIds">Ids of the jobs to download.</param>
    /// <exception cref="ArgumentNullException"><paramref name="jobIds"/> is null.</exception>
    public HiveJobDownloader(IEnumerable<Guid> jobIds) {
      if (jobIds == null) throw new ArgumentNullException("jobIds");
      this.jobIds = jobIds;
    }

    /// <summary>
    /// Resets the abort flag and starts one download task per job id, each followed by a
    /// continuation that deserializes the downloaded data.
    /// </summary>
    public void StartAsync() {
      abort = false;
      tasks = new List<Task<HiveJob>>();

      // The event is static: remove before adding so repeated calls (from any instance)
      // leave exactly one subscription. The handler is static so the event does not keep
      // downloader instances alive.
      TaskScheduler.UnobservedTaskException -= TaskScheduler_UnobservedTaskException;
      TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;

      foreach (Guid jobId in jobIds) {
        tasks.Add(Task<JobData>.Factory.StartNew(
          (x) => DownloadJob(x), jobId)
          .ContinueWith((x) => DeserializeJob(x.Result)));
      }
    }

    private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property
    }

    /// <summary>
    /// Downloads the job data for one job id from the Hive service.
    /// </summary>
    /// <param name="jobId">The job id as a boxed <see cref="Guid"/>.</param>
    /// <returns>The downloaded job data, or null when the download was aborted.</returns>
    /// <remarks>
    /// Acquires both semaphores. The download semaphore is always released here. When this
    /// method returns normally the deserialize semaphore stays held and is released by
    /// <see cref="DeserializeJob"/>; when it throws, the deserialize semaphore is released
    /// here because the continuation fails before it reaches <see cref="DeserializeJob"/>.
    /// </remarks>
    protected JobData DownloadJob(object jobId) {
      downloadSemaphore.WaitOne();
      deserializeSemaphore.WaitOne();
      try {
        if (abort) return null;
        return ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));
      }
      catch {
        // a failed download never reaches DeserializeJob, so hand the permit back here;
        // otherwise two failures would block all remaining tasks forever
        deserializeSemaphore.Release();
        throw;
      }
      finally {
        downloadSemaphore.Release();
      }
    }

    /// <summary>
    /// Fetches the job for the downloaded data and wraps the deserialized item in a
    /// <see cref="HiveJob"/> (an <see cref="OptimizerHiveJob"/> for optimizer jobs).
    /// </summary>
    /// <param name="jobData">Result of <see cref="DownloadJob"/>; may be null.</param>
    /// <returns>
    /// The hive job, or null when aborted, when <paramref name="jobData"/> is null or when
    /// the service returns no job.
    /// </returns>
    /// <remarks>Always releases the deserialize semaphore acquired by <see cref="DownloadJob"/>.</remarks>
    protected HiveJob DeserializeJob(JobData jobData) {
      try {
        // check before touching jobData: an aborted download hands over null
        if (abort || jobData == null) return null;

        Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));
        if (abort || job == null) return null;

        HiveJob hiveJob;
        var itemJob = PersistenceUtil.Deserialize<ItemJob>(jobData.Data);
        if (itemJob is OptimizerJob) {
          hiveJob = new OptimizerHiveJob((OptimizerJob)itemJob);
        } else {
          hiveJob = new HiveJob(itemJob, true);
        }
        jobData.Data = null; // reduce memory consumption.
        hiveJob.Job = job;
        return hiveJob;
      }
      finally {
        deserializeSemaphore.Release();
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.