Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/HiveClient.cs @ 6452

Last change on this file since 6452 was 6452, checked in by cneumuel, 13 years ago

#1233

  • renamed UptimeCalendar and Appointment to Downtime
  • added service methods to delete plugins and get plugin by hash
  • made reverted TransactionManager change, made it non-static and added interface
  • moved magic numbers to application settings
File size: 18.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.PluginInfrastructure;
33
34namespace HeuristicLab.Clients.Hive {
35  [Item("HiveClient", "Hive client.")]
36  public sealed class HiveClient : IContent {
37    private static HiveClient instance;
38    public static HiveClient Instance {
39      get {
40        if (instance == null) instance = new HiveClient();
41        return instance;
42      }
43    }
44
45    #region Properties
46    private ItemCollection<RefreshableHiveExperiment> hiveExperiments;
47    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
48      get { return hiveExperiments; }
49      set {
50        if (value != hiveExperiments) {
51          hiveExperiments = value;
52          OnHiveExperimentsChanged();
53        }
54      }
55    }
56
57    private List<Plugin> onlinePlugins;
58    public List<Plugin> OnlinePlugins {
59      get { return onlinePlugins; }
60      set { onlinePlugins = value; }
61    }
62
63    private List<Plugin> alreadyUploadedPlugins;
64    public List<Plugin> AlreadyUploadedPlugins {
65      get { return alreadyUploadedPlugins; }
66      set { alreadyUploadedPlugins = value; }
67    }
68    #endregion
69
70    public HiveClient() { }
71
72    #region Refresh
73    public void Refresh() {
74      OnRefreshing();
75
76      try {
77        hiveExperiments = new HiveItemCollection<RefreshableHiveExperiment>();
78        var he = ServiceLocator.Instance.CallHiveService<IEnumerable<HiveExperiment>>(s => s.GetHiveExperiments());
79        hiveExperiments.AddRange(he.Select(x => new RefreshableHiveExperiment(x)).OrderBy(x => x.HiveExperiment.Name));
80      }
81      catch {
82        hiveExperiments = null;
83        throw;
84      }
85      finally {
86        OnRefreshed();
87      }
88    }
89    public void RefreshAsync(Action<Exception> exceptionCallback) {
90      var call = new Func<Exception>(delegate() {
91        try {
92          Refresh();
93        }
94        catch (Exception ex) {
95          return ex;
96        }
97        return null;
98      });
99      call.BeginInvoke(delegate(IAsyncResult result) {
100        Exception ex = call.EndInvoke(result);
101        if (ex != null) exceptionCallback(ex);
102      }, null);
103    }
104    #endregion
105
106    #region Store
107    public static void Store(IHiveItem item, CancellationToken cancellationToken) {
108      if (item.Id == Guid.Empty) {
109        if (item is RefreshableHiveExperiment) {
110          HiveClient.Instance.UploadExperiment((RefreshableHiveExperiment)item, cancellationToken);
111        }
112      } else {
113        if (item is HiveExperiment)
114          ServiceLocator.Instance.CallHiveService(s => s.UpdateHiveExperiment((HiveExperiment)item));
115      }
116    }
117    public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
118      var call = new Func<Exception>(delegate() {
119        try {
120          Store(item, cancellationToken);
121        }
122        catch (Exception ex) {
123          return ex;
124        }
125        return null;
126      });
127      call.BeginInvoke(delegate(IAsyncResult result) {
128        Exception ex = call.EndInvoke(result);
129        if (ex != null) exceptionCallback(ex);
130      }, null);
131    }
132    #endregion
133
134    #region Delete
135    public static void Delete(IHiveItem item) {
136      if (item is HiveExperiment)
137        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(item.Id));
138      if (item is RefreshableHiveExperiment)
139        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(item.Id));
140      item.Id = Guid.Empty;
141    }
142    #endregion
143
144    #region Events
145    public event EventHandler Refreshing;
146    private void OnRefreshing() {
147      EventHandler handler = Refreshing;
148      if (handler != null) handler(this, EventArgs.Empty);
149    }
150    public event EventHandler Refreshed;
151    private void OnRefreshed() {
152      var handler = Refreshed;
153      if (handler != null) handler(this, EventArgs.Empty);
154    }
155    public event EventHandler HiveExperimentsChanged;
156    private void OnHiveExperimentsChanged() {
157      var handler = HiveExperimentsChanged;
158      if (handler != null) handler(this, EventArgs.Empty);
159    }
160    #endregion
161
162    public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {
163      HiveClient.StoreAsync(
164        new Action<Exception>((Exception ex) => {
165          refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Prepared;
166          exceptionCallback(ex);
167        }), refreshableHiveExperiment, cancellationToken);
168      refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Started;
169    }
170
171    public static void PauseExperiment(HiveExperiment hiveExperiment) {
172      ServiceLocator.Instance.CallHiveService(service => {
173        foreach (HiveJob job in hiveExperiment.GetAllHiveJobs()) {
174          if (job.Job.State != JobState.Finished && job.Job.State != JobState.Aborted && job.Job.State != JobState.Failed)
175            service.PauseJob(job.Job.Id);
176        }
177      });
178      hiveExperiment.ExecutionState = ExecutionState.Paused;
179    }
180
181    public static void StopExperiment(HiveExperiment hiveExperiment) {
182      ServiceLocator.Instance.CallHiveService(service => {
183        foreach (HiveJob job in hiveExperiment.GetAllHiveJobs()) {
184          if (job.Job.State != JobState.Finished && job.Job.State != JobState.Aborted && job.Job.State != JobState.Failed)
185            service.StopJob(job.Job.Id);
186        }
187      });
188      // execution state does not need to be set. it will be set to Stopped, when all jobs have been downloaded
189    }
190
191    #region Upload Experiment
192    private Semaphore jobUploadSemaphore = new Semaphore(4, 4); // todo: take magic number into config
193    private static object jobCountLocker = new object();
194    private static object pluginLocker = new object();
195    private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {
196      try {
197        refreshableHiveExperiment.HiveExperiment.Progress = new Progress("Connecting to server...");
198        refreshableHiveExperiment.HiveExperiment.IsProgressing = true;
199
200        IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames);
201        var resourceIds = new List<Guid>();
202        foreach (var resourceName in resourceNames) {
203          Guid resourceId = ServiceLocator.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
204          if (resourceId == Guid.Empty) {
205            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
206          }
207          resourceIds.Add(resourceId);
208        }
209
210        foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) {
211          hiveJob.SetIndexInParentOptimizerList(null);
212        }
213
214        // upload HiveExperiment
215        refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment...";
216        refreshableHiveExperiment.HiveExperiment.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment));
217        cancellationToken.ThrowIfCancellationRequested();
218
219        int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count();
220        int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
221        cancellationToken.ThrowIfCancellationRequested();
222
223        // upload plugins
224        refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins...";
225        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
226        this.AlreadyUploadedPlugins = new List<Plugin>();
227        Plugin configFilePlugin = ServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
228        this.alreadyUploadedPlugins.Add(configFilePlugin);
229        cancellationToken.ThrowIfCancellationRequested();
230
231        if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling();
232
233        // upload jobs
234        refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs...";
235
236        var tasks = new List<Task>();
237        foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) {
238          tasks.Add(Task.Factory.StartNew((hj) => {
239            UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, (HiveJob)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged, cancellationToken);
240          }, hiveJob)
241          .ContinueWith((x) => refreshableHiveExperiment.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));
242        }
243        Task.WaitAll(tasks.ToArray());
244      }
245      finally {
246        refreshableHiveExperiment.HiveExperiment.IsProgressing = false;
247      }
248    }
249
250    /// <summary>
251    /// Uploads the local configuration file as plugin
252    /// </summary>
253    private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
254      string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HeuristicLab 3.3.exe");
255      string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
256      string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
257      byte[] hash;
258
259      byte[] data = File.ReadAllBytes(configFilePath);
260      using (SHA1 sha1 = SHA1.Create()) {
261        hash = sha1.ComputeHash(data);
262      }
263
264      Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
265      PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
266
267      IEnumerable<Plugin> onlineConfig = onlinePlugins.Where(p => p.Hash.SequenceEqual(hash));
268
269      if (onlineConfig.Count() > 0) {
270        return onlineConfig.First();
271      } else {
272        configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
273        return configPlugin;
274      }
275    }
276
277    /// <summary>
278    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
279    /// </summary>
280    /// <param name="parentHiveJob">shall be null if its the root job</param>
281    private void UploadJobWithChildren(IProgress progress, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {
282      jobUploadSemaphore.WaitOne();
283      try {
284        cancellationToken.ThrowIfCancellationRequested();
285        lock (jobCountLocker) {
286          jobCount[0]++;
287        }
288        JobData jobData;
289        List<IPluginDescription> plugins;
290
291        if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) {
292          hiveJob.Job.IsParentJob = true;
293          hiveJob.Job.FinishWhenChildJobsFinished = true;
294          jobData = hiveJob.GetAsJobData(true, out plugins);
295        } else {
296          hiveJob.Job.IsParentJob = false;
297          hiveJob.Job.FinishWhenChildJobsFinished = false;
298          jobData = hiveJob.GetAsJobData(false, out plugins);
299        }
300        cancellationToken.ThrowIfCancellationRequested();
301
302        TryAndRepeat(() => {
303          if (!cancellationToken.IsCancellationRequested) {
304            lock (pluginLocker) {
305              ServiceLocator.Instance.CallHiveService((s) => hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
306            }
307          }
308        }, -1, "Failed to upload plugins");
309        cancellationToken.ThrowIfCancellationRequested();
310        hiveJob.Job.PluginsNeededIds.Add(configPluginId);
311        hiveJob.Job.HiveExperimentId = hiveExperimentId;
312        hiveJob.Job.IsPrivileged = isPrivileged;
313
314        log.LogMessage(string.Format("Uploading job ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count()));
315        TryAndRepeat(() => {
316          if (!cancellationToken.IsCancellationRequested) {
317            if (parentHiveJob != null) {
318              hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData));
319            } else {
320              hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Job, jobData, groups.ToList()));
321            }
322          }
323        }, 50, "Failed to add job", log);
324        cancellationToken.ThrowIfCancellationRequested();
325
326        lock (jobCountLocker) {
327          progress.ProgressValue = (double)jobCount[0] / totalJobCount;
328          progress.Status = string.Format("Uploaded job ({0} of {1})", jobCount[0], totalJobCount);
329        }
330
331        var tasks = new List<Task>();
332        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
333          tasks.Add(Task.Factory.StartNew((tuple) => {
334            var arguments = (Tuple<HiveJob, HiveJob>)tuple;
335            UploadJobWithChildren(progress, arguments.Item1, arguments.Item2, groups, jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged, cancellationToken);
336          }, new Tuple<HiveJob, HiveJob>(child, hiveJob))
337          .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));
338        }
339        Task.WaitAll(tasks.ToArray());
340      }
341      finally {
342        jobUploadSemaphore.Release();
343      }
344    }
345    #endregion
346
347    #region Download Experiment
348    public static void LoadExperiment(HiveExperiment hiveExperiment) {
349      hiveExperiment.Progress = new Progress();
350      try {
351        hiveExperiment.IsProgressing = true;
352        int totalJobCount = 0;
353        IEnumerable<LightweightJob> allJobs;
354
355        hiveExperiment.Progress.Status = "Connecting to Server...";
356        // fetch all Job objects to create the full tree of tree of HiveJob objects
357        hiveExperiment.Progress.Status = "Downloading list of jobs...";
358        allJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightExperimentJobs(hiveExperiment.Id));
359        totalJobCount = allJobs.Count();
360
361        HiveJobDownloader downloader = new HiveJobDownloader(allJobs.Select(x => x.Id));
362        downloader.StartAsync();
363
364        while (!downloader.IsFinished) {
365          hiveExperiment.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
366          hiveExperiment.Progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
367          Thread.Sleep(500);
368
369          if (downloader.IsFaulted) {
370            throw downloader.Exception;
371          }
372        }
373        IDictionary<Guid, HiveJob> allHiveJobs = downloader.Results;
374
375        hiveExperiment.HiveJobs = new ItemCollection<HiveJob>(allHiveJobs.Values.Where(x => !x.Job.ParentJobId.HasValue));
376
377        if (hiveExperiment.IsFinished()) {
378          hiveExperiment.ExecutionState = Core.ExecutionState.Stopped;
379        } else {
380          hiveExperiment.ExecutionState = Core.ExecutionState.Started;
381        }
382
383        // build child-job tree
384        foreach (HiveJob hiveJob in hiveExperiment.HiveJobs) {
385          BuildHiveJobTree(hiveJob, allJobs, allHiveJobs);
386        }
387
388        hiveExperiment.OnLoaded();
389      }
390      finally {
391        hiveExperiment.IsProgressing = false;
392      }
393    }
394
395    private static void BuildHiveJobTree(HiveJob parentHiveJob, IEnumerable<LightweightJob> allJobs, IDictionary<Guid, HiveJob> allHiveJobs) {
396      IEnumerable<LightweightJob> childJobs = from job in allJobs
397                                              where job.ParentJobId.HasValue && job.ParentJobId.Value == parentHiveJob.Job.Id
398                                              orderby job.DateCreated ascending
399                                              select job;
400      foreach (LightweightJob job in childJobs) {
401        HiveJob childHiveJob = allHiveJobs[job.Id];
402        parentHiveJob.AddChildHiveJob(childHiveJob);
403        BuildHiveJobTree(childHiveJob, allJobs, allHiveJobs);
404      }
405    }
406    #endregion
407
408    /// <summary>
409    /// Converts a string which can contain Ids separated by ';' to a enumerable
410    /// </summary>
411    private static IEnumerable<string> ToResourceNameList(string resourceNames) {
412      if (!string.IsNullOrEmpty(resourceNames)) {
413        return resourceNames.Split(';');
414      } else {
415        return new List<string>();
416      }
417    }
418
419    public static ItemJob LoadItemJob(Guid jobId) {
420      JobData jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
421      try {
422        return PersistenceUtil.Deserialize<ItemJob>(jobData.Data);
423      }
424      catch {
425        return null;
426      }
427    }
428
429    /// <summary>
430    /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
431    /// If repetitions is -1, it is repeated infinitely.
432    /// </summary>
433    public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
434      while (true) {
435        try { action(); return; }
436        catch (Exception e) {
437          if (repetitions == 0) throw new HiveException(errorMessage, e);
438          if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
439          repetitions--;
440        }
441      }
442    }
443  }
444}
Note: See TracBrowser for help on using the repository browser.