Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs @ 3578

Last change on this file since 3578 was 3578, checked in by kgrading, 14 years ago

Removed References to HiveLogging and updated the default logging mechanism (#991)

File size: 13.2 KB
RevLine 
[1432]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
[2474]27using System.IO;
28using System.Xml;
29using System.IO.Compression;
30using HeuristicLab.Common;
[1440]31using HeuristicLab.Hive.JobBase;
32using HeuristicLab.Hive.Contracts.Interfaces;
[1487]33using HeuristicLab.Hive.Contracts;
[1503]34using HeuristicLab.PluginInfrastructure;
[1580]35using HeuristicLab.Hive.Contracts.BusinessObjects;
[2032]36using HeuristicLab.Tracing;
[1432]37
namespace HeuristicLab.Hive.Engine {
  /// <summary>
  /// Represents an engine that executes its operator graph on the hive.
  /// </summary>
  /// <remarks>
  /// The engine wraps a <see cref="Job"/>, submits it to the hive server found at
  /// <see cref="HiveServerUrl"/> and polls the server on a separate thread until
  /// the final result of the job is available.
  /// </remarks>
  public class HiveEngine : ItemBase, IEngine, IEditable {
    // Delay between two polls while waiting for a requested snapshot.
    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
    // Delay between two polls while waiting for the final job result.
    private const int RESULT_POLLING_INTERVAL_MS = 10000;
    // Maximum number of polls RequestSnapshot() performs before giving up.
    private const int MAX_SNAPSHOT_RETRIES = 20;

    // Id of the most recently submitted job (see Execute()).
    private Guid jobId;
    private Job job;
    // Serializes the server calls of the polling thread and RequestSnapshot().
    private readonly object locker = new object();
    // Written by Abort()/Reset(), read by the polling thread.
    private volatile bool abortRequested;

    /// <summary>Address of the hive server that is passed to the service locator.</summary>
    public string HiveServerUrl { get; set; }

    /// <summary>
    /// Number of times the job is submitted by <see cref="Execute"/>, as text.
    /// Values that are missing, unparsable or smaller than 1 are treated as 1.
    /// </summary>
    public string MultiSubmitCount { get; set; }

    /// <summary>
    /// Semicolon-separated list of group strings that is handed to the server
    /// together with the job. May be null or empty.
    /// </summary>
    public string RessourceIds { get; set; }

    /// <summary>
    /// Initializes a new <see cref="HiveEngine"/> with an empty job and the lowest thread priority.
    /// </summary>
    public HiveEngine() {
      job = new Job();
      abortRequested = false;
      Priority = ThreadPriority.Lowest;
    }

    #region IEngine Members

    /// <summary>Gets the operator graph of the wrapped job's engine.</summary>
    public IOperatorGraph OperatorGraph {
      get { return job.Engine.OperatorGraph; }
    }

    /// <summary>Gets the global scope of the wrapped job's engine.</summary>
    public IScope GlobalScope {
      get { return job.Engine.GlobalScope; }
    }

    /// <summary>Gets the execution time reported by the wrapped job's engine.</summary>
    public TimeSpan ExecutionTime {
      get { return job.Engine.ExecutionTime; }
    }

    /// <summary>Gets or sets the thread priority of the wrapped job's engine.</summary>
    public ThreadPriority Priority {
      get { return job.Engine.Priority; }
      set { job.Engine.Priority = value; }
    }

    /// <summary>Gets the <c>Running</c> flag of the wrapped job's engine.</summary>
    public bool Running {
      get { return job.Engine.Running; }
    }

    /// <summary>Gets the <c>Canceled</c> flag of the wrapped job's engine.</summary>
    public bool Canceled {
      get { return job.Engine.Canceled; }
    }

    /// <summary>Gets the <c>Terminated</c> flag of the wrapped job's engine.</summary>
    public bool Terminated {
      get { return job.Engine.Terminated; }
    }

    /// <summary>
    /// Serializes the job, submits it to the hive server and starts the thread
    /// that polls for the final result.
    /// </summary>
    /// <remarks>
    /// The job is submitted <see cref="MultiSubmitCount"/> times (at least once).
    /// Only the id of the last submitted job is remembered and polled afterwards.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// The server response did not contain a job object.
    /// </exception>
    public void Execute() {
      var jobObj = CreateJobObj();
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);

      // RessourceIds is an auto-property and therefore null until it is assigned;
      // a null value must not reach Split().
      List<string> groups = new List<string>();
      if (!String.IsNullOrEmpty(RessourceIds)) {
        groups.AddRange(RessourceIds.Split(';'));
      }

      // TryParse leaves 0 in 'loops' on failure. Zero and negative counts would
      // skip the submission entirely and leave the polling thread waiting for a
      // job that was never sent, so anything below 1 falls back to 1.
      int loops;
      if (!Int32.TryParse(MultiSubmitCount, out loops) || loops < 1) {
        loops = 1;
      }

      for (int i = 0; i < loops; i++) {
        ResponseObject<Contracts.BusinessObjects.JobDto> res = executionEngineFacade.AddJobWithGroupStrings(jobObj, groups);
        if (res == null || res.Obj == null) {
          throw new InvalidOperationException(
            "HiveEngine: The server did not return a job object for the submitted job." +
            (res != null ? " Server: " + res.StatusMessage : String.Empty));
        }
        jobId = res.Obj.Id;
      }

      StartResultPollingThread();
    }

    /// <summary>
    /// Starts a background thread that polls the server every
    /// <c>RESULT_POLLING_INTERVAL_MS</c> milliseconds until the final result of the
    /// job is available or an abort was requested.
    /// </summary>
    private void StartResultPollingThread() {
      Thread t = new Thread(() => {
        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
        ResponseObject<SerializedJob> response = null;
        Job restoredJob = null;
        do {
          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
          lock (locker) {
            Logger.Debug("HiveEngine: Results-polling - GetLastResult");
            response = executionEngineFacade.GetLastSerializedResult(jobId, false);
            Logger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
            // loop while
            // 1. the user doesn't request an abort
            // 2. there is a problem with server communication (success==false)
            // 3. no result for the job is available yet (response.Obj==null)
            // 4. the result that we get from the server is a snapshot and not the final result
            if (abortRequested) return;
            if (response.Success && response.Obj != null) {
              Logger.Debug("HiveEngine: Results-polling - Got result!");
              restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobData);
              Logger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress < 1.0));
            }
          }
        } while (restoredJob == null || (restoredJob.Progress < 1.0));

        // the final result has arrived: replace the local job and notify listeners
        job = restoredJob;
        ControlManager.Manager.ShowControl(job.Engine.CreateView());
        OnChanged();
        OnFinished();
      });
      // Threads created with new Thread(...) are foreground threads by default;
      // a pure polling loop must not keep the process alive on its own.
      t.IsBackground = true;
      Logger.Debug("HiveEngine: Starting results-polling thread");
      t.Start();
    }

    /// <summary>
    /// Asks the server for a snapshot of the running job and, if a result is
    /// returned, replaces the local job with it and shows a view of its engine.
    /// </summary>
    /// <remarks>
    /// If the server reports that the job is not being calculated, the last stored
    /// result is fetched directly. Otherwise the server is polled every
    /// <c>SNAPSHOT_POLLING_INTERVAL_MS</c> milliseconds, at most
    /// <c>MAX_SNAPSHOT_RETRIES</c> times. The lock is held for the whole polling
    /// period, so the results-polling thread is blocked in the meantime.
    /// </remarks>
    public void RequestSnapshot() {
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      int retryCount = 0;
      ResponseObject<SerializedJob> response;
      lock (locker) {
        Logger.Debug("HiveEngine: Abort - RequestSnapshot");
        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
          // job is finished already
          Logger.Debug("HiveEngine: Abort - GetLastResult(false)");
          response = executionEngineFacade.GetLastSerializedResult(jobId, false);
          Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
        } else {
          // server sent snapshot request to client
          // poll until snapshot is ready
          do {
            Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
            Logger.Debug("HiveEngine: Abort - GetLastResult(true)");
            response = executionEngineFacade.GetLastSerializedResult(jobId, true);
            Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
            retryCount++;
            // loop while
            // 1. problem with communication with server
            // 2. job result not yet ready
          } while (
            (retryCount < MAX_SNAPSHOT_RETRIES) && (
            !response.Success ||
            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
            );
        }
      }
      SerializedJob jobResult = response.Obj;
      if (jobResult != null) {
        Logger.Debug("HiveEngine: Results-polling - Got result!");
        job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobData);
        ControlManager.Manager.ShowControl(job.Engine.CreateView());
      }
    }

    /// <summary>Not supported by this engine.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteStep() {
      throw new NotSupportedException();
    }

    /// <summary>Not supported by this engine.</summary>
    /// <param name="steps">Ignored.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteSteps(int steps) {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Signals the polling thread to stop, asks the server to abort the job and
    /// then calls <c>OnChanged</c> and <c>OnFinished</c>.
    /// </summary>
    public void Abort() {
      // set the flag first so the polling thread stops at its next check
      abortRequested = true;
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      executionEngineFacade.AbortJob(jobId);
      OnChanged();
      OnFinished();
    }

    /// <summary>
    /// Clears the abort flag, resets the wrapped job's engine, assigns a fresh
    /// job id and fires the <c>Initialized</c> event.
    /// </summary>
    public void Reset() {
      abortRequested = false;
      job.Engine.Reset();
      jobId = Guid.NewGuid();
      OnInitialized();
    }

    /// <summary>
    /// Builds the object that is sent to the server: the job persisted as
    /// GZip-compressed XML plus a job description that lists the plugins needed.
    /// </summary>
    /// <returns>The serialized job including its <c>JobInfo</c>.</returns>
    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
      HeuristicLab.Hive.Contracts.BusinessObjects.JobDto jobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.JobDto();

      // Filled by Persist() with every object that was written; used below to
      // find the plugins that declare those objects.
      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
      byte[] serializedJobData;
      using (MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the memory stream must stay readable after the GZip
        // stream is closed; closing the GZip stream writes the remaining
        // compressed data, so ToArray() is called only afterwards.
        using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          XmlDocument document = PersistenceManager.CreateXmlDocument();
          XmlNode rootNode = document.CreateElement("Root");
          document.AppendChild(rootNode);
          rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));
          document.Save(stream);
        }
        serializedJobData = memStream.ToArray();
      }

      HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
      executableJobObj.JobInfo = jobObj;
      executableJobObj.SerializedJobData = serializedJobData;

      // Collect each declaring plugin once, followed by its direct dependencies.
      List<IPluginDescription> plugins = new List<IPluginDescription>();
      foreach (IStorable storeable in dictionary.Values) {
        IPluginDescription declaringPlugin = ApplicationManager.Manager.GetDeclaringPlugin(storeable.GetType());
        if (!plugins.Contains(declaringPlugin)) {
          plugins.Add(declaringPlugin);
          foreach (var dependency in declaringPlugin.Dependencies) {
            if (!plugins.Contains(dependency)) plugins.Add(dependency);
          }
        }
      }

      List<HivePluginInfoDto> pluginsNeeded = new List<HivePluginInfoDto>();
      foreach (IPluginDescription uniquePlugin in plugins) {
        HivePluginInfoDto pluginInfo = new HivePluginInfoDto();
        pluginInfo.Name = uniquePlugin.Name;
        pluginInfo.Version = uniquePlugin.Version.ToString();
        pluginInfo.BuildDate = uniquePlugin.BuildDate;
        pluginsNeeded.Add(pluginInfo);
      }

      jobObj.CoresNeeded = 1;
      jobObj.PluginsNeeded = pluginsNeeded;
      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
      return executableJobObj;
    }

    public event EventHandler Initialized;
    /// <summary>
    /// Fires a new <c>Initialized</c> event.
    /// </summary>
    protected virtual void OnInitialized() {
      // copy to a local so a concurrent unsubscribe cannot null the delegate
      // between the check and the call
      EventHandler handler = Initialized;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    public event EventHandler<EventArgs<IOperation>> OperationExecuted;
    /// <summary>
    /// Fires a new <c>OperationExecuted</c> event.
    /// </summary>
    /// <param name="operation">The operation that has been executed.</param>
    protected virtual void OnOperationExecuted(IOperation operation) {
      EventHandler<EventArgs<IOperation>> handler = OperationExecuted;
      if (handler != null)
        handler(this, new EventArgs<IOperation>(operation));
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    /// <summary>
    /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
    /// </summary>
    /// <param name="exception">The exception that was thrown.</param>
    protected virtual void OnExceptionOccurred(Exception exception) {
      Abort();
      EventHandler<EventArgs<Exception>> handler = ExceptionOccurred;
      if (handler != null)
        handler(this, new EventArgs<Exception>(exception));
    }

    public event EventHandler ExecutionTimeChanged;
    /// <summary>
    /// Fires a new <c>ExecutionTimeChanged</c> event.
    /// </summary>
    protected virtual void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    public event EventHandler Finished;
    /// <summary>
    /// Fires a new <c>Finished</c> event.
    /// </summary>
    protected virtual void OnFinished() {
      // raised from Abort() as well as from the results-polling thread
      EventHandler handler = Finished;
      if (handler != null)
        handler(this, EventArgs.Empty);
    }

    #endregion

    /// <summary>Creates a new <see cref="HiveEngineEditor"/> for this engine.</summary>
    /// <returns>The created editor as <see cref="IView"/>.</returns>
    public override IView CreateView() {
      return new HiveEngineEditor(this);
    }

    #region IEditable Members

    /// <summary>Creates a new <see cref="HiveEngineEditor"/> for this engine.</summary>
    /// <returns>The created editor as <see cref="IEditor"/>.</returns>
    public IEditor CreateEditor() {
      return new HiveEngineEditor(this);
    }
    #endregion

    /// <summary>
    /// Saves the engine as an XML node: <see cref="HiveServerUrl"/> is written as
    /// an attribute and the job as a child node named <c>Job</c>.
    /// </summary>
    /// <param name="name">Passed through to the base implementation.</param>
    /// <param name="document">The document the node is created in.</param>
    /// <param name="persistedObjects">Objects that have been persisted so far.</param>
    /// <returns>The node created by the base class with the attribute and child added.</returns>
    public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
      attr.Value = HiveServerUrl;
      node.Attributes.Append(attr);
      node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
      return node;
    }

    /// <summary>
    /// Restores <see cref="HiveServerUrl"/> and the job from a node written by
    /// <see cref="GetXmlNode"/>.
    /// </summary>
    /// <param name="node">The node to read from.</param>
    /// <param name="restoredObjects">Objects that have been restored so far.</param>
    public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      HiveServerUrl = node.Attributes["HiveServerUrl"].Value;
      job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.