Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs @ 2099

Last change on this file since 2099 was 2099, checked in by svonolfe, 15 years ago

Further avoided out of memory exceptions by updating the JobResult DAO (#372)

File size: 12.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
27using HeuristicLab.Hive.JobBase;
28using HeuristicLab.Hive.Contracts.Interfaces;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.PluginInfrastructure;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using System.Xml;
34using System.IO.Compression;
35using HeuristicLab.Tracing;
36
namespace HeuristicLab.Hive.Engine {
  /// <summary>
  /// Represents an engine that executes its operator-graph on the hive.
  /// </summary>
  /// <remarks>
  /// The engine wraps a <c>Job</c>. <see cref="Execute"/> serializes that job, submits it to the
  /// hive server addressed by <see cref="HiveServerUrl"/> and starts a thread that polls the server
  /// until the final result is available. Most <c>IEngine</c> properties simply delegate to the
  /// engine contained in the wrapped job.
  /// </remarks>
  public class HiveEngine : ItemBase, IEngine, IEditable {
    /// <summary>Pause between two polls for a requested snapshot, in milliseconds.</summary>
    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
    /// <summary>Pause between two polls for the final job result, in milliseconds.</summary>
    private const int RESULT_POLLING_INTERVAL_MS = 10000;
    /// <summary>Id of the job on the hive server; assigned in <see cref="Execute"/> and <see cref="Reset"/>.</summary>
    private Guid jobId;
    /// <summary>The wrapped job; replaced by the restored job whenever a result is fetched from the server.</summary>
    private Job job;
    /// <summary>Serializes the server round-trips of the polling thread and of <see cref="RequestSnapshot"/>.</summary>
    private readonly object locker = new object();
    /// <summary>Set by <see cref="Abort"/> to make the polling thread exit; cleared by <see cref="Reset"/>.</summary>
    private volatile bool abortRequested;

    /// <summary>
    /// Gets or sets the URL that is passed to <c>ServiceLocator.CreateExecutionEngineFacade</c>
    /// for every server call. The value is persisted as an XML attribute.
    /// </summary>
    public string HiveServerUrl { get; set; }

    /// <summary>
    /// Initializes a new instance of <see cref="HiveEngine"/> with an empty job and no pending abort request.
    /// </summary>
    public HiveEngine() {
      job = new Job();
      abortRequested = false;
    }

    #region IEngine Members

    /// <summary>Gets the operator graph of the engine contained in the wrapped job.</summary>
    public IOperatorGraph OperatorGraph {
      get { return job.Engine.OperatorGraph; }
    }

    /// <summary>Gets the global scope of the engine contained in the wrapped job.</summary>
    public IScope GlobalScope {
      get { return job.Engine.GlobalScope; }
    }

    /// <summary>Gets the execution time of the engine contained in the wrapped job.</summary>
    public TimeSpan ExecutionTime {
      get { return job.Engine.ExecutionTime; }
    }

    /// <summary>Gets or sets the thread priority of the engine contained in the wrapped job.</summary>
    public ThreadPriority Priority {
      get { return job.Engine.Priority; }
      set { job.Engine.Priority = value; }
    }

    /// <summary>Gets the <c>Running</c> flag of the engine contained in the wrapped job.</summary>
    public bool Running {
      get { return job.Engine.Running; }
    }

    /// <summary>Gets the <c>Canceled</c> flag of the engine contained in the wrapped job.</summary>
    public bool Canceled {
      get { return job.Engine.Canceled; }
    }

    /// <summary>Gets the <c>Terminated</c> flag of the engine contained in the wrapped job.</summary>
    public bool Terminated {
      get { return job.Engine.Terminated; }
    }

    /// <summary>
    /// Serializes the wrapped job, adds it to the hive server and starts polling for its final result.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the server response does not contain the added job, so that no job id is available.
    /// </exception>
    public void Execute() {
      var jobObj = CreateJobObj();

      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      ResponseObject<Contracts.BusinessObjects.Job> res = executionEngineFacade.AddJob(jobObj);
      // Without a returned job there is no id to poll for; fail with a descriptive message
      // instead of a NullReferenceException.
      if (res == null || res.Obj == null) {
        string serverMessage = res != null ? " Server message: " + res.StatusMessage : string.Empty;
        throw new InvalidOperationException("The hive server did not return the added job." + serverMessage);
      }
      jobId = res.Obj.Id;

      StartResultPollingThread();
    }

    /// <summary>
    /// Starts a thread that polls the hive server every <c>RESULT_POLLING_INTERVAL_MS</c> milliseconds
    /// until the final result of the job is available, then replaces the wrapped job with the restored
    /// result and fires the changed and <c>Finished</c> notifications.
    /// </summary>
    private void StartResultPollingThread() {
      // Start a separate thread to poll the final result of the job.
      // Note: IsBackground is not set, so this is a foreground thread.
      Thread t = new Thread(() => {
        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
        ResponseObject<SerializedJobResult> response = null;
        Job restoredJob = null;
        do {
          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
          lock (locker) {
            HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
            response = executionEngineFacade.GetLastSerializedResult(jobId, false);
            HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
            // loop while
            // 1. the user doesn't request an abort
            // 2. there is a problem with server communication (success==false)
            // 3. no result for the job is available yet (response.Obj==null)
            // 4. the result that we get from the server is a snapshot and not the final result
            if (abortRequested) return; // Abort() raises the notifications itself
            if (response.Success && response.Obj != null) {
              HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
              restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
              // a restored engine that reports Canceled is treated as a snapshot, not as the final result
              HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + restoredJob.Engine.Canceled);
            }
          }
        } while (restoredJob == null || restoredJob.Engine.Canceled);

        job = restoredJob;
        OnChanged();
        OnFinished();
      });
      HiveLogger.Debug("HiveEngine: Starting results-polling thread");
      t.Start();
    }

    /// <summary>
    /// Asks the hive server for a snapshot of the job and replaces the wrapped job with the
    /// restored snapshot (or with the last result if the job is not being calculated anymore).
    /// </summary>
    /// <remarks>
    /// The call blocks, holding the internal lock, until the server delivers a result; there is no
    /// timeout. If the response carries no result object the wrapped job is left unchanged.
    /// </remarks>
    public void RequestSnapshot() {
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);

      ResponseObject<SerializedJobResult> response;
      lock (locker) {
        HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
          // job is finished already
          HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
          response = executionEngineFacade.GetLastSerializedResult(jobId, false);
          HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
        } else {
          // server sent snapshot request to client
          // poll until snapshot is ready
          do {
            Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
            HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
            response = executionEngineFacade.GetLastSerializedResult(jobId, true);
            HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
            // loop while
            // 1. problem with communication with server
            // 2. job result not yet ready
          } while (
            !response.Success ||
            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE);
        }
      }
      SerializedJobResult jobResult = response.Obj;
      if (jobResult != null) {
        // log prefix matches the other messages of this method (was a copy of the polling-thread message)
        HiveLogger.Debug("HiveEngine: Abort - Got result!");
        job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobResultData);
      }
      // NOTE(review): nothing is reported through ExceptionOccurred here; whether the job result
      // can carry a server-side exception that should be forwarded needs to be confirmed.
    }

    /// <summary>Not supported by the hive engine.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteStep() {
      throw new NotSupportedException();
    }

    /// <summary>Not supported by the hive engine.</summary>
    /// <param name="steps">Ignored.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteSteps(int steps) {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Stops result polling, fetches a snapshot of the job via <see cref="RequestSnapshot"/>,
    /// tells the hive server to abort the job and fires the changed and <c>Finished</c> notifications.
    /// </summary>
    public void Abort() {
      // set the flag first so that the polling thread exits on its next poll
      abortRequested = true;
      RequestSnapshot();
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      executionEngineFacade.AbortJob(jobId);
      OnChanged();
      OnFinished();
    }

    /// <summary>
    /// Clears a pending abort request, resets the engine of the wrapped job, assigns a fresh
    /// job id and fires the <c>Initialized</c> event.
    /// </summary>
    public void Reset() {
      // NOTE(review): a polling thread that is still sleeping after Abort() will see the cleared
      // flag and keep polling with the new job id - verify whether this can happen in practice.
      abortRequested = false;
      job.Engine.Reset();
      jobId = Guid.NewGuid();
      OnInitialized();
    }

    /// <summary>
    /// Builds the transfer object for the hive server: the wrapped job persisted as GZip-compressed
    /// XML plus the job description listing all plugins required to restore it.
    /// </summary>
    /// <returns>The serialized job together with its job info.</returns>
    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
      HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.Job();

      // persist the job into an XML document; the dictionary collects every persisted object
      XmlDocument document = PersistenceManager.CreateXmlDocument();
      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
      XmlNode rootNode = document.CreateElement("Root");
      document.AppendChild(rootNode);
      rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));

      byte[] serializedJobData;
      using (MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the GZip stream has to be closed to flush the compressed data,
        // but the memory stream must stay readable afterwards
        using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          document.Save(stream);
        }
        serializedJobData = memStream.ToArray();
      }

      HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
      executableJobObj.JobInfo = jobObj;
      executableJobObj.SerializedJobData = serializedJobData;

      // collect the declaring plugin of every persisted object and the dependencies of
      // these plugins (each plugin only once)
      DiscoveryService service = new DiscoveryService();
      List<PluginInfo> plugins = new List<PluginInfo>();

      foreach (IStorable storeable in dictionary.Values) {
        PluginInfo pluginInfo = service.GetDeclaringPlugin(storeable.GetType());
        if (!plugins.Contains(pluginInfo)) {
          plugins.Add(pluginInfo);
          foreach (var dependency in pluginInfo.Dependencies) {
            if (!plugins.Contains(dependency)) plugins.Add(dependency);
          }
        }
      }

      // translate the plugin descriptions into the transfer objects understood by the hive
      List<HivePluginInfo> pluginsNeeded =
        new List<HivePluginInfo>();
      foreach (PluginInfo uniquePlugin in plugins) {
        HivePluginInfo pluginInfo =
          new HivePluginInfo();
        pluginInfo.Name = uniquePlugin.Name;
        pluginInfo.Version = uniquePlugin.Version.ToString();
        pluginInfo.BuildDate = uniquePlugin.BuildDate;
        pluginsNeeded.Add(pluginInfo);
      }

      jobObj.CoresNeeded = 1;
      jobObj.PluginsNeeded = pluginsNeeded;
      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
      return executableJobObj;
    }

    // The event raisers below copy the delegate into a local variable before invoking it.
    // Finished is raised from the polling thread, so a handler could otherwise be removed
    // between the null check and the invocation.

    public event EventHandler Initialized;
    /// <summary>
    /// Fires a new <c>Initialized</c> event.
    /// </summary>
    protected virtual void OnInitialized() {
      EventHandler handler = Initialized;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    public event EventHandler<OperationEventArgs> OperationExecuted;
    /// <summary>
    /// Fires a new <c>OperationExecuted</c> event.
    /// </summary>
    /// <param name="operation">The operation that has been executed.</param>
    protected virtual void OnOperationExecuted(IOperation operation) {
      EventHandler<OperationEventArgs> handler = OperationExecuted;
      if (handler != null)
        handler(this, new OperationEventArgs(operation));
    }

    public event EventHandler<ExceptionEventArgs> ExceptionOccurred;
    /// <summary>
    /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
    /// </summary>
    /// <param name="exception">The exception that was thrown.</param>
    protected virtual void OnExceptionOccurred(Exception exception) {
      Abort();
      EventHandler<ExceptionEventArgs> handler = ExceptionOccurred;
      if (handler != null)
        handler(this, new ExceptionEventArgs(exception));
    }

    public event EventHandler ExecutionTimeChanged;
    /// <summary>
    /// Fires a new <c>ExecutionTimeChanged</c> event.
    /// </summary>
    protected virtual void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    public event EventHandler Finished;
    /// <summary>
    /// Fires a new <c>Finished</c> event.
    /// </summary>
    protected virtual void OnFinished() {
      EventHandler handler = Finished;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    #endregion

    /// <summary>Creates a new <c>HiveEngineEditor</c> for this engine.</summary>
    /// <returns>The created editor as <c>IView</c>.</returns>
    public override IView CreateView() {
      return new HiveEngineEditor(this);
    }

    #region IEditable Members

    /// <summary>Creates a new <c>HiveEngineEditor</c> for this engine.</summary>
    /// <returns>The created editor as <c>IEditor</c>.</returns>
    public IEditor CreateEditor() {
      return new HiveEngineEditor(this);
    }
    #endregion

    /// <summary>
    /// Persists the engine: the server URL is stored in the attribute <c>HiveServerUrl</c> and the
    /// wrapped job in a child node named <c>Job</c>.
    /// </summary>
    /// <param name="name">The name passed on to the base implementation.</param>
    /// <param name="document">The XML document in which the node is created.</param>
    /// <param name="persistedObjects">The objects that have already been persisted.</param>
    /// <returns>The XML node representing this engine.</returns>
    public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
      attr.Value = HiveServerUrl;
      node.Attributes.Append(attr);
      node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
      return node;
    }

    /// <summary>
    /// Restores the engine from the given XML node (counterpart of <see cref="GetXmlNode"/>).
    /// </summary>
    /// <param name="node">The XML node that holds the persisted engine.</param>
    /// <param name="restoredObjects">The objects that have already been restored.</param>
    public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      // the attribute indexer yields null for a missing attribute; keep the current URL in that case
      XmlAttribute urlAttribute = node.Attributes["HiveServerUrl"];
      if (urlAttribute != null)
        HiveServerUrl = urlAttribute.Value;
      job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.