Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveClient.cs @ 7021

Last change on this file since 7021 was 6976, checked in by ascheibe, 13 years ago

#1672 integrate the Hive client projects into trunk (Hive Job Manager and Administrator)

File size: 21.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.PluginInfrastructure;
33using TS = System.Threading.Tasks;
34
namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Client-side entry point to the Hive service. Holds the list of the current user's jobs and offers
  /// operations to upload (store), download (load), delete, start, pause and stop jobs.
  /// </summary>
  [Item("HiveClient", "Hive client.")]
  public sealed class HiveClient : IContent {
    private static readonly object instanceLocker = new object();
    private static HiveClient instance;
    /// <summary>
    /// Gets the process-wide instance. Creation is guarded by a lock because the instance may be first
    /// requested from a background thread (<see cref="StoreAsync"/> runs <see cref="Store"/> asynchronously,
    /// which accesses this property).
    /// </summary>
    public static HiveClient Instance {
      get {
        lock (instanceLocker) {
          if (instance == null) instance = new HiveClient();
          return instance;
        }
      }
    }

    #region Properties
    private ItemCollection<RefreshableJob> jobs;
    /// <summary>
    /// Gets or sets the jobs known to this client. Assigning a different collection raises
    /// <see cref="HiveExperimentsChanged"/>. <see cref="Refresh"/> replaces the backing collection directly
    /// and signals completion via <see cref="Refreshed"/> instead.
    /// </summary>
    public ItemCollection<RefreshableJob> Jobs {
      get { return jobs; }
      set {
        if (value != jobs) {
          jobs = value;
          OnHiveExperimentsChanged();
        }
      }
    }

    private List<Plugin> onlinePlugins;
    /// <summary>Plugins available on the server; reloaded at the start of every job upload.</summary>
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    /// <summary>Plugins uploaded during the current job upload (starts with the configuration-file plugin).</summary>
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    private bool isAllowedPrivileged;
    /// <summary>Whether the current user may create privileged jobs; updated by <see cref="Refresh"/>.</summary>
    public bool IsAllowedPrivileged {
      get { return isAllowedPrivileged; }
      set { isAllowedPrivileged = value; }
    }
    #endregion

    private HiveClient() { }

    #region Refresh
    /// <summary>
    /// Reloads the job list from the server.
    /// Already known jobs are updated in place (the same <see cref="RefreshableJob"/> instances are kept),
    /// jobs that were never uploaded (Id == Guid.Empty) are kept, and result polling is stopped for jobs
    /// that no longer exist on the server.
    /// </summary>
    /// <remarks>
    /// Raises <see cref="Refreshing"/> before and <see cref="Refreshed"/> after the refresh (also on failure).
    /// On failure <see cref="Jobs"/> is set to null and the exception is rethrown.
    /// </remarks>
    public void Refresh() {
      OnRefreshing();

      try {
        this.IsAllowedPrivileged = ServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());

        var oldJobs = jobs ?? new ItemCollection<RefreshableJob>();
        jobs = new HiveItemCollection<RefreshableJob>();
        var jobsLoaded = ServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
        var loadedJobIds = new HashSet<Guid>();

        foreach (var j in jobsLoaded) {
          loadedJobIds.Add(j.Id);
          var job = oldJobs.SingleOrDefault(x => x.Id == j.Id);
          if (job == null) {
            // new
            jobs.Add(new RefreshableJob(j) { IsAllowedPrivileged = this.isAllowedPrivileged });
          } else {
            // update the existing instance so that open views and result polling keep working
            job.Job = j;
            job.IsAllowedPrivileged = this.isAllowedPrivileged;
            jobs.Add(job);
          }
        }
        // jobs that were never uploaded are kept; jobs that disappeared from the server are dropped and
        // their result polling is stopped (jobs that are still on the server must keep polling)
        foreach (var job in oldJobs) {
          if (job.Id == Guid.Empty) {
            jobs.Add(job);
          } else if (!loadedJobIds.Contains(job.Id)) {
            job.RefreshAutomatically = false; // stop results polling
          }
        }
      }
      catch {
        jobs = null;
        throw;
      }
      finally {
        OnRefreshed();
      }
    }

    /// <summary>
    /// Runs <see cref="Refresh"/> on a thread-pool thread. Any exception is passed to
    /// <paramref name="exceptionCallback"/>, which is not called on success.
    /// </summary>
    public void RefreshAsync(Action<Exception> exceptionCallback) {
      InvokeAsync(Refresh, exceptionCallback);
    }

    /// <summary>
    /// Runs <paramref name="action"/> asynchronously (delegate BeginInvoke) and reports an exception thrown by it
    /// to <paramref name="exceptionCallback"/>. The callback is invoked on the completing thread and only on failure.
    /// </summary>
    private static void InvokeAsync(Action action, Action<Exception> exceptionCallback) {
      var call = new Func<Exception>(delegate() {
        try {
          action();
        }
        catch (Exception ex) {
          return ex;
        }
        return null;
      });
      call.BeginInvoke(delegate(IAsyncResult result) {
        Exception ex = call.EndInvoke(result);
        if (ex != null) exceptionCallback(ex);
      }, null);
    }
    #endregion

    #region Store
    /// <summary>
    /// Persists <paramref name="item"/> on the server.
    /// New items (Id == Guid.Empty): a <see cref="RefreshableJob"/> is uploaded with all its tasks,
    /// a <see cref="JobPermission"/> is granted after resolving the user name.
    /// Existing items: only a <see cref="Job"/> is updated; other item types are ignored.
    /// </summary>
    /// <exception cref="ArgumentException">A permission references a user name the server does not know.</exception>
    public static void Store(IHiveItem item, CancellationToken cancellationToken) {
      if (item.Id == Guid.Empty) {
        if (item is RefreshableJob) {
          HiveClient.Instance.UploadJob((RefreshableJob)item, cancellationToken);
        }
        if (item is JobPermission) {
          var hep = (JobPermission)item;
          hep.GrantedUserId = ServiceLocator.Instance.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
          if (hep.GrantedUserId == Guid.Empty) {
            throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
          }
          ServiceLocator.Instance.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
        }
      } else {
        if (item is Job)
          ServiceLocator.Instance.CallHiveService(s => s.UpdateJob((Job)item));
      }
    }

    /// <summary>
    /// Runs <see cref="Store"/> on a thread-pool thread. Any exception is passed to
    /// <paramref name="exceptionCallback"/>, which is not called on success.
    /// </summary>
    public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
      InvokeAsync(() => Store(item, cancellationToken), exceptionCallback);
    }
    #endregion

    #region Delete
    /// <summary>
    /// Deletes <paramref name="item"/> on the server (jobs are deleted, permissions revoked) and resets its Id
    /// to Guid.Empty. Items that were never uploaded are ignored.
    /// </summary>
    public static void Delete(IHiveItem item) {
      if (item.Id == Guid.Empty)
        return;

      if (item is Job || item is RefreshableJob)
        ServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id));
      if (item is JobPermission) {
        var hep = (JobPermission)item;
        ServiceLocator.Instance.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
      }
      item.Id = Guid.Empty;
    }
    #endregion

    #region Events
    /// <summary>Raised at the start of <see cref="Refresh"/>.</summary>
    public event EventHandler Refreshing;
    private void OnRefreshing() {
      EventHandler handler = Refreshing;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    /// <summary>Raised when <see cref="Refresh"/> finishes, whether it succeeded or failed.</summary>
    public event EventHandler Refreshed;
    private void OnRefreshed() {
      var handler = Refreshed;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    /// <summary>Raised when a different collection is assigned to <see cref="Jobs"/>.</summary>
    public event EventHandler HiveExperimentsChanged;
    private void OnHiveExperimentsChanged() {
      var handler = HiveExperimentsChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    /// <summary>
    /// Marks the job as started and uploads it asynchronously. If the upload fails, the state is reset to
    /// Prepared and <paramref name="exceptionCallback"/> is invoked.
    /// </summary>
    public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {
      // Set Started before the upload begins: setting it afterwards could overwrite the Prepared state that a
      // quickly failing upload sets in its error callback.
      refreshableJob.ExecutionState = ExecutionState.Started;
      HiveClient.StoreAsync(
        new Action<Exception>((Exception ex) => {
          refreshableJob.ExecutionState = ExecutionState.Prepared;
          exceptionCallback(ex);
        }), refreshableJob, cancellationToken);
    }

    /// <summary>Requests a pause for every task of the job that is not Finished, Aborted or Failed.</summary>
    public static void PauseJob(RefreshableJob refreshableJob) {
      ServiceLocator.Instance.CallHiveService(service => {
        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
          if (!IsFinalState(task.Task.State))
            service.PauseTask(task.Task.Id);
        }
      });
      refreshableJob.ExecutionState = ExecutionState.Paused;
    }

    /// <summary>Requests a stop for every task of the job that is not Finished, Aborted or Failed.</summary>
    public static void StopJob(RefreshableJob refreshableJob) {
      ServiceLocator.Instance.CallHiveService(service => {
        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
          if (!IsFinalState(task.Task.State))
            service.StopTask(task.Task.Id);
        }
      });
      // execution state does not need to be set. it will be set to Stopped, when all jobs have been downloaded
    }

    /// <summary>True for task states from which a task can no longer be paused or stopped.</summary>
    private static bool IsFinalState(TaskState state) {
      return state == TaskState.Finished || state == TaskState.Aborted || state == TaskState.Failed;
    }

    #region Upload Job
    // limits the number of tasks that are uploaded concurrently
    private readonly Semaphore taskUploadSemaphore = new Semaphore(Settings.Default.MaxParallelUploads, Settings.Default.MaxParallelUploads);
    private static readonly object jobCountLocker = new object();
    private static readonly object pluginLocker = new object();

    /// <summary>
    /// Uploads a new job: resolves resource names, creates the job, uploads the configuration file and the
    /// needed plugins and finally all tasks (in parallel, bounded by <see cref="taskUploadSemaphore"/>).
    /// Failures of individual task uploads are written to the job's log and do not abort the upload.
    /// </summary>
    /// <exception cref="ResourceNotFoundException">A configured resource name does not exist on the server.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was requested before the tasks were uploaded.</exception>
    private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken) {
      try {
        refreshableJob.Progress = new Progress("Connecting to server...");
        refreshableJob.IsProgressing = true;

        IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames);
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = ServiceLocator.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }

        foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>()) {
          hiveJob.SetIndexInParentOptimizerList(null);
        }

        // upload Job
        refreshableJob.Progress.Status = "Uploading Job...";
        refreshableJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job));
        bool isPrivileged = refreshableJob.Job.IsPrivileged;
        refreshableJob.Job = ServiceLocator.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
        refreshableJob.Job.IsPrivileged = isPrivileged;
        cancellationToken.ThrowIfCancellationRequested();

        int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
        int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
        cancellationToken.ThrowIfCancellationRequested();

        // upload plugins
        refreshableJob.Progress.Status = "Uploading plugins...";
        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
        this.AlreadyUploadedPlugins = new List<Plugin>();
        Plugin configFilePlugin = ServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
        this.alreadyUploadedPlugins.Add(configFilePlugin);
        cancellationToken.ThrowIfCancellationRequested();

        if (refreshableJob.RefreshAutomatically) refreshableJob.StartResultPolling();

        // upload tasks
        refreshableJob.Progress.Status = "Uploading tasks...";

        var tasks = new List<TS.Task>();
        foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
          var upload = TS.Task.Factory.StartNew((hj) => {
            UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken);
          }, hiveTask);
          tasks.Add(LogExceptionOnFault(upload, refreshableJob.Log));
        }
        // the continuations never get cancelled, so WaitAll only throws if logging itself failed
        TS.Task.WaitAll(tasks.ToArray());
        refreshableJob.Job.Modified = false;
      }
      finally {
        refreshableJob.IsProgressing = false;
      }
    }

    /// <summary>
    /// Attaches a continuation that logs the exception of <paramref name="task"/> if it faulted and returns it.
    /// The continuation runs unconditionally (no TaskContinuationOptions.OnlyOnFaulted): an OnlyOnFaulted
    /// continuation is cancelled when its antecedent succeeds, which made Task.WaitAll throw a spurious
    /// TaskCanceledException. The antecedent's exception is therefore only logged, never propagated.
    /// </summary>
    private static TS.Task LogExceptionOnFault(TS.Task task, ILog log) {
      return task.ContinueWith(t => {
        if (t.IsFaulted) log.LogException(t.Exception);
      });
    }

    /// <summary>
    /// Uploads the local configuration file as plugin, unless a plugin with identical content (same SHA1 hash)
    /// already exists on the server, in which case that plugin is returned.
    /// </summary>
    private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
      string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Settings.Default.HLBinaryName);
      // open the configuration once; both the path and the file name are derived from it
      string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
      string configFileName = Path.GetFileName(configFilePath);

      byte[] data = File.ReadAllBytes(configFilePath);
      byte[] hash;
      using (SHA1 sha1 = SHA1.Create()) {
        hash = sha1.ComputeHash(data); // content fingerprint used for de-duplication, not for security
      }

      Plugin onlineConfig = onlinePlugins.FirstOrDefault(p => p.Hash != null && p.Hash.SequenceEqual(hash));
      if (onlineConfig != null) {
        return onlineConfig;
      }

      Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
      PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
      configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
      return configPlugin;
    }

    /// <summary>
    /// Uploads the given task and all its child-tasks while setting the proper parent task ids for the children.
    /// Children are uploaded in parallel; their failures are logged to <paramref name="log"/>.
    /// </summary>
    /// <param name="parentHiveTask">shall be null if its the root task</param>
    /// <param name="groups">ids of the resources the root task is assigned to</param>
    /// <param name="taskCount">shared single-element counter of uploaded tasks (guarded by jobCountLocker)</param>
    private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {
      taskUploadSemaphore.WaitOne();
      bool semaphoreReleased = false;
      try {
        cancellationToken.ThrowIfCancellationRequested();
        lock (jobCountLocker) {
          taskCount[0]++;
        }
        TaskData taskData;
        List<IPluginDescription> plugins;

        if (hiveTask.ItemTask.ComputeInParallel && (hiveTask.ItemTask.Item is Optimization.Experiment || hiveTask.ItemTask.Item is Optimization.BatchRun)) {
          hiveTask.Task.IsParentTask = true;
          hiveTask.Task.FinishWhenChildJobsFinished = true;
          taskData = hiveTask.GetAsTaskData(true, out plugins);
        } else {
          hiveTask.Task.IsParentTask = false;
          hiveTask.Task.FinishWhenChildJobsFinished = false;
          taskData = hiveTask.GetAsTaskData(false, out plugins);
        }
        cancellationToken.ThrowIfCancellationRequested();

        TryAndRepeat(() => {
          if (!cancellationToken.IsCancellationRequested) {
            // the plugin lists are shared by all parallel uploads of this job
            lock (pluginLocker) {
              ServiceLocator.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
            }
          }
        }, -1, "Failed to upload plugins");
        cancellationToken.ThrowIfCancellationRequested();
        hiveTask.Task.PluginsNeededIds.Add(configPluginId);
        hiveTask.Task.JobId = jobId;
        hiveTask.Task.IsPrivileged = isPrivileged;

        log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
        TryAndRepeat(() => {
          if (!cancellationToken.IsCancellationRequested) {
            if (parentHiveTask != null) {
              hiveTask.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
            } else {
              hiveTask.Task.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
            }
          }
        }, 50, "Failed to add task", log);
        cancellationToken.ThrowIfCancellationRequested();

        lock (jobCountLocker) {
          progress.ProgressValue = (double)taskCount[0] / totalJobCount;
          progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
        }

        var tasks = new List<TS.Task>();
        foreach (HiveTask child in hiveTask.ChildHiveTasks) {
          var upload = TS.Task.Factory.StartNew((tuple) => {
            var arguments = (Tuple<HiveTask, HiveTask>)tuple;
            UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken);
          }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
          tasks.Add(LogExceptionOnFault(upload, log));
        }
        // release before waiting: the children need the semaphore themselves, holding it here could deadlock
        taskUploadSemaphore.Release(); semaphoreReleased = true;
        // the continuations never get cancelled, so WaitAll only throws if logging itself failed
        TS.Task.WaitAll(tasks.ToArray());
      }
      finally {
        if (!semaphoreReleased) taskUploadSemaphore.Release();
      }
    }
    #endregion

    #region Download Experiment
    /// <summary>
    /// Downloads all tasks of the job, rebuilds the parent/child task tree, sets the job's execution state
    /// (Stopped if finished, otherwise Started) and calls <c>OnLoaded</c> on the job.
    /// </summary>
    /// <remarks>Rethrows the downloader's exception if the download faulted.</remarks>
    public static void LoadJob(RefreshableJob refreshableJob) {
      var hiveExperiment = refreshableJob.Job;
      refreshableJob.Progress = new Progress();

      try {
        refreshableJob.IsProgressing = true;
        int totalJobCount = 0;
        IEnumerable<LightweightTask> allTasks;

        refreshableJob.Progress.Status = "Connecting to Server...";
        // fetch all task objects to create the full tree of tree of HiveTask objects
        refreshableJob.Progress.Status = "Downloading list of tasks...";
        // materialize once: the list is enumerated several times below
        allTasks = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks(hiveExperiment.Id)).ToList();
        totalJobCount = allTasks.Count();

        TaskDownloader downloader = new TaskDownloader(allTasks.Select(x => x.Id));
        downloader.StartAsync();

        while (!downloader.IsFinished) {
          refreshableJob.Progress.ProgressValue = totalJobCount > 0 ? downloader.FinishedCount / (double)totalJobCount : 0.0;
          refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
          Thread.Sleep(500);

          if (downloader.IsFaulted) {
            throw downloader.Exception;
          }
        }
        // a fault may be reported after the loop condition already saw IsFinished
        if (downloader.IsFaulted) {
          throw downloader.Exception;
        }
        IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;

        refreshableJob.HiveTasks = new ItemCollection<HiveTask>(allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue));

        if (refreshableJob.IsFinished()) {
          refreshableJob.ExecutionState = Core.ExecutionState.Stopped;
        } else {
          refreshableJob.ExecutionState = Core.ExecutionState.Started;
        }

        // build child-task tree
        foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
          BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
        }

        refreshableJob.OnLoaded();
      }
      finally {
        refreshableJob.IsProgressing = false;
      }
    }

    /// <summary>
    /// Recursively attaches to <paramref name="parentHiveJob"/> all tasks whose ParentTaskId is its task id,
    /// ordered by creation date.
    /// </summary>
    private static void BuildHiveJobTree(HiveTask parentHiveJob, IEnumerable<LightweightTask> allJobs, IDictionary<Guid, HiveTask> allHiveJobs) {
      IEnumerable<LightweightTask> childTasks = from job in allJobs
                                                where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveJob.Task.Id
                                                orderby job.DateCreated ascending
                                                select job;
      foreach (LightweightTask task in childTasks) {
        HiveTask childHiveTask = allHiveJobs[task.Id];
        parentHiveJob.AddChildHiveJob(childHiveTask);
        BuildHiveJobTree(childHiveTask, allJobs, allHiveJobs);
      }
    }
    #endregion

    /// <summary>
    /// Converts a string of resource names separated by ';' into a list of names.
    /// Names are trimmed; empty entries (e.g. from ";;" or a trailing ';') are skipped.
    /// Returns an empty list for null or empty input.
    /// </summary>
    private static IEnumerable<string> ToResourceNameList(string resourceNames) {
      if (string.IsNullOrEmpty(resourceNames)) {
        return new List<string>();
      }
      return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
                          .Select(name => name.Trim())
                          .Where(name => name.Length > 0)
                          .ToList();
    }

    /// <summary>
    /// Downloads the data of the task with the given id and deserializes it.
    /// Returns null if deserialization fails (best effort); service errors are not caught.
    /// </summary>
    public static ItemTask LoadItemJob(Guid jobId) {
      TaskData taskData = ServiceLocator.Instance.CallHiveService(s => s.GetTaskData(jobId));
      try {
        return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
      }
      catch {
        return null;
      }
    }

    /// <summary>
    /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
    /// If repetitions is -1, it is repeated infinitely.
    /// </summary>
    /// <param name="log">optional; receives a message for every failed attempt that will be retried</param>
    /// <exception cref="HiveException">The retries are exhausted; wraps the last exception.</exception>
    public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
      while (true) {
        try { action(); return; }
        catch (Exception e) {
          if (repetitions == 0) throw new HiveException(errorMessage, e);
          if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
          repetitions--;
        }
      }
    }

    /// <summary>Loads the permissions of a job and resolves the user name of each grantee.</summary>
    public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId) {
      return ServiceLocator.Instance.CallHiveService((service) => {
        IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
        foreach (var hep in jps) {
          hep.GrantedUserName = service.GetUsernameByUserId(hep.GrantedUserId);
        }
        return new HiveItemCollection<JobPermission>(jps);
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.