Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid.HiveBridge/3.2/HiveGridServerWrapper.cs @ 2099

Last change on this file since 2099 was 2099, checked in by svonolfe, 15 years ago

Further avoided out of memory exceptions by updating the JobResult DAO (#372)

File size: 7.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using System.Threading;
26using HeuristicLab.Hive.Contracts.Interfaces;
27using HeuristicLab.Hive.Contracts;
28using System.IO;
29using System.IO.Compression;
30using System.Xml;
31using HeuristicLab.Core;
32using HeuristicLab.PluginInfrastructure;
33using HeuristicLab.Hive.Contracts.BusinessObjects;
34using System.ServiceModel;
35using HeuristicLab.Tracing;
36
37namespace HeuristicLab.Grid.HiveBridge {
38  public class HiveGridServerWrapper : IGridServer {
39    private const int MAX_CONNECTION_RETRIES = 10;
40    private const int RETRY_TIMEOUT_SEC = 60;
41    private string address;
42    private IExecutionEngineFacade executionEngine;
43    private object connectionLock = new object();
44
45    public HiveGridServerWrapper(string address) {
46      this.address = address;
47    }
48
49    public JobState JobState(Guid guid) {
50      ResponseObject<SerializedJobResult> response = SavelyExecute(() => executionEngine.GetLastSerializedResult(guid, false));
51      if (response != null && response.Success == true &&
52        (response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE ||
53          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_REQUEST_SET ||
54          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_REQUEST_ALLREADY_SET ||
55          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_JOB_RESULT_SENT)) {
56        return HeuristicLab.Grid.JobState.Busy;
57      } else return HeuristicLab.Grid.JobState.Unknown;
58    }
59
60    public Guid BeginExecuteEngine(byte[] engine) {
61      var jobObj = CreateJobObj(engine);
62
63      ResponseObject<HeuristicLab.Hive.Contracts.BusinessObjects.Job> res = SavelyExecute(() => executionEngine.AddJob(jobObj));
64      return res == null ? Guid.Empty : res.Obj.Id;
65    }
66
67    public byte[] TryEndExecuteEngine(Guid guid) {
68      ResponseObject<SerializedJobResult> response = SavelyExecute(() => executionEngine.GetLastSerializedResult(guid, false));
69      if (response != null &&
70        response.Success && response.Obj != null) {
71        HeuristicLab.Hive.Engine.Job restoredJob = (HeuristicLab.Hive.Engine.Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
72        // only return the engine when it wasn't canceled (result is only a snapshot)
73        if (!restoredJob.Engine.Canceled) {
74          // Serialize the engine
75          MemoryStream memStream = new MemoryStream();
76          GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
77          XmlDocument document = PersistenceManager.CreateXmlDocument();
78          Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
79          XmlNode rootNode = document.CreateElement("Root");
80          document.AppendChild(rootNode);
81          rootNode.AppendChild(PersistenceManager.Persist(restoredJob.Engine, document, dictionary));
82          document.Save(stream);
83          stream.Close();
84          return memStream.ToArray();
85        }
86      }
87
88      return null;
89    }
90
91    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj(byte[] serializedEngine) {
92      HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.Job();
93
94      List<HivePluginInfo> requiredPlugins = new List<HivePluginInfo>();
95      IEngine engine = RestoreEngine(serializedEngine, requiredPlugins);
96
97      HeuristicLab.Hive.Engine.Job job = new HeuristicLab.Hive.Engine.Job();
98      job.Engine.OperatorGraph.AddOperator(engine.OperatorGraph.InitialOperator);
99      job.Engine.OperatorGraph.InitialOperator = engine.OperatorGraph.InitialOperator;
100      job.Engine.Reset();
101
102      // Serialize the job
103      MemoryStream memStream = new MemoryStream();
104      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
105      XmlDocument document = PersistenceManager.CreateXmlDocument();
106      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
107      XmlNode rootNode = document.CreateElement("Root");
108      document.AppendChild(rootNode);
109      rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));
110      document.Save(stream);
111      stream.Close();
112
113      SerializedJob computableJob =
114        new SerializedJob();
115      computableJob.SerializedJobData = memStream.ToArray();
116      jobObj.CoresNeeded = 1;
117      jobObj.PluginsNeeded = requiredPlugins;
118      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
119
120      computableJob.JobInfo = jobObj;
121
122      return computableJob;
123    }
124
125    private IEngine RestoreEngine(byte[] serializedEngine, List<HivePluginInfo> requiredPlugins) {
126      // unzip and restore to determine the list of required plugins (NB: inefficient!)
127      MemoryStream memStream = new MemoryStream(serializedEngine);
128      GZipStream stream = new GZipStream(memStream, CompressionMode.Decompress, true);
129      XmlDocument document = new XmlDocument();
130      document.Load(stream);
131
132      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
133      XmlNode rootNode = document.ChildNodes[1].ChildNodes[0];
134      IEngine engine = (IEngine)PersistenceManager.Restore(rootNode, dictionary);
135      stream.Close();
136
137      DiscoveryService service = new DiscoveryService();
138      List<PluginInfo> plugins = new List<PluginInfo>();
139
140      foreach (IStorable storeable in dictionary.Values) {
141        PluginInfo pluginInfo = service.GetDeclaringPlugin(storeable.GetType());
142        if (!plugins.Contains(pluginInfo)) {
143          plugins.Add(pluginInfo);
144          foreach (var dependency in pluginInfo.Dependencies) {
145            if (!plugins.Contains(dependency)) plugins.Add(dependency);
146          }
147        }
148      }
149
150      foreach (PluginInfo uniquePlugin in plugins) {
151        HivePluginInfo pluginInfo =
152          new HivePluginInfo();
153        pluginInfo.Name = uniquePlugin.Name;
154        pluginInfo.Version = uniquePlugin.Version.ToString();
155        pluginInfo.BuildDate = uniquePlugin.BuildDate;
156        requiredPlugins.Add(pluginInfo);
157      }
158      return engine;
159    }
160
161    private TResult SavelyExecute<TResult>(Func<TResult> a) where TResult : Response {
162      int retries = 0;
163      if (executionEngine == null)
164        executionEngine = ServiceLocator.CreateExecutionEngineFacade(address);
165
166      do {
167        try {
168          lock (connectionLock) {
169            return a();
170          }
171        }
172        catch (TimeoutException) {
173          retries++;
174          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
175        }
176        catch (CommunicationException) {
177          executionEngine = ServiceLocator.CreateExecutionEngineFacade(address);
178          retries++;
179          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
180        }
181      } while (retries < MAX_CONNECTION_RETRIES);
182      Logger.Warn("Reached max connection retries");
183      return null;
184    }
185  }
186}
Note: See TracBrowser for help on using the repository browser.