Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid.HiveBridge/3.2/HiveGridServerWrapper.cs @ 2080

Last change on this file since 2080 was 2078, checked in by gkronber, 15 years ago

Snapshot results should be ignored in the HiveGridServerWrapper. Using Canceled property of the engine to discriminate between snapshots and final results. #642 (Hive backend for CEDMA)

File size: 7.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Threading;
26using HeuristicLab.Hive.Contracts.Interfaces;
27using HeuristicLab.Hive.Contracts;
28using System.IO;
29using System.IO.Compression;
30using System.Xml;
31using HeuristicLab.Core;
32using HeuristicLab.PluginInfrastructure;
33using HeuristicLab.Hive.Contracts.BusinessObjects;
34using System.ServiceModel;
35using HeuristicLab.Tracing;
36
37namespace HeuristicLab.Grid.HiveBridge {
38  public class HiveGridServerWrapper : IGridServer {
39    private const int MAX_CONNECTION_RETRIES = 10;
40    private const int RETRY_TIMEOUT_SEC = 60;
41    private string address;
42    private IExecutionEngineFacade executionEngine;
43    private object connectionLock = new object();
44
45    public HiveGridServerWrapper(string address) {
46      this.address = address;
47    }
48
49    public JobState JobState(Guid guid) {
50      ResponseObject<JobResult> response = SavelyExecute(() => executionEngine.GetLastResult(guid, false));
51      if (response != null && response.Success == true &&
52        (response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE ||
53          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_REQUEST_SET ||
54          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET ||
55          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT)) {
56        return HeuristicLab.Grid.JobState.Busy;
57      } else return HeuristicLab.Grid.JobState.Unknown;
58    }
59
60    public Guid BeginExecuteEngine(byte[] engine) {
61      var jobObj = CreateJobObj(engine);
62
63      ResponseObject<HeuristicLab.Hive.Contracts.BusinessObjects.Job> res = SavelyExecute(() => executionEngine.AddJob(jobObj));
64      return res == null ? Guid.Empty : res.Obj.Id;
65    }
66
67    public byte[] TryEndExecuteEngine(Guid guid) {
68      ResponseObject<JobResult> response = SavelyExecute(() => executionEngine.GetLastResult(guid, false));
69      if (response != null &&
70        response.Success && response.Obj != null) {
71        HeuristicLab.Hive.Engine.Job restoredJob = (HeuristicLab.Hive.Engine.Job)PersistenceManager.RestoreFromGZip(response.Obj.Result);
72        // only return the engine when it wasn't canceled (result is only a snapshot)
73        if (!restoredJob.Engine.Canceled) {
74          // Serialize the engine
75          MemoryStream memStream = new MemoryStream();
76          GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
77          XmlDocument document = PersistenceManager.CreateXmlDocument();
78          Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
79          XmlNode rootNode = document.CreateElement("Root");
80          document.AppendChild(rootNode);
81          rootNode.AppendChild(PersistenceManager.Persist(restoredJob.Engine, document, dictionary));
82          document.Save(stream);
83          stream.Close();
84          return memStream.ToArray();
85        }
86      }
87
88      return null;
89    }
90
91    private HeuristicLab.Hive.Contracts.BusinessObjects.Job CreateJobObj(byte[] serializedEngine) {
92      HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.Job();
93
94      List<HivePluginInfo> requiredPlugins = new List<HivePluginInfo>();
95      IEngine engine = RestoreEngine(serializedEngine, requiredPlugins);
96
97      HeuristicLab.Hive.Engine.Job job = new HeuristicLab.Hive.Engine.Job();
98      job.Engine.OperatorGraph.AddOperator(engine.OperatorGraph.InitialOperator);
99      job.Engine.OperatorGraph.InitialOperator = engine.OperatorGraph.InitialOperator;
100      job.Engine.Reset();
101
102      // Serialize the job
103      MemoryStream memStream = new MemoryStream();
104      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
105      XmlDocument document = PersistenceManager.CreateXmlDocument();
106      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
107      XmlNode rootNode = document.CreateElement("Root");
108      document.AppendChild(rootNode);
109      rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));
110      document.Save(stream);
111      stream.Close();
112
113      jobObj.SerializedJob = memStream.ToArray();
114      jobObj.CoresNeeded = 1;
115      jobObj.PluginsNeeded = requiredPlugins;
116      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
117      return jobObj;
118    }
119
120    private IEngine RestoreEngine(byte[] serializedEngine, List<HivePluginInfo> requiredPlugins) {
121      // unzip and restore to determine the list of required plugins (NB: inefficient!)
122      MemoryStream memStream = new MemoryStream(serializedEngine);
123      GZipStream stream = new GZipStream(memStream, CompressionMode.Decompress, true);
124      XmlDocument document = new XmlDocument();
125      document.Load(stream);
126
127      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
128      XmlNode rootNode = document.ChildNodes[1].ChildNodes[0];
129      IEngine engine = (IEngine)PersistenceManager.Restore(rootNode, dictionary);
130      stream.Close();
131
132      DiscoveryService service = new DiscoveryService();
133      List<PluginInfo> plugins = new List<PluginInfo>();
134
135      foreach (IStorable storeable in dictionary.Values) {
136        PluginInfo pluginInfo = service.GetDeclaringPlugin(storeable.GetType());
137        if (!plugins.Contains(pluginInfo)) {
138          plugins.Add(pluginInfo);
139          foreach (var dependency in pluginInfo.Dependencies) {
140            if (!plugins.Contains(dependency)) plugins.Add(dependency);
141          }
142        }
143      }
144
145      foreach (PluginInfo uniquePlugin in plugins) {
146        HivePluginInfo pluginInfo =
147          new HivePluginInfo();
148        pluginInfo.Name = uniquePlugin.Name;
149        pluginInfo.Version = uniquePlugin.Version.ToString();
150        pluginInfo.BuildDate = uniquePlugin.BuildDate;
151        requiredPlugins.Add(pluginInfo);
152      }
153      return engine;
154    }
155
156    private TResult SavelyExecute<TResult>(Func<TResult> a) where TResult : Response {
157      int retries = 0;
158      if (executionEngine == null)
159        executionEngine = ServiceLocator.CreateExecutionEngineFacade(address);
160
161      do {
162        try {
163          lock (connectionLock) {
164            return a();
165          }
166        }
167        catch (TimeoutException) {
168          retries++;
169          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
170        }
171        catch (CommunicationException) {
172          executionEngine = ServiceLocator.CreateExecutionEngineFacade(address);
173          retries++;
174          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
175        }
176      } while (retries < MAX_CONNECTION_RETRIES);
177      Logger.Warn("Reached max connection retries");
178      return null;
179    }
180  }
181}
Note: See TracBrowser for help on using the repository browser.