Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs @ 2228

Last change on this file since 2228 was 2111, checked in by gkronber, 15 years ago

Implemented discrimination of snapshots vs. end results in HiveEngine and Job. #545 (Engine which can be executed in the Hive)

File size: 12.4 KB
RevLine 
[1432]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
[1440]27using HeuristicLab.Hive.JobBase;
28using HeuristicLab.Hive.Contracts.Interfaces;
[1487]29using HeuristicLab.Hive.Contracts;
[1503]30using HeuristicLab.PluginInfrastructure;
[1580]31using HeuristicLab.Hive.Contracts.BusinessObjects;
[1591]32using System.IO;
[1726]33using System.Xml;
[1730]34using System.IO.Compression;
[2032]35using HeuristicLab.Tracing;
[1432]36
37namespace HeuristicLab.Hive.Engine {
/// <summary>
/// Represents an engine that executes its operator graph on the Hive.
/// </summary>
/// <remarks>
/// The engine wraps a <see cref="Job"/>. <see cref="Execute"/> serializes that job, submits it to the
/// server addressed by <see cref="HiveServerUrl"/> and then polls for the final result on a
/// separate thread.
/// </remarks>
public class HiveEngine : ItemBase, IEngine, IEditable {
  // Delay between two polls for a requested snapshot (see RequestSnapshot).
  private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
  // Delay between two polls for the final result (see StartResultPollingThread).
  private const int RESULT_POLLING_INTERVAL_MS = 10000;
  // Maximum number of polls RequestSnapshot performs before it gives up.
  private const int MAX_SNAPSHOT_RETRIES = 20;
  // Id assigned by the server in Execute; replaced by a fresh random id in Reset.
  private Guid jobId;
  private Job job;
  // Serializes the server calls of the polling thread and of RequestSnapshot.
  private readonly object locker = new object();
  // Set by Abort, cleared by Reset; read by the polling thread.
  private volatile bool abortRequested;

  /// <summary>
  /// Gets or sets the URL that is passed to <c>ServiceLocator.CreateExecutionEngineFacade</c>.
  /// </summary>
  public string HiveServerUrl { get; set; }

  /// <summary>
  /// Initializes a new engine with an empty job and the lowest thread priority.
  /// </summary>
  public HiveEngine() {
    job = new Job();
    abortRequested = false;
    Priority = ThreadPriority.Lowest;
  }

  #region IEngine Members

  /// <summary>Gets the operator graph of the wrapped job's engine.</summary>
  public IOperatorGraph OperatorGraph {
    get { return job.Engine.OperatorGraph; }
  }

  /// <summary>Gets the global scope of the wrapped job's engine.</summary>
  public IScope GlobalScope {
    get { return job.Engine.GlobalScope; }
  }

  /// <summary>Gets the execution time reported by the wrapped job's engine.</summary>
  public TimeSpan ExecutionTime {
    get { return job.Engine.ExecutionTime; }
  }

  /// <summary>Gets or sets the thread priority of the wrapped job's engine.</summary>
  public ThreadPriority Priority {
    get { return job.Engine.Priority; }
    set { job.Engine.Priority = value; }
  }

  /// <summary>Gets the <c>Running</c> flag of the wrapped job's engine.</summary>
  public bool Running {
    get { return job.Engine.Running; }
  }

  /// <summary>Gets the <c>Canceled</c> flag of the wrapped job's engine.</summary>
  public bool Canceled {
    get { return job.Engine.Canceled; }
  }

  /// <summary>Gets the <c>Terminated</c> flag of the wrapped job's engine.</summary>
  public bool Terminated {
    get { return job.Engine.Terminated; }
  }

  /// <summary>
  /// Serializes the wrapped job, adds it to the Hive server and starts polling for its result.
  /// </summary>
  /// <exception cref="InvalidOperationException">
  /// The server returned no job object, so there is no job id that could be polled.
  /// </exception>
  public void Execute() {
    var jobObj = CreateJobObj();
    IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    ResponseObject<Contracts.BusinessObjects.Job> res = executionEngineFacade.AddJob(jobObj);
    if (res == null || res.Obj == null) {
      // Without a job object there is no id to poll for; report the server's message
      // instead of failing with a NullReferenceException on res.Obj.Id.
      string reason = (res == null) ? "no response" : res.StatusMessage;
      throw new InvalidOperationException("The Hive server did not return a job for the submitted job: " + reason);
    }
    jobId = res.Obj.Id;

    StartResultPollingThread();
  }

  /// <summary>
  /// Starts a background thread that polls the server until the final result of the current job
  /// is available, then replaces the wrapped job with the restored one and fires the change and
  /// finished notifications.
  /// </summary>
  private void StartResultPollingThread() {
    // Remember which job this thread is responsible for. Execute() and Reset() overwrite the
    // jobId field, and a thread started for an earlier job must not continue with the new id.
    Guid polledJobId = jobId;
    Thread t = new Thread(() => {
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      ResponseObject<SerializedJobResult> response = null;
      Job restoredJob = null;
      do {
        Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
        lock (locker) {
          // Stop before talking to the server if the user aborted during the sleep or if
          // this thread has been superseded (the engine was reset or executed again).
          if (abortRequested || polledJobId != jobId) return;
          HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
          response = executionEngineFacade.GetLastSerializedResult(polledJobId, false);
          HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
          // keep looping while
          // 1. the user doesn't request an abort
          // 2. there is a problem with server communication (success==false)
          // 3. no result for the job is available yet (response.Obj==null)
          // 4. the result that we get from the server is a snapshot and not the final result
          if (abortRequested) return;
          if (response.Success && response.Obj != null) {
            HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
            restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
            HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress<1.0));
          }
        }
      } while (restoredJob == null || (restoredJob.Progress < 1.0));

      job = restoredJob;
      PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
      OnChanged();
      OnFinished();
    });
    // A foreground thread would keep the process alive for as long as the job has no final result.
    t.IsBackground = true;
    HiveLogger.Debug("HiveEngine: Starting results-polling thread");
    t.Start();
  }

  /// <summary>
  /// Requests a snapshot of the current job from the server. If a result is returned, the wrapped
  /// job is replaced by the restored one and the view of its engine is shown.
  /// </summary>
  /// <remarks>
  /// When the server reports that the job is not being calculated, the last stored result is
  /// fetched once. Otherwise the snapshot is polled every
  /// <c>SNAPSHOT_POLLING_INTERVAL_MS</c> milliseconds, at most <c>MAX_SNAPSHOT_RETRIES</c> times.
  /// The lock is held for the whole exchange, so the result-polling thread waits meanwhile.
  /// </remarks>
  public void RequestSnapshot() {
    IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    int retryCount = 0;
    ResponseObject<SerializedJobResult> response;
    lock (locker) {
      HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
      Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
      if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
        // job is finished already
        HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
        response = executionEngineFacade.GetLastSerializedResult(jobId, false);
        HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
      } else {
        // server sent snapshot request to client
        // poll until snapshot is ready
        do {
          Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
          HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
          response = executionEngineFacade.GetLastSerializedResult(jobId, true);
          HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
          retryCount++;
          // keep looping (up to MAX_SNAPSHOT_RETRIES polls) while
          // 1. problem with communication with server
          // 2. job result not yet ready
        } while (
          (retryCount < MAX_SNAPSHOT_RETRIES) && (
          !response.Success ||
          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
          );
      }
    }
    SerializedJobResult jobResult = response.Obj;
    if (jobResult != null) {
      HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
      job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobResultData);
      PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
    }
    //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
    //Exception ex = new Exception(response.Obj.Exception.Message);
    //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
  }

  /// <summary>Not supported by this engine.</summary>
  /// <exception cref="NotSupportedException">Always thrown.</exception>
  public void ExecuteStep() {
    throw new NotSupportedException();
  }

  /// <summary>Not supported by this engine.</summary>
  /// <param name="steps">Ignored.</param>
  /// <exception cref="NotSupportedException">Always thrown.</exception>
  public void ExecuteSteps(int steps) {
    throw new NotSupportedException();
  }

  /// <summary>
  /// Flags the abort for the polling thread, asks the server to abort the current job and fires
  /// the change and finished notifications.
  /// </summary>
  public void Abort() {
    abortRequested = true;
    // RequestSnapshot();
    IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    executionEngineFacade.AbortJob(jobId);
    OnChanged();
    OnFinished();
  }

  /// <summary>
  /// Clears the abort flag, resets the wrapped job's engine, assigns a new random job id and
  /// fires <see cref="Initialized"/>.
  /// </summary>
  public void Reset() {
    abortRequested = false;
    job.Engine.Reset();
    jobId = Guid.NewGuid();
    OnInitialized();
  }

  /// <summary>
  /// Builds the object that is sent to the server: the wrapped job persisted as
  /// GZip-compressed XML plus job information listing the plugins needed.
  /// </summary>
  /// <returns>The serialized job together with its job information.</returns>
  private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
    // Persist the job into an XML document; the dictionary collects every persisted object.
    XmlDocument document = PersistenceManager.CreateXmlDocument();
    Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
    XmlNode rootNode = document.CreateElement("Root");
    document.AppendChild(rootNode);
    rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));

    // Compress the document. The GZip stream has to be closed before the buffer is read,
    // and both streams are released even if saving fails.
    byte[] serializedJobData;
    using (MemoryStream memStream = new MemoryStream()) {
      using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
        document.Save(stream);
      }
      serializedJobData = memStream.ToArray();
    }

    // Collect the declaring plugin of every persisted object and that plugin's dependencies,
    // each of them once.
    DiscoveryService service = new DiscoveryService();
    List<PluginInfo> plugins = new List<PluginInfo>();
    foreach (IStorable storeable in dictionary.Values) {
      PluginInfo pluginInfo = service.GetDeclaringPlugin(storeable.GetType());
      if (!plugins.Contains(pluginInfo)) {
        plugins.Add(pluginInfo);
        foreach (var dependency in pluginInfo.Dependencies) {
          if (!plugins.Contains(dependency)) plugins.Add(dependency);
        }
      }
    }

    // Translate the plugin descriptions into the contract type.
    List<HivePluginInfo> pluginsNeeded = new List<HivePluginInfo>();
    foreach (PluginInfo uniquePlugin in plugins) {
      HivePluginInfo pluginInfo = new HivePluginInfo();
      pluginInfo.Name = uniquePlugin.Name;
      pluginInfo.Version = uniquePlugin.Version.ToString();
      pluginInfo.BuildDate = uniquePlugin.BuildDate;
      pluginsNeeded.Add(pluginInfo);
    }

    HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj =
      new HeuristicLab.Hive.Contracts.BusinessObjects.Job();
    jobObj.CoresNeeded = 1;
    jobObj.PluginsNeeded = pluginsNeeded;
    jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;

    HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
      new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
    executableJobObj.JobInfo = jobObj;
    executableJobObj.SerializedJobData = serializedJobData;
    return executableJobObj;
  }

  public event EventHandler Initialized;
  /// <summary>
  /// Fires a new <c>Initialized</c> event.
  /// </summary>
  protected virtual void OnInitialized() {
    // Copy the delegate first so that an unsubscribe on another thread between the null
    // check and the call cannot cause a NullReferenceException.
    EventHandler handler = Initialized;
    if (handler != null)
      handler(this, EventArgs.Empty);
  }

  public event EventHandler<OperationEventArgs> OperationExecuted;
  /// <summary>
  /// Fires a new <c>OperationExecuted</c> event.
  /// </summary>
  /// <param name="operation">The operation that has been executed.</param>
  protected virtual void OnOperationExecuted(IOperation operation) {
    EventHandler<OperationEventArgs> handler = OperationExecuted;
    if (handler != null)
      handler(this, new OperationEventArgs(operation));
  }

  public event EventHandler<ExceptionEventArgs> ExceptionOccurred;
  /// <summary>
  /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
  /// </summary>
  /// <param name="exception">The exception that was thrown.</param>
  protected virtual void OnExceptionOccurred(Exception exception) {
    Abort();
    EventHandler<ExceptionEventArgs> handler = ExceptionOccurred;
    if (handler != null)
      handler(this, new ExceptionEventArgs(exception));
  }

  public event EventHandler ExecutionTimeChanged;
  /// <summary>
  /// Fires a new <c>ExecutionTimeChanged</c> event.
  /// </summary>
  protected virtual void OnExecutionTimeChanged() {
    EventHandler handler = ExecutionTimeChanged;
    if (handler != null)
      handler(this, EventArgs.Empty);
  }

  public event EventHandler Finished;
  /// <summary>
  /// Fires a new <c>Finished</c> event.
  /// </summary>
  protected virtual void OnFinished() {
    // Called from the result-polling thread as well as from Abort(), hence the local copy.
    EventHandler handler = Finished;
    if (handler != null)
      handler(this, EventArgs.Empty);
  }

  #endregion

  /// <summary>Creates a new <c>HiveEngineEditor</c> for this engine.</summary>
  /// <returns>The created editor as <see cref="IView"/>.</returns>
  public override IView CreateView() {
    return new HiveEngineEditor(this);
  }

  #region IEditable Members

  /// <summary>Creates a new <c>HiveEngineEditor</c> for this engine.</summary>
  /// <returns>The created editor as <see cref="IEditor"/>.</returns>
  public IEditor CreateEditor() {
    return new HiveEngineEditor(this);
  }
  #endregion

  /// <summary>
  /// Saves the engine as an XML node: the base node extended by a <c>HiveServerUrl</c>
  /// attribute and a child node named <c>Job</c> holding the wrapped job.
  /// </summary>
  /// <param name="name">The name passed on to the base implementation.</param>
  /// <param name="document">The document in which the node is created.</param>
  /// <param name="persistedObjects">The objects persisted so far, passed on to the persistence manager.</param>
  /// <returns>The node representing this engine.</returns>
  public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
    XmlNode node = base.GetXmlNode(name, document, persistedObjects);
    XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
    attr.Value = HiveServerUrl;
    node.Attributes.Append(attr);
    node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
    return node;
  }

  /// <summary>
  /// Restores the engine from an XML node written by <see cref="GetXmlNode"/>: reads the
  /// <c>HiveServerUrl</c> attribute and restores the job from the <c>Job</c> child node.
  /// </summary>
  /// <param name="node">The node to read from.</param>
  /// <param name="restoredObjects">The objects restored so far, passed on to the persistence manager.</param>
  public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
    base.Populate(node, restoredObjects);
    HiveServerUrl = node.Attributes["HiveServerUrl"].Value;
    job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
  }
}
325}
Note: See TracBrowser for help on using the repository browser.