Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveProjectManagement/HeuristicLab.Clients.Hive/3.3/HiveClient.cs @ 15456

Last change on this file since 15456 was 15412, checked in by jkarder, 7 years ago

#2839:

  • worked on resources and projects views
  • changed resource selector to be able to select projects and assigned resources
  • updated service clients
File size: 20.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.MainForm;
33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35
36namespace HeuristicLab.Clients.Hive {
37  [Item("HiveClient", "Hive client.")]
38  public sealed class HiveClient : IContent {
39    private static HiveClient instance;
40    public static HiveClient Instance {
41      get {
42        if (instance == null) instance = new HiveClient();
43        return instance;
44      }
45    }
46
47    #region Properties
48    private HiveItemCollection<RefreshableJob> jobs;
49    public HiveItemCollection<RefreshableJob> Jobs {
50      get { return jobs; }
51      set {
52        if (value != jobs) {
53          jobs = value;
54          OnHiveJobsChanged();
55        }
56      }
57    }
58
59    private List<Plugin> onlinePlugins;
60    public List<Plugin> OnlinePlugins {
61      get { return onlinePlugins; }
62      set { onlinePlugins = value; }
63    }
64
65    private List<Plugin> alreadyUploadedPlugins;
66    public List<Plugin> AlreadyUploadedPlugins {
67      get { return alreadyUploadedPlugins; }
68      set { alreadyUploadedPlugins = value; }
69    }
70    #endregion
71
72    private HiveClient() { }
73
74    public void ClearHiveClient() {
75      Jobs.ClearWithoutHiveDeletion();
76      foreach (var j in Jobs) {
77        if (j.RefreshAutomatically) {
78          j.RefreshAutomatically = false; // stop result polling
79        }
80        j.Dispose();
81      }
82      Jobs = null;
83
84      if (onlinePlugins != null)
85        onlinePlugins.Clear();
86      if (alreadyUploadedPlugins != null)
87        alreadyUploadedPlugins.Clear();
88    }
89
90    #region Refresh
91    public void Refresh() {
92      OnRefreshing();
93
94      try {
95        jobs = new HiveItemCollection<RefreshableJob>();
96        var jobsLoaded = HiveServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
97
98        foreach (var j in jobsLoaded) {
99          jobs.Add(new RefreshableJob(j));
100        }
101      }
102      catch {
103        jobs = null;
104        throw;
105      }
106      finally {
107        OnRefreshed();
108      }
109    }
110
111    public void RefreshAsync(Action<Exception> exceptionCallback) {
112      var call = new Func<Exception>(delegate() {
113        try {
114          Refresh();
115        }
116        catch (Exception ex) {
117          return ex;
118        }
119        return null;
120      });
121      call.BeginInvoke(delegate(IAsyncResult result) {
122        Exception ex = call.EndInvoke(result);
123        if (ex != null) exceptionCallback(ex);
124      }, null);
125    }
126    #endregion
127
128    #region Store
129    public static void Store(IHiveItem item, CancellationToken cancellationToken) {
130      if (item.Id == Guid.Empty) {
131        if (item is RefreshableJob) {
132          HiveClient.Instance.UploadJob((RefreshableJob)item, cancellationToken);
133        }
134        if (item is JobPermission) {
135          var hep = (JobPermission)item;
136          hep.GrantedUserId = HiveServiceLocator.Instance.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
137          if (hep.GrantedUserId == Guid.Empty) {
138            throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
139          }
140          HiveServiceLocator.Instance.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
141        }
142        if (item is Project) {
143          HiveServiceLocator.Instance.CallHiveService(s => s.AddProject((Project)item));
144        }
145      } else {
146        if (item is Job)
147          HiveServiceLocator.Instance.CallHiveService(s => s.UpdateJob((Job)item));
148        if (item is Project)
149          HiveServiceLocator.Instance.CallHiveService(s => s.UpdateProject((Project)item));
150      }
151    }
152    public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
153      var call = new Func<Exception>(delegate() {
154        try {
155          Store(item, cancellationToken);
156        }
157        catch (Exception ex) {
158          return ex;
159        }
160        return null;
161      });
162      call.BeginInvoke(delegate(IAsyncResult result) {
163        Exception ex = call.EndInvoke(result);
164        if (ex != null) exceptionCallback(ex);
165      }, null);
166    }
167    #endregion
168
169    #region Delete
170    public static void Delete(IHiveItem item) {
171      if (item.Id == Guid.Empty && item.GetType() != typeof(JobPermission))
172        return;
173
174      if (item is Job)
175        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id));
176      if (item is RefreshableJob) {
177        RefreshableJob job = (RefreshableJob)item;
178        if (job.RefreshAutomatically) {
179          job.StopResultPolling();
180        }
181        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id));
182      }
183      if (item is JobPermission) {
184        var hep = (JobPermission)item;
185        HiveServiceLocator.Instance.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
186      }
187      item.Id = Guid.Empty;
188    }
189    #endregion
190
191    #region Events
192    public event EventHandler Refreshing;
193    private void OnRefreshing() {
194      EventHandler handler = Refreshing;
195      if (handler != null) handler(this, EventArgs.Empty);
196    }
197    public event EventHandler Refreshed;
198    private void OnRefreshed() {
199      var handler = Refreshed;
200      if (handler != null) handler(this, EventArgs.Empty);
201    }
202    public event EventHandler HiveJobsChanged;
203    private void OnHiveJobsChanged() {
204      var handler = HiveJobsChanged;
205      if (handler != null) handler(this, EventArgs.Empty);
206    }
207    #endregion
208
209    public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {
210      HiveClient.StoreAsync(
211        new Action<Exception>((Exception ex) => {
212          refreshableJob.ExecutionState = ExecutionState.Prepared;
213          exceptionCallback(ex);
214        }), refreshableJob, cancellationToken);
215      refreshableJob.ExecutionState = ExecutionState.Started;
216    }
217
218    public static void PauseJob(RefreshableJob refreshableJob) {
219      HiveServiceLocator.Instance.CallHiveService(service => {
220        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
221          if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
222            service.PauseTask(task.Task.Id);
223        }
224      });
225      refreshableJob.ExecutionState = ExecutionState.Paused;
226    }
227
228    public static void StopJob(RefreshableJob refreshableJob) {
229      HiveServiceLocator.Instance.CallHiveService(service => {
230        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
231          if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
232            service.StopTask(task.Task.Id);
233        }
234      });
235      refreshableJob.ExecutionState = ExecutionState.Stopped;
236    }
237
238    public static void ResumeJob(RefreshableJob refreshableJob) {
239      HiveServiceLocator.Instance.CallHiveService(service => {
240        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
241          if (task.Task.State == TaskState.Paused) {
242            service.RestartTask(task.Task.Id);
243          }
244        }
245      });
246      refreshableJob.ExecutionState = ExecutionState.Started;
247    }
248
249    #region Upload Job
250    private Semaphore taskUploadSemaphore = new Semaphore(Settings.Default.MaxParallelUploads, Settings.Default.MaxParallelUploads);
251    private static object jobCountLocker = new object();
252    private static object pluginLocker = new object();
253    private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken) {
254      try {
255        refreshableJob.IsProgressing = true;
256        refreshableJob.Progress.Start("Connecting to server...");
257
258        foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>()) {
259          hiveJob.SetIndexInParentOptimizerList(null);
260        }
261
262        // upload Job
263        refreshableJob.Progress.Status = "Uploading Job...";
264        refreshableJob.Job.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job));
265        refreshableJob.Job = HiveServiceLocator.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
266        cancellationToken.ThrowIfCancellationRequested();
267
268        int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
269        int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
270        cancellationToken.ThrowIfCancellationRequested();
271
272        // upload plugins
273        refreshableJob.Progress.Status = "Uploading plugins...";
274        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
275        this.AlreadyUploadedPlugins = new List<Plugin>();
276        Plugin configFilePlugin = HiveServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
277        this.alreadyUploadedPlugins.Add(configFilePlugin);
278        cancellationToken.ThrowIfCancellationRequested();
279
280        // upload tasks
281        refreshableJob.Progress.Status = "Uploading tasks...";
282
283        var tasks = new List<TS.Task>();
284        foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
285          var task = TS.Task.Factory.StartNew((hj) => {
286            UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, refreshableJob.Job.ResourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, cancellationToken);
287          }, hiveTask);
288          task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
289          tasks.Add(task);
290        }
291        TS.Task.WaitAll(tasks.ToArray());
292      }
293      finally {
294        refreshableJob.Job.Modified = false;
295        refreshableJob.IsProgressing = false;
296        refreshableJob.Progress.Finish();
297      }
298    }
299
300    /// <summary>
301    /// Uploads the local configuration file as plugin
302    /// </summary>
303    private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
304      string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Settings.Default.HLBinaryName);
305      string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
306      string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
307      byte[] hash;
308
309      byte[] data = File.ReadAllBytes(configFilePath);
310      using (SHA1 sha1 = SHA1.Create()) {
311        hash = sha1.ComputeHash(data);
312      }
313
314      Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
315      PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
316
317      IEnumerable<Plugin> onlineConfig = onlinePlugins.Where(p => p.Hash.SequenceEqual(hash));
318
319      if (onlineConfig.Count() > 0) {
320        return onlineConfig.First();
321      } else {
322        configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
323        return configPlugin;
324      }
325    }
326
327    /// <summary>
328    /// Uploads the given task and all its child-jobs while setting the proper parentJobId values for the childs
329    /// </summary>
330    /// <param name="parentHiveTask">shall be null if its the root task</param>
331    private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, CancellationToken cancellationToken) {
332      taskUploadSemaphore.WaitOne();
333      bool semaphoreReleased = false;
334      try {
335        cancellationToken.ThrowIfCancellationRequested();
336        lock (jobCountLocker) {
337          taskCount[0]++;
338        }
339        TaskData taskData;
340        List<IPluginDescription> plugins;
341
342        if (hiveTask.ItemTask.ComputeInParallel) {
343          hiveTask.Task.IsParentTask = true;
344          hiveTask.Task.FinishWhenChildJobsFinished = true;
345          taskData = hiveTask.GetAsTaskData(true, out plugins);
346        } else {
347          hiveTask.Task.IsParentTask = false;
348          hiveTask.Task.FinishWhenChildJobsFinished = false;
349          taskData = hiveTask.GetAsTaskData(false, out plugins);
350        }
351        cancellationToken.ThrowIfCancellationRequested();
352
353        TryAndRepeat(() => {
354          if (!cancellationToken.IsCancellationRequested) {
355            lock (pluginLocker) {
356              HiveServiceLocator.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
357            }
358          }
359        }, Settings.Default.MaxRepeatServiceCalls, "Failed to upload plugins");
360        cancellationToken.ThrowIfCancellationRequested();
361        hiveTask.Task.PluginsNeededIds.Add(configPluginId);
362        hiveTask.Task.JobId = jobId;
363
364        log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
365        TryAndRepeat(() => {
366          if (!cancellationToken.IsCancellationRequested) {
367            if (parentHiveTask != null) {
368              hiveTask.Task.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
369            } else {
370              hiveTask.Task.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
371            }
372          }
373        }, Settings.Default.MaxRepeatServiceCalls, "Failed to add task", log);
374        cancellationToken.ThrowIfCancellationRequested();
375
376        lock (jobCountLocker) {
377          progress.ProgressValue = (double)taskCount[0] / totalJobCount;
378          progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
379        }
380
381        var tasks = new List<TS.Task>();
382        foreach (HiveTask child in hiveTask.ChildHiveTasks) {
383          var task = TS.Task.Factory.StartNew((tuple) => {
384            var arguments = (Tuple<HiveTask, HiveTask>)tuple;
385            UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, cancellationToken);
386          }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
387          task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
388          tasks.Add(task);
389        }
390        taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall!
391        TS.Task.WaitAll(tasks.ToArray());
392      }
393      finally {
394        if (!semaphoreReleased) taskUploadSemaphore.Release();
395      }
396    }
397    #endregion
398
399    #region Download Experiment
400    public static void LoadJob(RefreshableJob refreshableJob) {
401      var hiveExperiment = refreshableJob.Job;
402      refreshableJob.IsProgressing = true;
403      TaskDownloader downloader = null;
404
405      try {
406        int totalJobCount = 0;
407        IEnumerable<LightweightTask> allTasks;
408
409        // fetch all task objects to create the full tree of tree of HiveTask objects
410        refreshableJob.Progress.Start("Downloading list of tasks...");
411        allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id));
412        totalJobCount = allTasks.Count();
413
414        refreshableJob.Progress.Status = "Downloading tasks...";
415        downloader = new TaskDownloader(allTasks.Select(x => x.Id));
416        downloader.StartAsync();
417
418        while (!downloader.IsFinished) {
419          refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
420          refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
421          Thread.Sleep(500);
422
423          if (downloader.IsFaulted) {
424            throw downloader.Exception;
425          }
426        }
427        IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
428        var parents = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);
429
430        refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
431        // build child-task tree
432        foreach (HiveTask hiveTask in parents) {
433          BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
434        }
435
436        refreshableJob.HiveTasks = new ItemCollection<HiveTask>(parents);
437        if (refreshableJob.IsFinished()) {
438          refreshableJob.ExecutionState = Core.ExecutionState.Stopped;
439        } else if (refreshableJob.IsPaused()) {
440          refreshableJob.ExecutionState = Core.ExecutionState.Paused;
441        } else {
442          refreshableJob.ExecutionState = Core.ExecutionState.Started;
443        }
444        refreshableJob.OnLoaded();
445      }
446      finally {
447        refreshableJob.IsProgressing = false;
448        refreshableJob.Progress.Finish();
449        if (downloader != null) {
450          downloader.Dispose();
451        }
452      }
453    }
454
455    private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks) {
456      IEnumerable<LightweightTask> childTasks = from job in allTasks
457                                                where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveTask.Task.Id
458                                                orderby job.DateCreated ascending
459                                                select job;
460      foreach (LightweightTask task in childTasks) {
461        HiveTask childHiveTask = allHiveTasks[task.Id];
462        BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
463        parentHiveTask.AddChildHiveTask(childHiveTask);
464      }
465    }
466    #endregion
467
468    /// <summary>
469    /// Converts a string which can contain Ids separated by ';' to a enumerable
470    /// </summary>
471    private static IEnumerable<string> ToResourceNameList(string resourceNames) {
472      if (!string.IsNullOrEmpty(resourceNames)) {
473        return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
474      } else {
475        return new List<string>();
476      }
477    }
478
479    public static ItemTask LoadItemJob(Guid jobId) {
480      TaskData taskData = HiveServiceLocator.Instance.CallHiveService(s => s.GetTaskData(jobId));
481      try {
482        return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
483      }
484      catch {
485        return null;
486      }
487    }
488
489    /// <summary>
490    /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
491    /// If repetitions is -1, it is repeated infinitely.
492    /// </summary>
493    public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
494      while (true) {
495        try { action(); return; }
496        catch (Exception e) {
497          if (repetitions == 0) throw new HiveException(errorMessage, e);
498          if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
499          repetitions--;
500        }
501      }
502    }
503
504    public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId) {
505      return HiveServiceLocator.Instance.CallHiveService((service) => {
506        IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
507        foreach (var hep in jps) {
508          hep.UnmodifiedGrantedUserNameUpdate(service.GetUsernameByUserId(hep.GrantedUserId));
509        }
510        return new HiveItemCollection<JobPermission>(jps);
511      });
512    }
513  }
514}
Note: See TracBrowser for help on using the repository browser.