Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid.HiveBridge/3.2/HiveGridServerWrapper.cs @ 2575

Last change on this file since 2575 was 2114, checked in by gkronber, 15 years ago
  • fixed a bug in determination of JobState in HiveGridServerWrapper
  • fixed a bug in discrimination between snap shot results and end results in HiveGridServerWrapper

#679 (HiveGridWrapper treats snap shot results the same way as end results)

File size: 7.4 KB
RevLine 
[2058]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Threading;
26using HeuristicLab.Hive.Contracts.Interfaces;
27using HeuristicLab.Hive.Contracts;
28using System.IO;
29using System.IO.Compression;
30using System.Xml;
31using HeuristicLab.Core;
32using HeuristicLab.PluginInfrastructure;
33using HeuristicLab.Hive.Contracts.BusinessObjects;
[2073]34using System.ServiceModel;
35using HeuristicLab.Tracing;
[2058]36
37namespace HeuristicLab.Grid.HiveBridge {
/// <summary>
/// <see cref="IGridServer"/> implementation that forwards grid requests to a Hive
/// execution engine facade reached at a configured address.
/// </summary>
/// <remarks>
/// Every remote call is routed through <see cref="SavelyExecute{TResult}"/>, which
/// serializes access to the facade with <c>connectionLock</c> and retries on
/// <see cref="TimeoutException"/> / <see cref="CommunicationException"/>.
/// </remarks>
public class HiveGridServerWrapper : IGridServer {
  // Maximum number of failed attempts before a remote call is given up.
  private const int MAX_CONNECTION_RETRIES = 10;
  // Pause between two consecutive attempts, in seconds.
  private const int RETRY_TIMEOUT_SEC = 60;
  private string address;
  // Created lazily on the first remote call and replaced after a communication failure.
  private IExecutionEngineFacade executionEngine;
  private object connectionLock = new object();

  /// <summary>
  /// Creates a wrapper for the Hive server at <paramref name="address"/>.
  /// No connection is opened until the first remote call is made.
  /// </summary>
  /// <param name="address">Address handed to <c>ServiceLocator.CreateExecutionEngineFacade</c>.</param>
  public HiveGridServerWrapper(string address) {
    this.address = address;
  }

  /// <summary>
  /// Queries the state of the job identified by <paramref name="guid"/>.
  /// </summary>
  /// <param name="guid">Id of the job, as returned by <see cref="BeginExecuteEngine"/>.</param>
  /// <returns>
  /// <c>Busy</c> whenever the server answered at all; <c>Unknown</c> when no answer
  /// could be obtained within the retry limit.
  /// </returns>
  public JobState JobState(Guid guid) {
    ResponseObject<SerializedJobResult> response = SavelyExecute(() => executionEngine.GetLastSerializedResult(guid, false));
    // NOTE(review): the content of the response is not inspected here; any answer counts as "busy".
    if (response == null) {
      return HeuristicLab.Grid.JobState.Unknown;
    }
    return HeuristicLab.Grid.JobState.Busy;
  }

  /// <summary>
  /// Wraps the serialized engine into a Hive job and submits it to the server.
  /// </summary>
  /// <param name="engine">GZip-compressed XML serialization of an <c>IEngine</c>.</param>
  /// <returns>
  /// The id of the created job, or <see cref="Guid.Empty"/> when the server could not
  /// be reached or the response carries no job object.
  /// </returns>
  public Guid BeginExecuteEngine(byte[] engine) {
    var jobObj = CreateJobObj(engine);

    ResponseObject<HeuristicLab.Hive.Contracts.BusinessObjects.Job> res = SavelyExecute(() => executionEngine.AddJob(jobObj));
    // A response without a job object (e.g. a rejected submission) must not
    // crash the caller with a NullReferenceException.
    if (res == null || res.Obj == null) {
      return Guid.Empty;
    }
    return res.Obj.Id;
  }

  /// <summary>
  /// Tries to fetch the final result of the job identified by <paramref name="guid"/>.
  /// </summary>
  /// <param name="guid">Id of the job, as returned by <see cref="BeginExecuteEngine"/>.</param>
  /// <returns>
  /// The GZip-compressed XML serialization of the job's engine when the job reports a
  /// progress of exactly 1.0; <c>null</c> when the server could not be reached, the
  /// response is unsuccessful or empty, or the result is only a snapshot.
  /// </returns>
  public byte[] TryEndExecuteEngine(Guid guid) {
    ResponseObject<SerializedJobResult> response = SavelyExecute(() => executionEngine.GetLastSerializedResult(guid, false));
    if (response == null || !response.Success || response.Obj == null) {
      return null;
    }

    HeuristicLab.Hive.Engine.Job restoredJob = (HeuristicLab.Hive.Engine.Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
    // Only a progress of exactly 1.0 is treated as an end result; anything else
    // is a snapshot and is not handed back to the caller.
    if (restoredJob.Progress != 1.0) {
      return null;
    }
    return SerializeToGZip(restoredJob.Engine);
  }

  /// <summary>
  /// Builds the <c>SerializedJob</c> that is sent to the Hive server for a serialized engine.
  /// </summary>
  /// <param name="serializedEngine">GZip-compressed XML serialization of an <c>IEngine</c>.</param>
  /// <returns>
  /// A <c>SerializedJob</c> whose data is a freshly serialized Hive job running the
  /// engine's initial operator and whose job info lists the required plugins.
  /// </returns>
  private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj(byte[] serializedEngine) {
    List<HivePluginInfo> requiredPlugins = new List<HivePluginInfo>();
    IEngine engine = RestoreEngine(serializedEngine, requiredPlugins);

    // Transplant the initial operator of the restored engine into a new Hive job.
    HeuristicLab.Hive.Engine.Job job = new HeuristicLab.Hive.Engine.Job();
    job.Engine.OperatorGraph.AddOperator(engine.OperatorGraph.InitialOperator);
    job.Engine.OperatorGraph.InitialOperator = engine.OperatorGraph.InitialOperator;
    job.Engine.Reset();

    HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.Job();
    jobObj.CoresNeeded = 1;
    jobObj.PluginsNeeded = requiredPlugins;
    jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;

    SerializedJob computableJob = new SerializedJob();
    computableJob.SerializedJobData = SerializeToGZip(job);
    computableJob.JobInfo = jobObj;
    return computableJob;
  }

  /// <summary>
  /// Persists <paramref name="item"/> below a <c>Root</c> element of a new XML document
  /// and returns the GZip-compressed document.
  /// </summary>
  /// <param name="item">Object to persist via <c>PersistenceManager.Persist</c>.</param>
  /// <returns>The compressed XML document as a byte array.</returns>
  private static byte[] SerializeToGZip(IStorable item) {
    XmlDocument document = PersistenceManager.CreateXmlDocument();
    Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
    XmlNode rootNode = document.CreateElement("Root");
    document.AppendChild(rootNode);
    rootNode.AppendChild(PersistenceManager.Persist(item, document, dictionary));

    using (MemoryStream memStream = new MemoryStream()) {
      // leaveOpen = true: the GZip stream has to be closed (flushing its trailer)
      // before the underlying buffer is read, without closing the buffer itself.
      using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
        document.Save(stream);
      }
      return memStream.ToArray();
    }
  }

  /// <summary>
  /// Restores an engine from its compressed serialization and collects the plugins
  /// that declare the restored objects.
  /// </summary>
  /// <param name="serializedEngine">GZip-compressed XML serialization of an <c>IEngine</c>.</param>
  /// <param name="requiredPlugins">
  /// Receives one <c>HivePluginInfo</c> per distinct declaring plugin and per direct
  /// dependency of such a plugin; existing entries are kept.
  /// </param>
  /// <returns>The restored engine.</returns>
  private IEngine RestoreEngine(byte[] serializedEngine, List<HivePluginInfo> requiredPlugins) {
    // unzip and restore to determine the list of required plugins (NB: inefficient!)
    XmlDocument document = new XmlDocument();
    using (MemoryStream memStream = new MemoryStream(serializedEngine))
    using (GZipStream stream = new GZipStream(memStream, CompressionMode.Decompress, true)) {
      document.Load(stream);
    }

    // The persisted engine is the first child of the document's root element.
    Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
    XmlNode rootNode = document.DocumentElement.ChildNodes[0];
    IEngine engine = (IEngine)PersistenceManager.Restore(rootNode, dictionary);

    DiscoveryService service = new DiscoveryService();
    List<PluginInfo> plugins = new List<PluginInfo>();

    // Restore fills the dictionary with every restored object; each one contributes
    // its declaring plugin and that plugin's dependencies.
    foreach (IStorable storeable in dictionary.Values) {
      PluginInfo declaringPlugin = service.GetDeclaringPlugin(storeable.GetType());
      if (plugins.Contains(declaringPlugin)) {
        continue;
      }
      plugins.Add(declaringPlugin);
      foreach (var dependency in declaringPlugin.Dependencies) {
        if (!plugins.Contains(dependency)) {
          plugins.Add(dependency);
        }
      }
    }

    foreach (PluginInfo uniquePlugin in plugins) {
      HivePluginInfo pluginInfo = new HivePluginInfo();
      pluginInfo.Name = uniquePlugin.Name;
      pluginInfo.Version = uniquePlugin.Version.ToString();
      pluginInfo.BuildDate = uniquePlugin.BuildDate;
      requiredPlugins.Add(pluginInfo);
    }
    return engine;
  }

  /// <summary>
  /// Runs a remote call under the connection lock and retries it when it fails with a
  /// <see cref="TimeoutException"/> or <see cref="CommunicationException"/>.
  /// </summary>
  /// <typeparam name="TResult">Response type of the remote call.</typeparam>
  /// <param name="a">
  /// The remote call; it is expected to read the <c>executionEngine</c> field when
  /// invoked so that a re-created facade is picked up on the next attempt.
  /// </param>
  /// <returns>
  /// The result of <paramref name="a"/>, or <c>null</c> after
  /// <c>MAX_CONNECTION_RETRIES</c> failed attempts.
  /// </returns>
  private TResult SavelyExecute<TResult>(Func<TResult> a) where TResult : Response {
    // Create the facade under the lock so that concurrent first calls share one instance.
    lock (connectionLock) {
      if (executionEngine == null) {
        executionEngine = ServiceLocator.CreateExecutionEngineFacade(address);
      }
    }

    int retries = 0;
    do {
      try {
        lock (connectionLock) {
          return a();
        }
      }
      catch (TimeoutException) {
        retries++;
      }
      catch (CommunicationException) {
        // The facade is replaced after a communication failure; do so under the
        // lock so that no other thread is using it mid-call.
        lock (connectionLock) {
          executionEngine = ServiceLocator.CreateExecutionEngineFacade(address);
        }
        retries++;
      }

      // Wait only when another attempt follows; sleeping after the last failure
      // would merely delay the caller.
      if (retries < MAX_CONNECTION_RETRIES) {
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while (retries < MAX_CONNECTION_RETRIES);

    Logger.Warn("Reached max connection retries");
    return null;
  }
}
182}
Note: See TracBrowser for help on using the repository browser.