Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3133_ProblemModifiers/HeuristicLab.Problems.Modifiers/Hive/HiveApi.cs @ 18029

Last change on this file since 18029 was 18029, checked in by bwerth, 3 years ago

#3133 added implementation of problem modifiers

File size: 5.9 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Threading;
5using HeuristicLab.Clients.Hive;
6using HeuristicLab.Core;
7using HeuristicLab.Optimization;
8
9namespace HeuristicLab.Problems.Modifiers {
10  public static class HiveApi {
    // Gate taken around every access to the shared HiveClient.Instance in this class.
    private static readonly object locker = new object();
12
    /// <summary>
    /// Settings that control how executables are submitted to Hive by <c>ExecuteInHive</c>.
    /// </summary>
    public class Options {
      /// <summary>
      /// Id of the Hive project the job is assigned to. <c>Guid.Empty</c> makes
      /// <c>GuardProjectId</c> fall back to the first available project.
      /// </summary>
      public Guid ProjectId { get; set; }
      /// <summary>
      /// Ids of the resources the job is assigned to. <c>GuardResourceIds</c> rejects ids that are
      /// not available for the selected project.
      /// </summary>
      public IEnumerable<Guid> ResourceIds { get; set; }
      /// <summary>
      /// Name of the Hive job. When null or empty, <c>GuardJobName</c> builds a name by joining the
      /// executables with " + ".
      /// </summary>
      public string JobName { get; set; }
      /// <summary>
      /// When true, <c>PackJob</c> sets <c>ComputeInParallel</c> on tasks whose item is an
      /// <c>Experiment</c> or a <c>BatchRun</c>.
      /// </summary>
      public bool Distribute { get; set; }
    }
19
20    public static IEnumerable<T> ExecuteInHive<T>(IEnumerable<T> executables, Options options, CancellationToken cancellationToken) where T : IExecutable {
21      if (options == null) {
22        options = new Options {
23          ProjectId = Guid.Empty,
24          ResourceIds = new Guid[0],
25          JobName = string.Empty,
26          Distribute = true
27        };
28      }
29
30      var safeExecutables = executables as T[] ?? executables.ToArray();
31      if (!safeExecutables.Any())
32        throw new ArgumentException("At least one executable must be specified.");
33
34      Exception exception = null;
35      bool jobFinished;
36      var tries = 0;
37      const int maxRetries = 5;
38      do {
39        exception = null;
40        Project[] projects;
41        lock (locker) {
42          HiveClient.Instance.Refresh();
43          projects = HiveClient.Instance.Projects.ToArray();
44        }
45
46        var projectId = GuardProjectId(options.ProjectId, projects);
47        var resourceIds = GuardResourceIds(options.ResourceIds, projectId);
48        var jobName = GuardJobName(safeExecutables, options.JobName);
49        var distribute = options.Distribute;
50
51        using (var refreshableJob = PackJob(safeExecutables, projectId, resourceIds, jobName, distribute)) {
52          HiveClient.Store(refreshableJob, cancellationToken);
53
54          var taskIds = refreshableJob.HiveTasks.Select(x => x.Task.Id).ToArray();
55
56          using (var signal = SetupWaitHandle(refreshableJob, e => exception = e)) {
57            refreshableJob.StartResultPolling();
58
59            try {
60              signal.Wait(cancellationToken);
61            } catch (OperationCanceledException) {
62            } finally {
63              HiveClient.LoadJob(refreshableJob);
64              executables = UnpackJob<T>(refreshableJob, taskIds);
65              HiveClient.Delete(refreshableJob); // keep problematic jobs for debugging
66            }
67          }
68          jobFinished = refreshableJob.IsFinished();
69        }
70      } while (tries++ < maxRetries && (exception != null || !jobFinished));
71
72      return executables;
73    }
74
    /// <summary>
    /// Refreshes the shared <c>HiveClient.Instance</c> and returns snapshots of its projects and resources.
    /// </summary>
    /// <param name="projects">Receives a copy of <c>HiveClient.Instance.Projects</c> taken after the refresh.</param>
    /// <param name="resources">Receives a copy of <c>HiveClient.Instance.Resources</c> taken after the refresh.</param>
    public static void RefreshHiveClient(out Project[] projects, out Resource[] resources) {
      // Refresh and both snapshots happen under the same lock.
      lock (locker) {
        HiveClient.Instance.Refresh();
        projects = HiveClient.Instance.Projects.ToArray();
        resources = HiveClient.Instance.Resources.ToArray();
      }
    }
82
83    public static Guid GuardProjectId(Guid projectId, IEnumerable<Project> projects) {
84      Project selectedProject;
85
86      if (projectId == Guid.Empty) {
87        selectedProject = projects.FirstOrDefault();
88        if (selectedProject == null) throw new ArgumentException("A default project is not available.");
89      } else {
90        selectedProject = projects.SingleOrDefault(x => x.Id == projectId);
91        if (selectedProject == null) throw new ArgumentException("The specified project is not available.");
92      }
93
94      return selectedProject.Id;
95    }
96
97    public static IEnumerable<Guid> GuardResourceIds(IEnumerable<Guid> resourceIds, Guid projectId) {
98      Resource[] availableResources;
99      lock (locker) availableResources = HiveClient.Instance.GetAvailableResourcesForProject(projectId).ToArray();
100
101      var availableResourceIds = availableResources.Select(x => x.Id).ToArray();
102      var guardResourceIds = resourceIds as Guid[] ?? resourceIds.ToArray();
103      var unavailableResources = guardResourceIds.Except(availableResourceIds);
104      if (unavailableResources.Any()) throw new ArgumentException("Some of the specified resources are not available for the specified project.");
105
106      return guardResourceIds;
107    }
108
109    public static string GuardJobName<T>(IEnumerable<T> executables, string jobName) where T : IExecutable {
110      if (string.IsNullOrEmpty(jobName)) jobName = string.Join(" + ", executables);
111      return jobName;
112    }
113
114    public static RefreshableJob PackJob<T>(IEnumerable<T> executables, Guid projectId, IEnumerable<Guid> resourceIds, string jobName, bool distribute) where T : IExecutable {
115      var refreshableJob = new RefreshableJob() {
116        Job = {
117          Name = jobName,
118          ProjectId = projectId,
119          ResourceIds = resourceIds.ToList()
120        }
121      };
122
123      foreach (var executable in executables) {
124        var itemTask = ItemTask.GetItemTaskForItem(executable);
125        itemTask.ComputeInParallel = distribute && (executable is Experiment || executable is BatchRun);
126        var hiveTask = itemTask.CreateHiveTask();
127        refreshableJob.HiveTasks.Add(hiveTask);
128      }
129
130      return refreshableJob;
131    }
132
133    public static IEnumerable<T> UnpackJob<T>(RefreshableJob refreshableJob, IList<Guid> taskIds) where T : IExecutable {
134      var hiveTasks = refreshableJob.HiveTasks.OrderBy(x => taskIds.IndexOf(x.Task.Id));
135      foreach (var hiveTask in hiveTasks) {
136        var executable = (T)hiveTask.ItemTask.Item;
137        yield return executable;
138      }
139    }
140
141    public static ManualResetEventSlim SetupWaitHandle(RefreshableJob refreshableJob, Action<Exception> exceptionCallback) {
142      var signal = new ManualResetEventSlim(false);
143      refreshableJob.StateLogListChanged += (sender, args) => {
144        if (refreshableJob.IsFinished())
145          signal.Set();
146      };
147      refreshableJob.ExceptionOccured += (sender, args) => {
148        exceptionCallback(args.Value);
149        signal.Set();
150      };
151      return signal;
152    }
153  }
154}
Note: See TracBrowser for help on using the repository browser.