Free cookie consent management tool by TermsFeed Policy Generator

source: stable/HeuristicLab.Clients.Hive/3.3/HiveClient.cs @ 17226

Last change on this file since 17226 was 17181, checked in by swagner, 5 years ago

#2875: Merged r17180 from trunk to stable

File size: 30.9 KB
RevLine 
[6976]1#region License Information
2/* HeuristicLab
[17181]3 * Copyright (C) Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[6976]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
[7582]32using HeuristicLab.MainForm;
[6976]33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35
36namespace HeuristicLab.Clients.Hive {
37  [Item("HiveClient", "Hive client.")]
38  public sealed class HiveClient : IContent {
private static HiveClient instance;
/// <summary>
/// The shared <see cref="HiveClient"/>; it is created on first access.
/// Creation is not synchronized.
/// </summary>
public static HiveClient Instance {
  get {
    return instance ?? (instance = new HiveClient());
  }
}
46
47    #region Properties
private HiveItemCollection<RefreshableJob> jobs;
/// <summary>
/// The jobs known to this client. Assigning a different collection raises
/// <see cref="HiveJobsChanged"/>; assigning the current one does nothing.
/// </summary>
public HiveItemCollection<RefreshableJob> Jobs {
  get { return jobs; }
  set {
    if (value == jobs) return;
    jobs = value;
    OnHiveJobsChanged();
  }
}
58
private IItemList<Project> projects;
/// <summary>Projects loaded by the last refresh; null after a failed refresh.</summary>
public IItemList<Project> Projects {
  get {
    return projects;
  }
}

private IItemList<Resource> resources;
/// <summary>Slave groups and slaves loaded by the last refresh; null after a failed refresh.</summary>
public IItemList<Resource> Resources {
  get {
    return resources;
  }
}

private Dictionary<Guid, HashSet<Guid>> projectAncestors;
/// <summary>Project id to the ids of its ancestors, as reported by the service genealogy call.</summary>
public Dictionary<Guid, HashSet<Guid>> ProjectAncestors {
  get {
    return projectAncestors;
  }
}

private Dictionary<Guid, HashSet<Guid>> projectDescendants;
/// <summary>Project id to the ids of its descendants (the inverse of <see cref="ProjectAncestors"/>).</summary>
public Dictionary<Guid, HashSet<Guid>> ProjectDescendants {
  get {
    return projectDescendants;
  }
}

private Dictionary<Guid, HashSet<Guid>> resourceAncestors;
/// <summary>Resource id to the ids of its ancestors, as reported by the service genealogy call.</summary>
public Dictionary<Guid, HashSet<Guid>> ResourceAncestors {
  get {
    return resourceAncestors;
  }
}

private Dictionary<Guid, HashSet<Guid>> resourceDescendants;
/// <summary>Resource id to the ids of its descendants (the inverse of <see cref="ResourceAncestors"/>).</summary>
public Dictionary<Guid, HashSet<Guid>> ResourceDescendants {
  get {
    return resourceDescendants;
  }
}

private Dictionary<Guid, string> projectNames;
/// <summary>Project id to project name, as returned by the service.</summary>
public Dictionary<Guid, string> ProjectNames {
  get {
    return projectNames;
  }
}

private HashSet<Project> disabledParentProjects;
/// <summary>
/// Placeholder projects for ancestor ids that are referenced by the genealogy
/// but are not contained in <see cref="Projects"/>.
/// </summary>
public HashSet<Project> DisabledParentProjects {
  get {
    return disabledParentProjects;
  }
}

private Dictionary<Guid, string> resourceNames;
/// <summary>Resource id to resource name, as returned by the service.</summary>
public Dictionary<Guid, string> ResourceNames {
  get {
    return resourceNames;
  }
}

private HashSet<Resource> disabledParentResources;
/// <summary>
/// Placeholder slave groups for ancestor ids that are referenced by the genealogy
/// but are not contained in <see cref="Resources"/>.
/// </summary>
public HashSet<Resource> DisabledParentResources {
  get {
    return disabledParentResources;
  }
}
108
private List<Plugin> onlinePlugins;
/// <summary>Plugins reported by the server; set at the start of a job upload.</summary>
public List<Plugin> OnlinePlugins {
  get {
    return onlinePlugins;
  }
  set {
    onlinePlugins = value;
  }
}

private List<Plugin> alreadyUploadedPlugins;
/// <summary>Plugins that were uploaded (or matched) during the current job upload.</summary>
public List<Plugin> AlreadyUploadedPlugins {
  get {
    return alreadyUploadedPlugins;
  }
  set {
    alreadyUploadedPlugins = value;
  }
}
120    #endregion
121
/// <summary>Private: the only instance is handed out by <see cref="Instance"/>.</summary>
private HiveClient() { }

/// <summary>
/// Stops result polling for all known jobs, disposes them, drops the job
/// collection and empties the plugin caches.
/// </summary>
public void ClearHiveClient() {
  // Jobs is null before the first refresh and after a failed one; nothing to release then.
  if (Jobs != null) {
    // Take a snapshot BEFORE clearing. The previous code iterated Jobs after
    // ClearWithoutHiveDeletion(), which (presumably) leaves the collection empty,
    // so polling was never stopped and no job was ever disposed.
    var knownJobs = new List<RefreshableJob>();
    foreach (var j in Jobs) {
      knownJobs.Add(j);
    }

    Jobs.ClearWithoutHiveDeletion();

    foreach (var j in knownJobs) {
      if (j.RefreshAutomatically) {
        j.RefreshAutomatically = false; // stop result polling
      }
      j.Dispose();
    }
    Jobs = null; // raises HiveJobsChanged
  }

  if (onlinePlugins != null)
    onlinePlugins.Clear();
  if (alreadyUploadedPlugins != null)
    alreadyUploadedPlugins.Clear();
}
139
[6976]140    #region Refresh
/// <summary>
/// Reloads projects, resources, jobs, the name lookups and the genealogy
/// information from the Hive service.
/// </summary>
/// <remarks>
/// <see cref="Refreshing"/> is raised first and <see cref="Refreshed"/> last, also
/// when loading fails. On failure jobs, projects and resources are reset to null
/// and the exception is rethrown.
/// </remarks>
public void Refresh() {
  OnRefreshing();

  try {
    // start from empty containers
    projects = new ItemList<Project>();
    resources = new ItemList<Resource>();
    jobs = new HiveItemCollection<RefreshableJob>();
    projectNames = new Dictionary<Guid, string>();
    resourceNames = new Dictionary<Guid, string>();

    projectAncestors = new Dictionary<Guid, HashSet<Guid>>();
    projectDescendants = new Dictionary<Guid, HashSet<Guid>>();
    resourceAncestors = new Dictionary<Guid, HashSet<Guid>>();
    resourceDescendants = new Dictionary<Guid, HashSet<Guid>>();

    HiveServiceLocator.Instance.CallHiveService(service => {
      foreach (var project in service.GetProjects()) {
        projects.Add(project);
      }
      foreach (var slaveGroup in service.GetSlaveGroups()) {
        resources.Add(slaveGroup);
      }
      foreach (var slave in service.GetSlaves()) {
        resources.Add(slave);
      }
      foreach (var job in service.GetJobs()) {
        jobs.Add(new RefreshableJob(job));
      }
      projectNames = service.GetProjectNames();
      resourceNames = service.GetResourceNames();
    });

    // derived lookups
    RefreshResourceGenealogy();
    RefreshProjectGenealogy();
    RefreshDisabledParentProjects();
    RefreshDisabledParentResources();
  } catch {
    jobs = null;
    projects = null;
    resources = null;
    throw;
  } finally {
    OnRefreshed();
  }
}
178
/// <summary>
/// Reloads projects, resources, the name lookups and the genealogy information
/// from the Hive service; the job collection is left untouched.
/// </summary>
/// <remarks>
/// <see cref="Refreshing"/> is raised first and <see cref="Refreshed"/> last, also
/// when loading fails. On failure projects and resources are reset to null and
/// the exception is rethrown.
/// </remarks>
public void RefreshProjectsAndResources() {
  OnRefreshing();

  try {
    // start from empty containers
    projects = new ItemList<Project>();
    projectNames = new Dictionary<Guid, string>();
    resources = new ItemList<Resource>();
    resourceNames = new Dictionary<Guid, string>();

    projectAncestors = new Dictionary<Guid, HashSet<Guid>>();
    projectDescendants = new Dictionary<Guid, HashSet<Guid>>();
    resourceAncestors = new Dictionary<Guid, HashSet<Guid>>();
    resourceDescendants = new Dictionary<Guid, HashSet<Guid>>();

    HiveServiceLocator.Instance.CallHiveService(service => {
      foreach (var project in service.GetProjects()) {
        projects.Add(project);
      }
      foreach (var slaveGroup in service.GetSlaveGroups()) {
        resources.Add(slaveGroup);
      }
      foreach (var slave in service.GetSlaves()) {
        resources.Add(slave);
      }
      projectNames = service.GetProjectNames();
      resourceNames = service.GetResourceNames();
    });

    // derived lookups
    RefreshResourceGenealogy();
    RefreshProjectGenealogy();
    RefreshDisabledParentProjects();
    RefreshDisabledParentResources();
  } catch {
    projects = null;
    resources = null;
    throw;
  } finally {
    OnRefreshed();
  }
}
[17059]213
/// <summary>
/// Runs <see cref="Refresh"/> asynchronously via delegate BeginInvoke and returns immediately.
/// </summary>
/// <param name="exceptionCallback">Invoked with the exception if the refresh fails; not invoked on success.</param>
public void RefreshAsync(Action<Exception> exceptionCallback) {
  Func<Exception> refresh = () => {
    try {
      Refresh();
      return null;
    } catch (Exception ex) {
      return ex;
    }
  };

  refresh.BeginInvoke(asyncResult => {
    Exception failure = refresh.EndInvoke(asyncResult);
    if (failure != null) exceptionCallback(failure);
  }, null);
}
228
/// <summary>
/// Rebuilds <c>resourceAncestors</c> from the service genealogy and derives
/// <c>resourceDescendants</c> by inverting it.
/// </summary>
private void RefreshResourceGenealogy() {
  resourceAncestors.Clear();
  resourceDescendants.Clear();

  // ancestors: one empty set per reported resource, then filled from the service result
  HiveServiceLocator.Instance.CallHiveService(service => {
    var genealogy = service.GetResourceGenealogy();
    foreach (var resourceId in genealogy.Keys) {
      resourceAncestors.Add(resourceId, new HashSet<Guid>());
    }
    foreach (var resourceId in resourceAncestors.Keys.ToList()) {
      resourceAncestors[resourceId].UnionWith(genealogy[resourceId]);
    }
  });

  // descendants: every resource is a descendant of each of its ancestors
  foreach (var resourceId in resourceAncestors.Keys.ToList()) {
    resourceDescendants.Add(resourceId, new HashSet<Guid>());
  }
  foreach (var entry in resourceAncestors) {
    foreach (var ancestorId in entry.Value) {
      resourceDescendants[ancestorId].Add(entry.Key);
    }
  }
}
248
/// <summary>
/// Rebuilds <c>projectAncestors</c> from the service genealogy and derives
/// <c>projectDescendants</c> by inverting it.
/// </summary>
private void RefreshProjectGenealogy() {
  projectAncestors.Clear();
  projectDescendants.Clear();

  // ancestors: one empty set per reported project, then filled from the service result
  HiveServiceLocator.Instance.CallHiveService(service => {
    var genealogy = service.GetProjectGenealogy();
    foreach (var projectId in genealogy.Keys) {
      projectAncestors.Add(projectId, new HashSet<Guid>());
    }
    foreach (var projectId in projectAncestors.Keys.ToList()) {
      projectAncestors[projectId].UnionWith(genealogy[projectId]);
    }
  });

  // descendants: every project is a descendant of each of its ancestors
  foreach (var projectId in projectAncestors.Keys.ToList()) {
    projectDescendants.Add(projectId, new HashSet<Guid>());
  }
  foreach (var entry in projectAncestors) {
    foreach (var ancestorId in entry.Value) {
      projectDescendants[ancestorId].Add(entry.Key);
    }
  }
}
268
/// <summary>
/// Rebuilds <c>disabledParentProjects</c>: one placeholder <see cref="Project"/> for every
/// ancestor id of a loaded project that is not itself among the loaded projects.
/// </summary>
/// <remarks>
/// Throws <see cref="KeyNotFoundException"/> if the genealogy or the name lookup
/// lacks an id that is needed here.
/// </remarks>
private void RefreshDisabledParentProjects() {
  disabledParentProjects = new HashSet<Project>();

  // Ids of the loaded projects, collected once. The previous code rebuilt and
  // scanned this list for every candidate ancestor id.
  var loadedIds = new HashSet<Guid>(projects.Select(p => p.Id));

  var missingAncestorIds = projects
    .Where(p => p.ParentProjectId.HasValue)
    .SelectMany(p => projectAncestors[p.Id])
    .Distinct()
    .Where(ancestorId => !loadedIds.Contains(ancestorId));

  foreach (var pid in missingAncestorIds) {
    var placeholder = new Project();
    placeholder.Id = pid;
    // NOTE(review): for an id without ancestors FirstOrDefault() yields Guid.Empty,
    // not "no parent" - confirm that consumers expect this.
    placeholder.ParentProjectId = projectAncestors[pid].FirstOrDefault();
    placeholder.Name = projectNames[pid];
    disabledParentProjects.Add(placeholder);
  }
}
283
/// <summary>
/// Rebuilds <c>disabledParentResources</c>: one placeholder <see cref="SlaveGroup"/> for every
/// ancestor id of a loaded resource that is not itself among the loaded resources.
/// </summary>
/// <remarks>
/// Throws <see cref="KeyNotFoundException"/> if the genealogy or the name lookup
/// lacks an id that is needed here.
/// </remarks>
private void RefreshDisabledParentResources() {
  disabledParentResources = new HashSet<Resource>();

  // Ids of the loaded resources, collected once. The previous code rebuilt and
  // scanned this list for every candidate ancestor id.
  var loadedIds = new HashSet<Guid>(resources.Select(r => r.Id));

  var missingAncestorIds = resources
    .Where(r => r.ParentResourceId.HasValue)
    .SelectMany(r => resourceAncestors[r.Id])
    .Distinct()
    .Where(ancestorId => !loadedIds.Contains(ancestorId));

  foreach (var rid in missingAncestorIds) {
    var placeholder = new SlaveGroup();
    placeholder.Id = rid;
    // NOTE(review): for an id without ancestors FirstOrDefault() yields Guid.Empty,
    // not "no parent" - confirm that consumers expect this.
    placeholder.ParentResourceId = resourceAncestors[rid].FirstOrDefault();
    placeholder.Name = resourceNames[rid];
    disabledParentResources.Add(placeholder);
  }
}
298
/// <summary>
/// Returns the loaded projects that are ancestors of the given project,
/// or an empty sequence if the id is not part of the genealogy.
/// </summary>
public IEnumerable<Project> GetAvailableProjectAncestors(Guid id) {
  if (!projectAncestors.ContainsKey(id)) return Enumerable.Empty<Project>();
  return projects.Where(candidate => projectAncestors[id].Contains(candidate.Id));
}
303
/// <summary>
/// Returns the loaded projects that are descendants of the given project,
/// or an empty sequence if the id is not part of the genealogy.
/// </summary>
public IEnumerable<Project> GetAvailableProjectDescendants(Guid id) {
  if (!projectDescendants.ContainsKey(id)) return Enumerable.Empty<Project>();
  return projects.Where(candidate => projectDescendants[id].Contains(candidate.Id));
}
308
/// <summary>
/// Returns the loaded resources that are ancestors of the given resource,
/// or an empty sequence if the id is not part of the genealogy.
/// </summary>
public IEnumerable<Resource> GetAvailableResourceAncestors(Guid id) {
  if (!resourceAncestors.ContainsKey(id)) return Enumerable.Empty<Resource>();
  return resources.Where(candidate => resourceAncestors[id].Contains(candidate.Id));
}
313
/// <summary>
/// Returns the loaded resources that are descendants of the given resource,
/// or an empty sequence if the id is not part of the genealogy.
/// </summary>
public IEnumerable<Resource> GetAvailableResourceDescendants(Guid id) {
  if (!resourceDescendants.ContainsKey(id)) return Enumerable.Empty<Resource>();
  return resources.Where(candidate => resourceDescendants[id].Contains(candidate.Id));
}
318
/// <summary>
/// Asks the service which resources are assigned to the given project and
/// returns those of them that are among the loaded resources.
/// </summary>
public IEnumerable<Resource> GetAvailableResourcesForProject(Guid id) {
  var assignments = HiveServiceLocator.Instance.CallHiveService(s => s.GetAssignedResourcesForProject(id));
  return from resource in resources
         where assignments.Select(a => a.ResourceId).Contains(resource.Id)
         select resource;
}
323
/// <summary>
/// Returns the slave groups (loaded ones and disabled-parent placeholders) that are
/// ancestors of the given resources but are not contained in the given resources.
/// </summary>
/// <param name="availableResources">Resources whose missing ancestors are wanted; enumerated once, at call time.</param>
/// <returns>A lazily filtered sequence over the loaded slave groups and the placeholders.</returns>
public IEnumerable<Resource> GetDisabledResourceAncestors(IEnumerable<Resource> availableResources) {
  // Materialize once. The previous lazy query was re-executed for every candidate
  // in the final filter and rebuilt the id list for every ancestor id.
  var available = availableResources.ToList();
  var availableIds = new HashSet<Guid>(available.Select(r => r.Id));

  var missingParentIds = new HashSet<Guid>(available
    .Where(r => r.ParentResourceId.HasValue)
    .SelectMany(r => resourceAncestors[r.Id])
    .Where(ancestorId => !availableIds.Contains(ancestorId)));

  return resources.OfType<SlaveGroup>().Union(disabledParentResources).Where(x => missingParentIds.Contains(x.Id));
}
[6976]332    #endregion
333
334    #region Store
/// <summary>
/// Persists a Hive item on the server. Items without an id are created
/// (job upload, permission grant, project creation); items with an id are
/// updated (job, project).
/// </summary>
/// <param name="item">The item to store.</param>
/// <param name="cancellationToken">Passed on to the job upload.</param>
/// <exception cref="ArgumentException">The user named in a new job permission is unknown to the server.</exception>
public static void Store(IHiveItem item, CancellationToken cancellationToken) {
  bool exists = item.Id != Guid.Empty;

  if (exists) {
    // update
    var job = item as Job;
    if (job != null) {
      HiveServiceLocator.Instance.CallHiveService(s => s.UpdateJob(job, job.ResourceIds));
    }
    var existingProject = item as Project;
    if (existingProject != null) {
      HiveServiceLocator.Instance.CallHiveService(s => s.UpdateProject(existingProject));
    }
    return;
  }

  // create
  var refreshableJob = item as RefreshableJob;
  if (refreshableJob != null) {
    item.Id = HiveClient.Instance.UploadJob(refreshableJob, cancellationToken);
  }

  var hep = item as JobPermission;
  if (hep != null) {
    hep.GrantedUserId = HiveServiceLocator.Instance.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
    if (hep.GrantedUserId == Guid.Empty) {
      throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
    }
    HiveServiceLocator.Instance.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
  }

  var newProject = item as Project;
  if (newProject != null) {
    item.Id = HiveServiceLocator.Instance.CallHiveService(s => s.AddProject(newProject));
  }
}
/// <summary>
/// Runs <see cref="Store"/> asynchronously via delegate BeginInvoke and returns immediately.
/// </summary>
/// <param name="exceptionCallback">Invoked with the exception if storing fails; not invoked on success.</param>
/// <param name="item">The item to store.</param>
/// <param name="cancellationToken">Passed on to <see cref="Store"/>.</param>
public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
  Func<Exception> store = () => {
    try {
      Store(item, cancellationToken);
      return null;
    } catch (Exception ex) {
      return ex;
    }
  };

  store.BeginInvoke(asyncResult => {
    Exception failure = store.EndInvoke(asyncResult);
    if (failure != null) exceptionCallback(failure);
  }, null);
}
374    #endregion
375
376    #region Delete
/// <summary>
/// Removes a Hive item on the server side and resets its id to <see cref="Guid.Empty"/>.
/// Jobs are not removed directly; their state is set to StatisticsPending.
/// Items without an id are ignored unless they are exactly of type <see cref="JobPermission"/>.
/// </summary>
public static void Delete(IHiveItem item) {
  bool deletable = item.Id != Guid.Empty || item.GetType() == typeof(JobPermission);
  if (!deletable) return;

  if (item is Job) {
    HiveServiceLocator.Instance.CallHiveService(s => s.UpdateJobState(item.Id, JobState.StatisticsPending));
  }

  var refreshableJob = item as RefreshableJob;
  if (refreshableJob != null) {
    if (refreshableJob.RefreshAutomatically) {
      refreshableJob.StopResultPolling();
    }
    HiveServiceLocator.Instance.CallHiveService(s => s.UpdateJobState(item.Id, JobState.StatisticsPending));
  }

  var hep = item as JobPermission;
  if (hep != null) {
    HiveServiceLocator.Instance.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
  }

  item.Id = Guid.Empty;
}
396    #endregion
397
398    #region Events
/// <summary>Raised at the start of a refresh.</summary>
public event EventHandler Refreshing;
private void OnRefreshing() {
  EventHandler subscribers = Refreshing;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}

/// <summary>Raised at the end of a refresh, whether it succeeded or not.</summary>
public event EventHandler Refreshed;
private void OnRefreshed() {
  EventHandler subscribers = Refreshed;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}

/// <summary>Raised when a different collection is assigned to <see cref="Jobs"/>.</summary>
public event EventHandler HiveJobsChanged;
private void OnHiveJobsChanged() {
  EventHandler subscribers = HiveJobsChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
414    #endregion
415
/// <summary>
/// Starts storing the job asynchronously and marks it as started.
/// If storing fails, the job is put back into the prepared state and
/// <paramref name="exceptionCallback"/> receives the exception.
/// </summary>
public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {
  Action<Exception> onStoreFailed = failure => {
    refreshableJob.ExecutionState = ExecutionState.Prepared;
    exceptionCallback(failure);
  };

  HiveClient.StoreAsync(onStoreFailed, refreshableJob, cancellationToken);
  refreshableJob.ExecutionState = ExecutionState.Started;
}
424
/// <summary>
/// Asks the service to pause every task of the job that is not finished,
/// aborted or failed, then marks the job as paused.
/// </summary>
public static void PauseJob(RefreshableJob refreshableJob) {
  HiveServiceLocator.Instance.CallHiveService(service => {
    foreach (HiveTask hiveTask in refreshableJob.GetAllHiveTasks()) {
      var state = hiveTask.Task.State;
      bool ended = state == TaskState.Finished || state == TaskState.Aborted || state == TaskState.Failed;
      if (ended) continue;
      service.PauseTask(hiveTask.Task.Id);
    }
  });
  refreshableJob.ExecutionState = ExecutionState.Paused;
}
434
/// <summary>
/// Asks the service to stop every task of the job that is not finished,
/// aborted or failed, then marks the job as stopped.
/// </summary>
public static void StopJob(RefreshableJob refreshableJob) {
  HiveServiceLocator.Instance.CallHiveService(service => {
    foreach (HiveTask hiveTask in refreshableJob.GetAllHiveTasks()) {
      var state = hiveTask.Task.State;
      bool ended = state == TaskState.Finished || state == TaskState.Aborted || state == TaskState.Failed;
      if (ended) continue;
      service.StopTask(hiveTask.Task.Id);
    }
  });
  refreshableJob.ExecutionState = ExecutionState.Stopped;
}
444
/// <summary>
/// Asks the service to restart every paused task of the job, then marks the job as started.
/// </summary>
public static void ResumeJob(RefreshableJob refreshableJob) {
  HiveServiceLocator.Instance.CallHiveService(service => {
    foreach (HiveTask hiveTask in refreshableJob.GetAllHiveTasks()) {
      if (hiveTask.Task.State != TaskState.Paused) continue;
      service.RestartTask(hiveTask.Task.Id);
    }
  });
  refreshableJob.ExecutionState = ExecutionState.Started;
}
455
/// <summary>
/// Starts storing the job's <c>Job</c> object asynchronously.
/// </summary>
/// <param name="exceptionCallback">Receives the exception if storing fails.</param>
/// <param name="refreshableJob">The job whose <c>Job</c> object is stored.</param>
/// <param name="cancellationToken">Passed on to the store operation.</param>
/// <remarks>
/// NOTE(review): StoreAsync returns as soon as the work is queued, so the progress
/// is finished here before the store has completed - confirm this is intended.
/// </remarks>
public static void UpdateJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {
  refreshableJob.IsProgressing = true;
  refreshableJob.Progress.Message = "Saving Job...";

  Action<Exception> forwardFailure = failure => {
    exceptionCallback(failure);
  };
  HiveClient.StoreAsync(forwardFailure, refreshableJob.Job, cancellationToken);

  refreshableJob.IsProgressing = false;
  refreshableJob.Progress.Finish();
}
466
/// <summary>
/// Starts storing the job's <c>Job</c> object asynchronously without cancellation support.
/// </summary>
/// <remarks>
/// NOTE(review): StoreAsync returns as soon as the work is queued, so the finally
/// block runs before the store has completed, and the "Update failed." exception
/// is thrown inside the asynchronous completion callback rather than to the
/// caller of this method - confirm this is intended.
/// </remarks>
public static void UpdateJob(RefreshableJob refreshableJob) {
  refreshableJob.IsProgressing = true;

  try {
    refreshableJob.Progress.Start("Saving Job...", ProgressMode.Indeterminate);

    Action<Exception> onStoreFailed = failure => {
      throw new Exception("Update failed.", failure);
    };
    HiveClient.StoreAsync(onStoreFailed, refreshableJob.Job, CancellationToken.None);
  } finally {
    refreshableJob.IsProgressing = false;
    refreshableJob.Progress.Finish();
  }
}
480
481
482
[6976]483    #region Upload Job
// guards the shared uploaded-task counter
private static object jobCountLocker = new object();
// serializes the plugin dependency upload
private static object pluginLocker = new object();
// limits the number of tasks that are uploaded at the same time
private Semaphore taskUploadSemaphore = new Semaphore(Settings.Default.MaxParallelUploads, Settings.Default.MaxParallelUploads);
/// <summary>
/// Uploads a job to the server: the job itself, the configuration file plugin
/// and then all tasks (each root task in its own TPL task, children recursively).
/// </summary>
/// <param name="refreshableJob">The job to upload; its <c>Job</c> is replaced by the server's copy.</param>
/// <param name="cancellationToken">Checked between the upload steps and passed on to the task uploads.</param>
/// <returns>The id of the uploaded job, or <see cref="Guid.Empty"/> if the job object is null afterwards.</returns>
/// <remarks>
/// Blocks until all task uploads have ended. Progress is always finished,
/// also when an exception escapes.
/// </remarks>
private Guid UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken) {
  try {
    refreshableJob.IsProgressing = true;
    refreshableJob.Progress.Start("Connecting to server...", ProgressMode.Indeterminate);

    foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>()) {
      hiveJob.SetIndexInParentOptimizerList(null);
    }

    // upload Job
    refreshableJob.Progress.Message = "Uploading Job...";
    refreshableJob.Job.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job, refreshableJob.Job.ResourceIds));
    refreshableJob.Job = HiveServiceLocator.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
    cancellationToken.ThrowIfCancellationRequested();

    int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
    int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
    cancellationToken.ThrowIfCancellationRequested();

    // upload plugins
    refreshableJob.Progress.Message = "Uploading plugins...";
    this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
    this.AlreadyUploadedPlugins = new List<Plugin>();
    Plugin configFilePlugin = HiveServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
    this.alreadyUploadedPlugins.Add(configFilePlugin);
    cancellationToken.ThrowIfCancellationRequested();

    // upload tasks
    refreshableJob.Progress.Message = "Uploading tasks...";
    refreshableJob.Progress.ProgressMode = ProgressMode.Determinate;
    refreshableJob.Progress.ProgressValue = 0;

    var tasks = new List<TS.Task>();
    foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
      var task = TS.Task.Factory.StartNew((hj) => {
        UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, cancellationToken);
      }, hiveTask);
      // faults are written to the job log
      task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
      tasks.Add(task);
    }
    TS.Task.WaitAll(tasks.ToArray());
  } finally {
    // The return statement below treats a null Job as possible; apply the same guard
    // here so that a missing job cannot replace the original exception with a
    // NullReferenceException and skip the progress cleanup.
    if (refreshableJob.Job != null) {
      refreshableJob.Job.Modified = false;
    }
    refreshableJob.IsProgressing = false;
    refreshableJob.Progress.Finish();
  }
  return (refreshableJob.Job != null) ? refreshableJob.Job.Id : Guid.Empty;
}
535
/// <summary>
/// Uploads the local configuration file as plugin
/// </summary>
/// <param name="service">Service used to add the plugin if it is not online yet.</param>
/// <param name="onlinePlugins">Plugins already known to the server; searched by SHA1 hash of the file content.</param>
/// <returns>
/// The online plugin whose hash equals the hash of the local configuration file,
/// or a newly added "Configuration" plugin.
/// </returns>
private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
  string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Settings.Default.HLBinaryName);
  // open the configuration once and derive both path and file name from it
  string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
  string configFileName = Path.GetFileName(configFilePath);

  byte[] data = File.ReadAllBytes(configFilePath);
  byte[] hash;
  using (SHA1 sha1 = SHA1.Create()) {
    hash = sha1.ComputeHash(data);
  }

  // Single pass over the online plugins; plugins without a hash can never match
  // and must not make SequenceEqual throw.
  Plugin onlineConfig = onlinePlugins.FirstOrDefault(p => p.Hash != null && p.Hash.SequenceEqual(hash));
  if (onlineConfig != null) {
    return onlineConfig;
  }

  // not online yet: upload it
  Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
  PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
  configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
  return configPlugin;
}
562
/// <summary>
/// Uploads the given task and, recursively and in parallel, all of its child tasks,
/// registering each child under its parent task on the server.
/// </summary>
/// <param name="progress">Progress that receives the uploaded-task count.</param>
/// <param name="hiveTask">The task to upload.</param>
/// <param name="parentHiveTask">shall be null if its the root task</param>
/// <param name="taskCount">Single-element array used as shared counter of started uploads.</param>
/// <param name="totalJobCount">Total number of tasks, used for the progress value.</param>
/// <param name="configPluginId">Id of the configuration plugin that is added to the task's needed plugins.</param>
/// <param name="jobId">Id of the job the task belongs to.</param>
/// <param name="log">Receives upload messages and exceptions of child uploads.</param>
/// <param name="cancellationToken">Checked between the upload steps.</param>
private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, CancellationToken cancellationToken) {
  taskUploadSemaphore.WaitOne();
  bool slotReleased = false;
  try {
    cancellationToken.ThrowIfCancellationRequested();
    lock (jobCountLocker) {
      taskCount[0]++;
    }

    // a task that computes in parallel acts as parent and finishes together with its children
    List<IPluginDescription> plugins;
    bool actsAsParent = hiveTask.ItemTask.ComputeInParallel;
    hiveTask.Task.IsParentTask = actsAsParent;
    hiveTask.Task.FinishWhenChildJobsFinished = actsAsParent;
    TaskData taskData = hiveTask.GetAsTaskData(actsAsParent, out plugins);
    cancellationToken.ThrowIfCancellationRequested();

    TryAndRepeat(() => {
      if (cancellationToken.IsCancellationRequested) return;
      lock (pluginLocker) {
        HiveServiceLocator.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
      }
    }, Settings.Default.MaxRepeatServiceCalls, "Failed to upload plugins");
    cancellationToken.ThrowIfCancellationRequested();
    hiveTask.Task.PluginsNeededIds.Add(configPluginId);
    hiveTask.Task.JobId = jobId;

    log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
    TryAndRepeat(() => {
      if (cancellationToken.IsCancellationRequested) return;
      if (parentHiveTask == null) {
        hiveTask.Task.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData));
      } else {
        hiveTask.Task.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
      }
    }, Settings.Default.MaxRepeatServiceCalls, "Failed to add task", log);
    cancellationToken.ThrowIfCancellationRequested();

    lock (jobCountLocker) {
      progress.ProgressValue = (double)taskCount[0] / totalJobCount;
      progress.Message = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
    }

    // start one upload per child; each child receives this task as its parent
    var childUploads = new List<TS.Task>();
    foreach (HiveTask child in hiveTask.ChildHiveTasks) {
      var upload = TS.Task.Factory.StartNew((state) => {
        var pair = (Tuple<HiveTask, HiveTask>)state;
        UploadTaskWithChildren(progress, pair.Item1, pair.Item2, taskCount, totalJobCount, configPluginId, jobId, log, cancellationToken);
      }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
      upload.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
      childUploads.Add(upload);
    }

    // the semaphore has to be released before waiting for the children
    taskUploadSemaphore.Release();
    slotReleased = true;
    TS.Task.WaitAll(childUploads.ToArray());
  } finally {
    if (!slotReleased) taskUploadSemaphore.Release();
  }
}
632    #endregion
633
634    #region Download Experiment
/// <summary>
/// Downloads all tasks of the job, rebuilds the parent/child task tree, assigns it
/// to the job and derives the job's execution state from its tasks.
/// </summary>
/// <remarks>
/// Blocks the calling thread and polls the downloader every 500 ms; an exception
/// reported by the downloader is rethrown. Progress is always finished and the
/// downloader disposed.
/// </remarks>
public static void LoadJob(RefreshableJob refreshableJob) {
  var job = refreshableJob.Job;
  refreshableJob.IsProgressing = true;
  TaskDownloader downloader = null;

  try {
    // fetch all task objects to create the full tree of HiveTask objects
    refreshableJob.Progress.Start("Downloading list of tasks...", ProgressMode.Indeterminate);
    IEnumerable<LightweightTask> allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(job.Id));
    int totalJobCount = allTasks.Count();

    refreshableJob.Progress.Message = "Downloading tasks...";
    refreshableJob.Progress.ProgressMode = ProgressMode.Determinate;
    refreshableJob.Progress.ProgressValue = 0.0;
    downloader = new TaskDownloader(allTasks.Select(x => x.Id));
    downloader.StartAsync();

    // report progress until the download has ended
    while (!downloader.IsFinished) {
      refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
      refreshableJob.Progress.Message = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
      Thread.Sleep(500);

      if (downloader.IsFaulted) throw downloader.Exception;
    }

    IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
    var rootTasks = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);

    refreshableJob.Progress.Message = "Downloading/deserializing complete. Displaying tasks...";
    refreshableJob.Progress.ProgressMode = ProgressMode.Indeterminate;

    // build child-task tree
    foreach (HiveTask rootTask in rootTasks) {
      BuildHiveJobTree(rootTask, allTasks, allHiveTasks);
    }

    refreshableJob.HiveTasks = new ItemCollection<HiveTask>(rootTasks);
    refreshableJob.ExecutionState =
      refreshableJob.IsFinished() ? Core.ExecutionState.Stopped
      : refreshableJob.IsPaused() ? Core.ExecutionState.Paused
      : Core.ExecutionState.Started;
    refreshableJob.OnLoaded();
  } finally {
    refreshableJob.IsProgressing = false;
    refreshableJob.Progress.Finish();
    if (downloader != null) {
      downloader.Dispose();
    }
  }
}
692
/// <summary>
/// Recursively attaches to <paramref name="parentHiveTask"/> all tasks whose parent id is its id,
/// ordered by creation date.
/// </summary>
/// <param name="parentHiveTask">The task whose children are attached.</param>
/// <param name="allTasks">Lightweight descriptions of all tasks of the job.</param>
/// <param name="allHiveTasks">Downloaded tasks by id.</param>
private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks) {
  IEnumerable<LightweightTask> children = allTasks
    .Where(candidate => candidate.ParentTaskId.HasValue && candidate.ParentTaskId.Value == parentHiveTask.Task.Id)
    .OrderBy(candidate => candidate.DateCreated);

  foreach (LightweightTask child in children) {
    HiveTask childHiveTask = allHiveTasks[child.Id];
    // complete the child's own subtree before attaching it
    BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
    parentHiveTask.AddChildHiveTask(childHiveTask);
  }
}
704    #endregion
705
/// <summary>
/// Splits a ';'-separated string into its non-empty parts.
/// </summary>
/// <param name="resourceNames">The separated names; may be null or empty.</param>
/// <returns>The parts without empty entries; an empty list for null or empty input.</returns>
private static IEnumerable<string> ToResourceNameList(string resourceNames) {
  if (string.IsNullOrEmpty(resourceNames)) {
    return new List<string>();
  }
  char[] separators = new[] { ';' };
  return resourceNames.Split(separators, StringSplitOptions.RemoveEmptyEntries);
}
716
/// <summary>
/// Fetches the task data for the given id and deserializes it.
/// </summary>
/// <returns>The deserialized task, or null if deserialization throws.</returns>
public static ItemTask LoadItemJob(Guid jobId) {
  TaskData taskData = HiveServiceLocator.Instance.CallHiveService(s => s.GetTaskData(jobId));

  ItemTask itemTask;
  try {
    itemTask = PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
  } catch {
    // any deserialization failure is reported as "no task"
    itemTask = null;
  }
  return itemTask;
}
725
/// <summary>
/// Executes the action and repeats it after an exception until it succeeds or
/// the repetition count is used up. With a repetition count of -1 the counter
/// does not reach zero in practice, so the action is repeated indefinitely.
/// </summary>
/// <param name="action">The action to execute.</param>
/// <param name="repetitions">Number of repetitions allowed after the first failure.</param>
/// <param name="errorMessage">Message of the thrown exception; also used as prefix of the log entries.</param>
/// <param name="log">Optional log that receives one entry per failed attempt that is repeated.</param>
/// <exception cref="HiveException">The action failed and no repetitions are left; wraps the last exception.</exception>
public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
  for (int remaining = repetitions; ; remaining--) {
    try {
      action();
      return;
    } catch (Exception e) {
      if (remaining == 0) throw new HiveException(errorMessage, e);
      if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
    }
  }
}
739
/// <summary>
/// Loads the permissions of the given job and fills in the granted user's name
/// for each of them.
/// </summary>
public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId) {
  return HiveServiceLocator.Instance.CallHiveService((service) => {
    IEnumerable<JobPermission> permissions = service.GetJobPermissions(jobId);
    foreach (var permission in permissions) {
      string userName = service.GetUsernameByUserId(permission.GrantedUserId);
      permission.UnmodifiedGrantedUserNameUpdate(userName);
    }
    return new HiveItemCollection<JobPermission>(permissions);
  });
}
[17059]749
/// <summary>
/// Builds a display string from the names of the project's ancestors (in reversed
/// enumeration order of the ancestor set) followed by the project's own name,
/// joined by " » ".
/// </summary>
/// <returns>The ancestry string; an empty string for <see cref="Guid.Empty"/>.</returns>
public string GetProjectAncestry(Guid projectId) {
  // a Guid is a value type and can never be null, so only the empty id is checked
  if (projectId == Guid.Empty) return "";

  var lineage = new List<Guid>(projectAncestors[projectId].Reverse());
  lineage.Add(projectId);

  string[] names = lineage.Select(id => ProjectNames[id]).ToArray();
  return string.Join(" » ", names);
}
756
/// <summary>
/// Asks the service which resources are assigned to the given job and
/// returns those of them that are among the loaded resources.
/// </summary>
public IEnumerable<Resource> GetAssignedResourcesForJob(Guid jobId) {
  var assignments = HiveServiceLocator.Instance.CallHiveService(service => service.GetAssignedResourcesForJob(jobId));
  return from resource in Resources
         where assignments.Select(a => a.ResourceId).Contains(resource.Id)
         select resource;
}
[6976]761  }
[11083]762}
Note: See TracBrowser for help on using the repository browser.