Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/HiveClient.cs @ 6444

Last change on this file since 6444 was 6444, checked in by cneumuel, 13 years ago

#1233

  • stability improvements for HiveExperiment and HiveEngine
  • parallelized upload of jobs
  • enabled cancellation of job upload
  • reduced the number of double assignments of jobs by an additional check in HeartbeatManager
  • tried to reduce the number of deadlocks by automatically rerunning transactions
  • some fixes
File size: 18.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.PluginInfrastructure;
33
34namespace HeuristicLab.Clients.Hive {
35  [Item("HiveClient", "Hive client.")]
36  public sealed class HiveClient : IContent {
// Lazy<T> in its default mode creates the value exactly once, even when Instance is first
// touched from several threads at the same time (Store, which reads Instance, is also run
// asynchronously by StoreAsync). The previous "if null then new" check was unsynchronised
// and could hand out two different clients.
private static readonly Lazy<HiveClient> instance = new Lazy<HiveClient>(() => new HiveClient());

/// <summary>
/// Gets the shared HiveClient. The instance is created on first access.
/// </summary>
public static HiveClient Instance {
  get { return instance.Value; }
}
44
45    #region Properties
private ItemCollection<RefreshableHiveExperiment> hiveExperiments;

/// <summary>
/// Gets or sets the collection of experiments held by this client.
/// Assigning a different collection raises <see cref="HiveExperimentsChanged"/>;
/// assigning the current collection again is a no-op.
/// </summary>
public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
  get {
    return hiveExperiments;
  }
  set {
    if (value == hiveExperiments) return; // unchanged: nothing to announce
    hiveExperiments = value;
    OnHiveExperimentsChanged();
  }
}
56
private List<Plugin> onlinePlugins;

/// <summary>
/// Gets or sets the list of plugins known to the server.
/// UploadExperiment assigns it from the service's GetPlugins() result.
/// </summary>
public List<Plugin> OnlinePlugins {
  get {
    return this.onlinePlugins;
  }
  set {
    this.onlinePlugins = value;
  }
}

private List<Plugin> alreadyUploadedPlugins;

/// <summary>
/// Gets or sets the list of plugins uploaded so far.
/// UploadExperiment replaces it with a fresh list and adds the configuration plugin to it.
/// </summary>
public List<Plugin> AlreadyUploadedPlugins {
  get {
    return this.alreadyUploadedPlugins;
  }
  set {
    this.alreadyUploadedPlugins = value;
  }
}
68    #endregion
69
/// <summary>
/// Creates a client without contacting the server. The constructor performs no work.
/// </summary>
public HiveClient() {
}
71
72    #region Refresh
/// <summary>
/// Reloads the experiment list from the Hive service, wrapping every experiment in a
/// RefreshableHiveExperiment and ordering the wrappers by experiment name.
/// Raises Refreshing before and Refreshed after the attempt (Refreshed also on failure).
/// On failure the experiment list is reset to null and the exception is rethrown.
/// </summary>
/// <remarks>
/// The backing field is assigned directly, so HiveExperimentsChanged is not raised here.
/// </remarks>
public void Refresh() {
  OnRefreshing();

  try {
    // start with an empty collection, then fill it from the service result
    hiveExperiments = new HiveItemCollection<RefreshableHiveExperiment>();
    IEnumerable<HiveExperiment> downloaded = ServiceLocator.Instance.CallHiveService<IEnumerable<HiveExperiment>>(service => service.GetHiveExperiments());
    var wrappedAndSorted = downloaded
      .Select(experiment => new RefreshableHiveExperiment(experiment))
      .OrderBy(wrapper => wrapper.HiveExperiment.Name);
    hiveExperiments.AddRange(wrappedAndSorted);
  }
  catch {
    // do not keep a partially filled collection around
    hiveExperiments = null;
    throw;
  }
  finally {
    OnRefreshed();
  }
}
/// <summary>
/// Runs <see cref="Refresh"/> asynchronously through a delegate BeginInvoke.
/// </summary>
/// <param name="exceptionCallback">
/// Invoked with the exception if Refresh throws; not invoked when Refresh succeeds.
/// </param>
public void RefreshAsync(Action<Exception> exceptionCallback) {
  // turn a thrown exception into a return value so it can cross the async boundary
  Func<Exception> refreshAndCapture = () => {
    try {
      Refresh();
      return null;
    }
    catch (Exception error) {
      return error;
    }
  };

  AsyncCallback completed = asyncResult => {
    Exception failure = refreshAndCapture.EndInvoke(asyncResult);
    if (failure != null) {
      exceptionCallback(failure);
    }
  };

  refreshAndCapture.BeginInvoke(completed, null);
}
104    #endregion
105
106    #region Store
/// <summary>
/// Persists the given item on the server.
/// An item without an id (Guid.Empty) is uploaded if it is a RefreshableHiveExperiment;
/// an item with an id is updated if it is a HiveExperiment. Any other item is ignored.
/// </summary>
/// <param name="item">The item to store.</param>
/// <param name="cancellationToken">Passed on to the experiment upload.</param>
public static void Store(IHiveItem item, CancellationToken cancellationToken) {
  bool isNew = item.Id == Guid.Empty;
  if (isNew) {
    var refreshable = item as RefreshableHiveExperiment;
    if (refreshable != null) {
      HiveClient.Instance.UploadExperiment(refreshable, cancellationToken);
    }
    return;
  }

  var existing = item as HiveExperiment;
  if (existing != null) {
    ServiceLocator.Instance.CallHiveService(service => service.UpdateHiveExperiment(existing));
  }
}
/// <summary>
/// Runs <see cref="Store"/> asynchronously through a delegate BeginInvoke.
/// </summary>
/// <param name="exceptionCallback">
/// Invoked with the exception if Store throws; not invoked when Store succeeds.
/// </param>
/// <param name="item">The item to store.</param>
/// <param name="cancellationToken">Passed on to Store.</param>
public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
  // turn a thrown exception into a return value so it can cross the async boundary
  Func<Exception> storeAndCapture = () => {
    try {
      Store(item, cancellationToken);
      return null;
    }
    catch (Exception error) {
      return error;
    }
  };

  AsyncCallback completed = asyncResult => {
    Exception failure = storeAndCapture.EndInvoke(asyncResult);
    if (failure != null) {
      exceptionCallback(failure);
    }
  };

  storeAndCapture.BeginInvoke(completed, null);
}
132    #endregion
133
134    #region Delete
/// <summary>
/// Deletes the experiment identified by the item's id on the server (for HiveExperiment
/// and for RefreshableHiveExperiment items) and then resets the item's id to Guid.Empty.
/// The id is reset for every item, including items of other types.
/// </summary>
public static void Delete(IHiveItem item) {
  Action deleteOnServer = () => ServiceLocator.Instance.CallHiveService(service => service.DeleteHiveExperiment(item.Id));

  // the two type checks are evaluated independently, one server call per match
  if (item is HiveExperiment) {
    deleteOnServer();
  }
  if (item is RefreshableHiveExperiment) {
    deleteOnServer();
  }

  item.Id = Guid.Empty;
}
142    #endregion
143
144    #region Events
/// <summary>Raised at the beginning of Refresh, before the service is called.</summary>
public event EventHandler Refreshing;
private void OnRefreshing() {
  // copy to a local so the null check and the call use the same delegate
  EventHandler subscribers = Refreshing;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}

