Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3133_ProblemModifiers/HeuristicLab.Problems.Modifiers/Hive/HiveEvaluationResultPoller.cs @ 18029

Last change on this file since 18029 was 18029, checked in by bwerth, 3 years ago

#3133 added implementation of problem modifiers

File size: 5.0 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Linq;
5using System.Threading;
6using System.Threading.Tasks;
7using HeuristicLab.Clients.Hive;
8using HeuristicLab.Core;
9
10namespace HeuristicLab.Problems.Modifiers {
11  /// <summary>
12  /// non-storable poller that downloads hive jobs
13  /// </summary>
14  public class HiveEvaluationResultPoller {
15    protected static readonly object LogLocker = new object();
16    protected static readonly Semaphore Throttle = new Semaphore(90, 90); //The Hive server throttles connections down to 100
17  }
18  public class HiveEvaluationResultPoller<T> : HiveEvaluationResultPoller where T : IExecutable {
19    private Thread thread;
20    private readonly object locker = new object();
21    private readonly Dictionary<Guid, TaskCompletionSource<T>> jobs = new Dictionary<Guid, TaskCompletionSource<T>>();
22
23    public TimeSpan Interval { get; set; }
24    public string LogFile;
25    public HiveEvaluationResultPoller(TimeSpan interval) {
26      Interval = interval;
27    }
28
29    public TaskCompletionSource<T> StartEvaluation(T evaluation, HiveApi.Options options) {
30      HiveApi.RefreshHiveClient(out var projects, out _);
31      var projectId = HiveApi.GuardProjectId(options.ProjectId, projects);
32      var resourceIds = options.ResourceIds;
33      var jobName = HiveApi.GuardJobName(new[] { evaluation }, options.JobName);
34      var distribute = options.Distribute;
35      using (var refreshableJob = HiveApi.PackJob(new[] { evaluation }, projectId, resourceIds, jobName, distribute)) {
36        Throttle.WaitOne();
37        try {
38          HiveClient.Store(refreshableJob, CancellationToken.None);
39        } catch (Exception e) {
40          WriteToLog(new[] { "Exception occured when uploading Job : " + DateTime.Now, e.ToString(), "========" });
41          throw;
42        } finally {
43          Throttle.Release();
44        }
45        var comp = new TaskCompletionSource<T>();
46        lock (locker) {
47          var startNew = jobs.Count == 0;
48          jobs.Add(refreshableJob.Id, comp);
49          if (startNew) Start();
50        }
51        return comp;
52      }
53    }
54
55    #region Helpers
56    private void Start() {
57      thread = new Thread(RunPolling);
58      thread.Start();
59    }
60
61    private void RunPolling() {
62      var stopRequested = false;
63      do {
64        try {
65          Thread.Sleep(Interval);
66          stopRequested = FetchJobResults();
67          HiveApi.RefreshHiveClient(out _, out _);
68        } catch (Exception e) {
69          WriteToLog(new[] { "Poller Exception occurred at : " + DateTime.Now, e.ToString(), "========" });
70        }
71      } while (!stopRequested);
72    }
73
74    private bool FetchJobResults() {
75      var stopping = false;
76      HiveServiceLocator.Instance.CallHiveService(service => {
77        Guid[] tasks1;
78        lock (locker) {
79          tasks1 = jobs.Keys.ToArray();
80        }
81
82        var toRemoves = new HashSet<Guid>();
83        foreach (var x in tasks1) {
84          LightweightTask task;
85          try {
86            task = service.GetLightweightJobTasksWithoutStateLog(x).Single();
87          } catch (Exception e) {
88            toRemoves.Add(x);
89            lock (locker) {
90              if (jobs.ContainsKey(x)) jobs[x].SetException(e);
91            }
92            continue;
93          }
94          switch (task.State) {
95            case TaskState.Offline:
96            case TaskState.Waiting:
97            case TaskState.Transferring:
98            case TaskState.Calculating:
99            case TaskState.Paused:
100              break;
101            case TaskState.Finished:
102              var evaluation = (T)PersistenceUtil.Deserialize<ItemTask>(service.GetTaskData(task.Id).Data).Item;
103              HiveClient.Delete(service.GetJob(x));
104              toRemoves.Add(x);
105              lock (locker) {
106                if (jobs.ContainsKey(x))
107                  jobs[x].SetResult(evaluation);
108              }
109              break;
110            case TaskState.Aborted:
111            case TaskState.Failed:
112              toRemoves.Add(x);
113              lock (locker) {
114                if (jobs.ContainsKey(x))
115                  jobs[x].SetException(new HiveException($"The evaluation with job id {x} and task id {task.Id} failed. Please consult the Hive Job-Manager for further information"));
116              }
117              break;
118            default:
119              throw new ArgumentOutOfRangeException();
120          }
121        }
122
123        lock (locker) {
124          foreach (var x in toRemoves.Where(x => jobs.ContainsKey(x))) {
125            jobs.Remove(x);
126            if (jobs.Count != 0) continue;
127            stopping = true;
128            thread = null;
129          }
130        }
131
132      });
133      return stopping;
134    }
135    private void WriteToLog(IEnumerable<string> lines) {
136      if (LogFile == null) return;
137      lock (LogLocker) {
138        try {
139          File.AppendAllLines(LogFile, lines);
140        } catch { // ignored
141        }
142      }
143    }
144    #endregion
145  }
146}
147
Note: See TracBrowser for help on using the repository browser.