Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/HiveClient.cs @ 6419

Last change on this file since 6419 was 6419, checked in by cneumuel, 13 years ago

#1233

  • created events when statelog changed
  • fixed memory leak in hiveengine
  • extended timeout for long running transactions and database contexts (when jobdata is stored)
  • replaced random guids in database with sequential guids for performance reasons
  • minor fixes and cleanups
File size: 16.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.PluginInfrastructure;
32
33namespace HeuristicLab.Clients.Hive {
34  [Item("HiveClient", "Hive client.")]
35  public sealed class HiveClient : IContent {
36    private static HiveClient instance;
37    public static HiveClient Instance {
38      get {
39        if (instance == null) instance = new HiveClient();
40        return instance;
41      }
42    }
43
44    #region Properties
45    private ItemCollection<RefreshableHiveExperiment> hiveExperiments;
46    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
47      get { return hiveExperiments; }
48      set {
49        if (value != hiveExperiments) {
50          hiveExperiments = value;
51          OnHiveExperimentsChanged();
52        }
53      }
54    }
55
56    private List<Plugin> onlinePlugins;
57    public List<Plugin> OnlinePlugins {
58      get { return onlinePlugins; }
59      set { onlinePlugins = value; }
60    }
61
62    private List<Plugin> alreadyUploadedPlugins;
63    public List<Plugin> AlreadyUploadedPlugins {
64      get { return alreadyUploadedPlugins; }
65      set { alreadyUploadedPlugins = value; }
66    }
67    #endregion
68
69    public HiveClient() { }
70
71    #region Refresh
72    public void Refresh() {
73      OnRefreshing();
74
75      try {
76        hiveExperiments = new HiveItemCollection<RefreshableHiveExperiment>();
77        var he = ServiceLocator.Instance.CallHiveService<IEnumerable<HiveExperiment>>(s => s.GetHiveExperiments());
78        hiveExperiments.AddRange(he.Select(x => new RefreshableHiveExperiment(x)).OrderBy(x => x.HiveExperiment.Name));
79      }
80      catch {
81        hiveExperiments = null;
82        throw;
83      }
84      finally {
85        OnRefreshed();
86      }
87    }
88    public void RefreshAsync(Action<Exception> exceptionCallback) {
89      var call = new Func<Exception>(delegate() {
90        try {
91          Refresh();
92        }
93        catch (Exception ex) {
94          return ex;
95        }
96        return null;
97      });
98      call.BeginInvoke(delegate(IAsyncResult result) {
99        Exception ex = call.EndInvoke(result);
100        if (ex != null) exceptionCallback(ex);
101      }, null);
102    }
103    #endregion
104
105    #region Store
106    public static void Store(IHiveItem item) {
107      if (item.Id == Guid.Empty) {
108        if (item is RefreshableHiveExperiment) {
109          HiveClient.Instance.UploadExperiment((RefreshableHiveExperiment)item);
110        }
111      } else {
112        if (item is HiveExperiment)
113          ServiceLocator.Instance.CallHiveService(s => s.UpdateHiveExperiment((HiveExperiment)item));
114      }
115    }
116    public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item) {
117      var call = new Func<Exception>(delegate() {
118        try {
119          Store(item);
120        }
121        catch (Exception ex) {
122          return ex;
123        }
124        return null;
125      });
126      call.BeginInvoke(delegate(IAsyncResult result) {
127        Exception ex = call.EndInvoke(result);
128        if (ex != null) exceptionCallback(ex);
129      }, null);
130    }
131    #endregion
132
133    #region Delete
134    public static void Delete(IHiveItem item) {
135      if (item is HiveExperiment)
136        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(item.Id));
137      if (item is RefreshableHiveExperiment)
138        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(item.Id));
139      item.Id = Guid.Empty;
140    }
141    #endregion
142
143    #region Events
144    public event EventHandler Refreshing;
145    private void OnRefreshing() {
146      EventHandler handler = Refreshing;
147      if (handler != null) handler(this, EventArgs.Empty);
148    }
149    public event EventHandler Refreshed;
150    private void OnRefreshed() {
151      var handler = Refreshed;
152      if (handler != null) handler(this, EventArgs.Empty);
153    }
154    public event EventHandler HiveExperimentsChanged;
155    private void OnHiveExperimentsChanged() {
156      var handler = HiveExperimentsChanged;
157      if (handler != null) handler(this, EventArgs.Empty);
158    }
159    #endregion
160
161    public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment) {
162      HiveClient.StoreAsync(
163        new Action<Exception>((Exception ex) => {
164          refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Prepared;
165          exceptionCallback(ex);
166        }), refreshableHiveExperiment);
167      refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Started;
168    }
169
170    public static void PauseExperiment(HiveExperiment hiveExperiment) {
171      ServiceLocator.Instance.CallHiveService(service => {
172        foreach (HiveJob job in hiveExperiment.GetAllHiveJobs()) {
173          if (job.Job.State != JobState.Finished && job.Job.State != JobState.Aborted && job.Job.State != JobState.Failed)
174            service.PauseJob(job.Job.Id);
175        }
176      });
177      hiveExperiment.ExecutionState = ExecutionState.Paused;
178    }
179
180    public static void StopExperiment(HiveExperiment hiveExperiment) {
181      ServiceLocator.Instance.CallHiveService(service => {
182        foreach (HiveJob job in hiveExperiment.GetAllHiveJobs()) {
183          if (job.Job.State != JobState.Finished && job.Job.State != JobState.Aborted && job.Job.State != JobState.Failed)
184            service.StopJob(job.Job.Id);
185        }
186      });
187      // execution state does not need to be set. it will be set to Stopped, when all jobs have been downloaded
188    }
189
190    #region Upload Experiment
191    private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
192      try {
193        refreshableHiveExperiment.HiveExperiment.Progress = new Progress("Connecting to server...");
194        refreshableHiveExperiment.HiveExperiment.IsProgressing = true;
195        ServiceLocator.Instance.CallHiveService(service => {
196          IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames);
197          var resourceIds = new List<Guid>();
198          foreach (var resourceName in resourceNames) {
199            Guid resourceId = service.GetResourceId(resourceName);
200            if (resourceId == Guid.Empty) {
201              throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
202            }
203            resourceIds.Add(resourceId);
204          }
205
206          foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) {
207            hiveJob.SetIndexInParentOptimizerList(null);
208          }
209
210          // upload HiveExperiment
211          refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment...";
212          refreshableHiveExperiment.HiveExperiment.Id = service.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment);
213
214          int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count();
215          int jobCount = 0;
216
217          // upload plugins
218          refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins...";
219          this.OnlinePlugins = service.GetPlugins();
220          this.AlreadyUploadedPlugins = new List<Plugin>();
221          Plugin configFilePlugin = UploadConfigurationFile(service);
222          this.alreadyUploadedPlugins.Add(configFilePlugin);
223
224          // upload jobs
225          refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs...";
226
227          foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) {
228            UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, service, hiveJob, null, resourceIds, ref jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.UseLocalPlugins, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged);
229          }
230
231          if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling();
232        });
233      }
234      finally {
235        refreshableHiveExperiment.HiveExperiment.IsProgressing = false;
236      }
237    }
238
239    /// <summary>
240    /// Uploads the local configuration file as plugin
241    /// </summary>
242    private static Plugin UploadConfigurationFile(IHiveService service) {
243      string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HeuristicLab 3.3.exe");
244      string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
245      string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
246      byte[] hash;
247
248      byte[] data = File.ReadAllBytes(configFilePath);
249      using (SHA1 sha1 = SHA1.Create()) {
250        hash = sha1.ComputeHash(data);
251      }
252
253      Plugin configPlugin = new Plugin() { Name = "Configuration", IsLocal = true, Version = new Version(), Hash = hash };
254      PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
255      configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
256      return configPlugin;
257    }
258
259    /// <summary>
260    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
261    /// </summary>
262    /// <param name="service"></param>
263    /// <param name="hiveJob"></param>
264    /// <param name="parentHiveJob">shall be null if its the root job</param>
265    /// <param name="groups"></param>
266    private void UploadJobWithChildren(IProgress progress, IHiveService service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, ref int jobCount, int totalJobCount, Guid configPluginId, bool useLocalPlugins, Guid hiveExperimentId, ILog log, bool isPrivileged) {
267      jobCount++;
268      progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
269      JobData jobData;
270      List<IPluginDescription> plugins;
271
272      if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) {
273        hiveJob.Job.IsParentJob = true;
274        hiveJob.Job.FinishWhenChildJobsFinished = true;
275        jobData = hiveJob.GetAsJobData(true, out plugins);
276      } else {
277        hiveJob.Job.IsParentJob = false;
278        hiveJob.Job.FinishWhenChildJobsFinished = false;
279        jobData = hiveJob.GetAsJobData(false, out plugins);
280      }
281
282      TryAndRepeat(() => {
283        hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(service, this.onlinePlugins, this.alreadyUploadedPlugins, plugins, useLocalPlugins);
284      }, -1, "Failed to upload plugins");
285      hiveJob.Job.PluginsNeededIds.Add(configPluginId);
286      hiveJob.Job.HiveExperimentId = hiveExperimentId;
287      hiveJob.Job.IsPrivileged = isPrivileged;
288
289      progress.Status = string.Format("Uploading job {0} of {1} ({2} kb, {3} objects)", jobCount, totalJobCount, jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count());
290      progress.ProgressValue = (double)jobCount / totalJobCount;
291
292      log.LogMessage(progress.Status);
293      TryAndRepeat(() => {
294        if (parentHiveJob != null) {
295          hiveJob.Job.Id = service.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData);
296        } else {
297          hiveJob.Job.Id = service.AddJob(hiveJob.Job, jobData, groups.ToList());
298        }
299      }, -1, "Failed to add job", log);
300
301      foreach (HiveJob child in hiveJob.ChildHiveJobs) {
302        UploadJobWithChildren(progress, service, child, hiveJob, groups, ref jobCount, totalJobCount, configPluginId, useLocalPlugins, hiveExperimentId, log, isPrivileged);
303      }
304    }
305    #endregion
306
307    #region Download Experiment
308    public static void LoadExperiment(HiveExperiment hiveExperiment) {
309      hiveExperiment.Progress = new Progress();
310      try {
311        hiveExperiment.IsProgressing = true;
312        int totalJobCount = 0;
313        IEnumerable<LightweightJob> allJobs;
314
315        hiveExperiment.Progress.Status = "Connecting to Server...";
316        // fetch all Job objects to create the full tree of tree of HiveJob objects
317        hiveExperiment.Progress.Status = "Downloading list of jobs...";
318        allJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightExperimentJobs(hiveExperiment.Id));
319        totalJobCount = allJobs.Count();
320
321        HiveJobDownloader downloader = new HiveJobDownloader(allJobs.Select(x => x.Id));
322        downloader.StartAsync();
323
324        while (!downloader.IsFinished) {
325          hiveExperiment.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
326          hiveExperiment.Progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
327          Thread.Sleep(500);
328
329          if (downloader.IsFaulted) {
330            throw downloader.Exception;
331          }
332        }
333        IDictionary<Guid, HiveJob> allHiveJobs = downloader.Results;
334
335        hiveExperiment.HiveJobs = new ItemCollection<HiveJob>(allHiveJobs.Values.Where(x => !x.Job.ParentJobId.HasValue));
336
337        if (hiveExperiment.IsFinished()) {
338          hiveExperiment.ExecutionState = Core.ExecutionState.Stopped;
339        } else {
340          hiveExperiment.ExecutionState = Core.ExecutionState.Started;
341        }
342
343        // build child-job tree
344        foreach (HiveJob hiveJob in hiveExperiment.HiveJobs) {
345          BuildHiveJobTree(hiveJob, allJobs, allHiveJobs);
346        }
347
348        hiveExperiment.OnLoaded();
349      }
350      finally {
351        hiveExperiment.IsProgressing = false;
352      }
353    }
354
355    private static void BuildHiveJobTree(HiveJob parentHiveJob, IEnumerable<LightweightJob> allJobs, IDictionary<Guid, HiveJob> allHiveJobs) {
356      IEnumerable<LightweightJob> childJobs = from job in allJobs
357                                              where job.ParentJobId.HasValue && job.ParentJobId.Value == parentHiveJob.Job.Id
358                                              orderby job.DateCreated ascending
359                                              select job;
360      foreach (LightweightJob job in childJobs) {
361        HiveJob childHiveJob = allHiveJobs[job.Id];
362        parentHiveJob.AddChildHiveJob(childHiveJob);
363        BuildHiveJobTree(childHiveJob, allJobs, allHiveJobs);
364      }
365    }
366    #endregion
367
368    /// <summary>
369    /// Converts a string which can contain Ids separated by ';' to a enumerable
370    /// </summary>
371    private static IEnumerable<string> ToResourceNameList(string resourceNames) {
372      if (!string.IsNullOrEmpty(resourceNames)) {
373        return resourceNames.Split(';');
374      } else {
375        return new List<string>();
376      }
377    }
378
379    public static ItemJob LoadItemJob(Guid jobId) {
380      JobData jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
381      try {
382        return PersistenceUtil.Deserialize<ItemJob>(jobData.Data);
383      }
384      catch {
385        return null;
386      }
387    }
388
389    /// <summary>
390    /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
391    /// If repetitions is -1, it is repeated infinitely.
392    /// </summary>
393    public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
394      while (true) {
395        try { action(); return; }
396        catch (Exception e) {
397          if (repetitions == 0) throw new HiveException(errorMessage, e);
398          if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
399          repetitions--;
400        }
401      }
402    }
403  }
404}
Note: See TracBrowser for help on using the repository browser.