/// <summary>Raised at the end of Refresh, whether it succeeded or failed.</summary>
public event EventHandler Refreshed;
private void OnRefreshed() {
  EventHandler subscribers = Refreshed;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}

/// <summary>Raised when the HiveExperiments property is set to a different collection.</summary>
public event EventHandler HiveExperimentsChanged;
private void OnHiveExperimentsChanged() {
  EventHandler subscribers = HiveExperimentsChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
160    #endregion
161
/// <summary>
/// Marks the experiment as started and stores it asynchronously.
/// If storing fails, the execution state is set back to Prepared and
/// <paramref name="exceptionCallback"/> is invoked with the exception.
/// </summary>
/// <param name="exceptionCallback">Receives the exception when storing fails.</param>
/// <param name="refreshableHiveExperiment">The experiment to start.</param>
/// <param name="cancellationToken">Passed on to the store operation.</param>
public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {
  // Set Started BEFORE launching the asynchronous store. Previously it was assigned afterwards,
  // so a quickly failing store could set Prepared first and then be overwritten by Started,
  // leaving a failed experiment in the Started state.
  refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Started;

  HiveClient.StoreAsync(
    new Action<Exception>((Exception ex) => {
      refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Prepared;
      exceptionCallback(ex);
    }), refreshableHiveExperiment, cancellationToken);
}
170
/// <summary>
/// Asks the server to pause every job of the experiment that is not already
/// Finished, Aborted or Failed, then sets the experiment's state to Paused.
/// </summary>
public static void PauseExperiment(HiveExperiment hiveExperiment) {
  ServiceLocator.Instance.CallHiveService(service => {
    foreach (HiveJob hiveJob in hiveExperiment.GetAllHiveJobs()) {
      var state = hiveJob.Job.State;
      bool alreadyEnded = state == JobState.Finished || state == JobState.Aborted || state == JobState.Failed;
      if (alreadyEnded) continue;
      service.PauseJob(hiveJob.Job.Id);
    }
  });
  hiveExperiment.ExecutionState = ExecutionState.Paused;
}
180
/// <summary>
/// Asks the server to stop every job of the experiment that is not already
/// Finished, Aborted or Failed. The experiment's execution state is left untouched.
/// </summary>
public static void StopExperiment(HiveExperiment hiveExperiment) {
  ServiceLocator.Instance.CallHiveService(service => {
    foreach (HiveJob hiveJob in hiveExperiment.GetAllHiveJobs()) {
      var state = hiveJob.Job.State;
      bool alreadyEnded = state == JobState.Finished || state == JobState.Aborted || state == JobState.Failed;
      if (alreadyEnded) continue;
      service.StopJob(hiveJob.Job.Id);
    }
  });
  // execution state does not need to be set. it will be set to Stopped, when all jobs have been downloaded
}
190
191    #region Upload Experiment
// Acquired by UploadJobWithChildren for the duration of one job upload (4 slots).
private Semaphore jobUploadSemaphore = new Semaphore(4, 4); // todo: take magic number into config
// Guards the shared job counter and the progress update in UploadJobWithChildren.
private static object jobCountLocker = new object();

/// <summary>
/// Uploads an experiment: resolves the resource names to ids, registers the experiment,
/// uploads the configuration file as a plugin and then uploads all jobs in parallel.
/// Blocks until every job upload task has completed.
/// </summary>
/// <param name="refreshableHiveExperiment">The experiment to upload; its Progress, Id and Log are updated.</param>
/// <param name="cancellationToken">Checked between the individual steps and passed on to the job uploads.</param>
/// <exception cref="ResourceNotFoundException">A resource name could not be resolved to an id.</exception>
/// <exception cref="AggregateException">At least one job upload task failed.</exception>
private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {
  try {
    refreshableHiveExperiment.HiveExperiment.Progress = new Progress("Connecting to server...");
    refreshableHiveExperiment.HiveExperiment.IsProgressing = true;

    IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames);
    var resourceIds = new List<Guid>();
    foreach (var resourceName in resourceNames) {
      Guid resourceId = ServiceLocator.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
      if (resourceId == Guid.Empty) {
        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
      }
      resourceIds.Add(resourceId);
    }

    foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) {
      hiveJob.SetIndexInParentOptimizerList(null);
    }

    // upload HiveExperiment
    refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment...";
    refreshableHiveExperiment.HiveExperiment.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment));
    cancellationToken.ThrowIfCancellationRequested();

    int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count();
    int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
    cancellationToken.ThrowIfCancellationRequested();

    // upload plugins
    refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins...";
    this.OnlinePlugins = ServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
    this.AlreadyUploadedPlugins = new List<Plugin>();
    Plugin configFilePlugin = ServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
    this.alreadyUploadedPlugins.Add(configFilePlugin);
    cancellationToken.ThrowIfCancellationRequested();

    if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling();

    // upload jobs
    refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs...";

    // Wait on the upload tasks themselves, not on their OnlyOnFaulted continuations.
    // Such a continuation is cancelled when the upload succeeds, and Task.WaitAll throws for
    // cancelled tasks; waiting on the continuations therefore failed on success and hid
    // real upload errors. The continuation is kept purely for logging.
    var uploadTasks = new List<Task>();
    foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) {
      // the job is handed over as task state instead of being captured by the lambda
      Task uploadTask = Task.Factory.StartNew((hj) => {
        UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, (HiveJob)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged, cancellationToken);
      }, hiveJob);
      uploadTask.ContinueWith((x) => refreshableHiveExperiment.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
      uploadTasks.Add(uploadTask);
    }
    Task.WaitAll(uploadTasks.ToArray());
  }
  finally {
    refreshableHiveExperiment.HiveExperiment.IsProgressing = false;
  }
}
248
/// <summary>
/// Uploads the local configuration file as plugin
/// </summary>
/// <param name="service">Service used to add the plugin if it is not online yet.</param>
/// <param name="onlinePlugins">Plugins already known to the server; searched by SHA1 hash.</param>
/// <returns>
/// The online plugin whose hash equals the configuration file's hash, or the newly added
/// "Configuration" plugin with the id returned by the service.
/// </returns>
private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
  string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HeuristicLab 3.3.exe");
  // open the exe configuration once and derive both the path and the file name from it
  string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
  string configFileName = Path.GetFileName(configFilePath);

  byte[] data = File.ReadAllBytes(configFilePath);
  byte[] hash;
  using (SHA1 sha1 = SHA1.Create()) {
    hash = sha1.ComputeHash(data);
  }

  // single pass over the online plugins; entries without a hash can never match
  Plugin onlineConfig = onlinePlugins.FirstOrDefault(p => p.Hash != null && p.Hash.SequenceEqual(hash));
  if (onlineConfig != null) {
    return onlineConfig;
  }

  Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
  PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
  configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
  return configPlugin;
}
275
/// <summary>
/// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
/// </summary>
/// <param name="progress">Progress object updated after the job has been added.</param>
/// <param name="hiveJob">The job to upload.</param>
/// <param name="parentHiveJob">shall be null if its the root job</param>
/// <param name="groups">Resource ids passed to AddJob for root jobs.</param>
/// <param name="jobCount">Single-element array used as a counter shared by all upload tasks.</param>
/// <param name="totalJobCount">Total number of jobs, used for the progress value.</param>
/// <param name="configPluginId">Id of the configuration plugin, added to the job's needed plugins.</param>
/// <param name="hiveExperimentId">Id of the experiment the job belongs to.</param>
/// <param name="log">Receives status messages and exceptions of failed child uploads.</param>
/// <param name="isPrivileged">Copied to the job's IsPrivileged flag.</param>
/// <param name="cancellationToken">Checked between the individual upload steps.</param>
private void UploadJobWithChildren(IProgress progress, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {
  jobUploadSemaphore.WaitOne();
  bool semaphoreReleased = false;
  try {
    cancellationToken.ThrowIfCancellationRequested();
    lock (jobCountLocker) {
      jobCount[0]++;
    }
    JobData jobData = null;
    List<IPluginDescription> plugins = null;

    TryAndRepeat(() => { // workaround for persistence bug (thread-safe access to bitmaps) - remove later
      if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) {
        hiveJob.Job.IsParentJob = true;
        hiveJob.Job.FinishWhenChildJobsFinished = true;
        jobData = hiveJob.GetAsJobData(true, out plugins);
      } else {
        hiveJob.Job.IsParentJob = false;
        hiveJob.Job.FinishWhenChildJobsFinished = false;
        jobData = hiveJob.GetAsJobData(false, out plugins);
      }
    }, 30, "Could not serialize job");
    cancellationToken.ThrowIfCancellationRequested();

    TryAndRepeat(() => {
      if (!cancellationToken.IsCancellationRequested) {
        ServiceLocator.Instance.CallHiveService((s) => hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
      }
    }, -1, "Failed to upload plugins");
    cancellationToken.ThrowIfCancellationRequested();
    hiveJob.Job.PluginsNeededIds.Add(configPluginId);
    hiveJob.Job.HiveExperimentId = hiveExperimentId;
    hiveJob.Job.IsPrivileged = isPrivileged;

    log.LogMessage(string.Format("Uploading job ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count()));
    TryAndRepeat(() => {
      if (!cancellationToken.IsCancellationRequested) {
        if (parentHiveJob != null) {
          hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData));
        } else {
          hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Job, jobData, groups.ToList()));
        }
      }
    }, 50, "Failed to add job", log);
    cancellationToken.ThrowIfCancellationRequested();

    lock (jobCountLocker) {
      progress.ProgressValue = (double)jobCount[0] / totalJobCount;
      progress.Status = string.Format("Uploaded job ({0} of {1})", jobCount[0], totalJobCount);
    }

    // Give the slot back BEFORE blocking on the children. Every child upload needs a slot of
    // its own; if parents kept theirs while waiting, all slots could end up held by waiting
    // parents and no child could ever start (deadlock).
    jobUploadSemaphore.Release();
    semaphoreReleased = true;

    // Wait on the child upload tasks themselves, not on their OnlyOnFaulted continuations:
    // such a continuation is cancelled when the child succeeds, and Task.WaitAll throws for
    // cancelled tasks. The continuation is kept purely for logging.
    var childTasks = new List<Task>();
    foreach (HiveJob child in hiveJob.ChildHiveJobs) {
      // child and parent are handed over as task state instead of being captured by the lambda
      Task childTask = Task.Factory.StartNew((tuple) => {
        var arguments = (Tuple<HiveJob, HiveJob>)tuple;
        UploadJobWithChildren(progress, arguments.Item1, arguments.Item2, groups, jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged, cancellationToken);
      }, new Tuple<HiveJob, HiveJob>(child, hiveJob));
      childTask.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
      childTasks.Add(childTask);
    }
    Task.WaitAll(childTasks.ToArray());
  }
  finally {
    // only release here when the failure happened before the regular release above
    if (!semaphoreReleased) jobUploadSemaphore.Release();
  }
}
344    #endregion
345
346    #region Download Experiment
/// <summary>
/// Downloads all jobs of the experiment, rebuilds the parent/child job tree and sets the
/// experiment's execution state (Stopped if the experiment reports itself finished,
/// otherwise Started). Blocks the calling thread, polling the downloader every 500 ms.
/// </summary>
/// <param name="hiveExperiment">The experiment to fill; its Progress, HiveJobs and state are overwritten.</param>
public static void LoadExperiment(HiveExperiment hiveExperiment) {
  hiveExperiment.Progress = new Progress();
  try {
    hiveExperiment.IsProgressing = true;

    hiveExperiment.Progress.Status = "Connecting to Server...";
    // fetch all Job objects to create the full tree of HiveJob objects
    hiveExperiment.Progress.Status = "Downloading list of jobs...";
    IEnumerable<LightweightJob> allJobs = ServiceLocator.Instance.CallHiveService(service => service.GetLightweightExperimentJobs(hiveExperiment.Id));
    int totalJobCount = allJobs.Count();

    var jobDownloader = new HiveJobDownloader(allJobs.Select(job => job.Id));
    jobDownloader.StartAsync();

    // poll until the downloader reports completion; a fault aborts the wait
    while (!jobDownloader.IsFinished) {
      hiveExperiment.Progress.ProgressValue = jobDownloader.FinishedCount / (double)totalJobCount;
      hiveExperiment.Progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", jobDownloader.FinishedCount, totalJobCount);
      Thread.Sleep(500);

      if (jobDownloader.IsFaulted) {
        throw jobDownloader.Exception;
      }
    }
    IDictionary<Guid, HiveJob> downloadedJobs = jobDownloader.Results;

    // root jobs are the ones without a parent
    var rootJobs = downloadedJobs.Values.Where(candidate => !candidate.Job.ParentJobId.HasValue);
    hiveExperiment.HiveJobs = new ItemCollection<HiveJob>(rootJobs);

    hiveExperiment.ExecutionState = hiveExperiment.IsFinished()
      ? Core.ExecutionState.Stopped
      : Core.ExecutionState.Started;

    // build child-job tree
    foreach (HiveJob rootJob in hiveExperiment.HiveJobs) {
      BuildHiveJobTree(rootJob, allJobs, downloadedJobs);
    }

    hiveExperiment.OnLoaded();
  }
  finally {
    hiveExperiment.IsProgressing = false;
  }
}
393
/// <summary>
/// Recursively attaches to <paramref name="parentHiveJob"/> all jobs whose ParentJobId equals
/// its id, in ascending order of creation date.
/// </summary>
/// <param name="parentHiveJob">The job whose children are attached.</param>
/// <param name="allJobs">Lightweight descriptions of all jobs of the experiment.</param>
/// <param name="allHiveJobs">Downloaded jobs, looked up by job id.</param>
private static void BuildHiveJobTree(HiveJob parentHiveJob, IEnumerable<LightweightJob> allJobs, IDictionary<Guid, HiveJob> allHiveJobs) {
  var directChildren = allJobs
    .Where(candidate => candidate.ParentJobId.HasValue && candidate.ParentJobId.Value == parentHiveJob.Job.Id)
    .OrderBy(candidate => candidate.DateCreated);

  foreach (LightweightJob childInfo in directChildren) {
    HiveJob childHiveJob = allHiveJobs[childInfo.Id];
    parentHiveJob.AddChildHiveJob(childHiveJob);
    BuildHiveJobTree(childHiveJob, allJobs, allHiveJobs); // descend into the grandchildren
  }
}
405    #endregion
406
/// <summary>
/// Splits a string of resource names separated by ';' into the individual names.
/// Returns an empty list when the string is null or empty.
/// </summary>
private static IEnumerable<string> ToResourceNameList(string resourceNames) {
  if (string.IsNullOrEmpty(resourceNames)) {
    return new List<string>();
  }
  return resourceNames.Split(';');
}
417
/// <summary>
/// Downloads the data of the given job and deserializes it into an ItemJob.
/// </summary>
/// <param name="jobId">Id of the job whose data is requested from the service.</param>
/// <returns>The deserialized ItemJob, or null if deserialization throws.</returns>
public static ItemJob LoadItemJob(Guid jobId) {
  JobData downloaded = ServiceLocator.Instance.CallHiveService(service => service.GetJobData(jobId));

  ItemJob itemJob;
  try {
    itemJob = PersistenceUtil.Deserialize<ItemJob>(downloaded.Data);
  }
  catch {
    // any failure while reading or deserializing the data is reported as "no job"
    itemJob = null;
  }
  return itemJob;
}
427
/// <summary>
/// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
/// If repetitions is -1 (or any other negative value), it is repeated infinitely.
/// </summary>
/// <param name="action">The action to execute.</param>
/// <param name="repetitions">Number of retries after the first failed attempt; negative means unlimited.</param>
/// <param name="errorMessage">Message of the HiveException thrown when the retries are used up; also used as log prefix.</param>
/// <param name="log">Optional log that receives a message for every failed attempt that is retried.</param>
/// <exception cref="HiveException">The action failed and no retries are left; the last exception is the inner exception.</exception>
public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
  while (true) {
    try { action(); return; }
    catch (Exception e) {
      if (repetitions == 0) throw new HiveException(errorMessage, e);
      if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
      // Count down finite budgets only. A negative budget means "retry forever"; decrementing
      // it would eventually wrap around to 0 and end the supposedly unlimited retry loop.
      if (repetitions > 0) repetitions--;
    }
  }
}
442  }
443}
Note: See TracBrowser for help on using the repository browser.