Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Persistence Test/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs @ 4021

Last change on this file since 4021 was 2474, checked in by swagner, 15 years ago

Implemented generic EventArgs (#796)

File size: 12.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
27using System.IO;
28using System.Xml;
29using System.IO.Compression;
30using HeuristicLab.Common;
31using HeuristicLab.Hive.JobBase;
32using HeuristicLab.Hive.Contracts.Interfaces;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.PluginInfrastructure;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Tracing;
37
38namespace HeuristicLab.Hive.Engine {
39  /// <summary>
40  /// Represents an engine that executes its operator-graph on the hive.
41  /// in parallel.
42  /// </summary>
43  public class HiveEngine : ItemBase, IEngine, IEditable {
44    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
45    private const int RESULT_POLLING_INTERVAL_MS = 10000;
46    private const int MAX_SNAPSHOT_RETRIES = 20;
47    private Guid jobId;
48    private Job job;
49    private object locker = new object();
50    private volatile bool abortRequested;
51
52    public string HiveServerUrl { get; set; }
53
54    public HiveEngine() {
55      job = new Job();
56      abortRequested = false;
57      Priority = ThreadPriority.Lowest;
58    }
59
60    #region IEngine Members
61
62    public IOperatorGraph OperatorGraph {
63      get { return job.Engine.OperatorGraph; }
64    }
65
66    public IScope GlobalScope {
67      get { return job.Engine.GlobalScope; }
68    }
69
70    public TimeSpan ExecutionTime {
71      get { return job.Engine.ExecutionTime; }
72    }
73
74    public ThreadPriority Priority {
75      get { return job.Engine.Priority; }
76      set { job.Engine.Priority = value; }
77    }
78
79    public bool Running {
80      get { return job.Engine.Running; }
81    }
82
83    public bool Canceled {
84      get { return job.Engine.Canceled; }
85    }
86
87    public bool Terminated {
88      get { return job.Engine.Terminated; }
89    }
90
91    public void Execute() {
92      var jobObj = CreateJobObj();
93      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
94      ResponseObject<Contracts.BusinessObjects.Job> res = executionEngineFacade.AddJob(jobObj);
95      jobId = res.Obj.Id;
96
97      StartResultPollingThread();
98    }
99
100    private void StartResultPollingThread() {
101      // start a backgroud thread to poll the final result of the job
102      Thread t = new Thread(() => {
103        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
104        ResponseObject<SerializedJobResult> response = null;
105        Job restoredJob = null;
106        do {
107          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
108          lock (locker) {
109            HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
110            response = executionEngineFacade.GetLastSerializedResult(jobId, false);
111            HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
112            // loop while
113            // 1. the user doesn't request an abort
114            // 2. there is a problem with server communication (success==false)
115            // 3. no result for the job is available yet (response.Obj==null)
116            // 4. the result that we get from the server is a snapshot and not the final result
117            if (abortRequested) return;
118            if (response.Success && response.Obj != null) {
119              HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
120              restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
121              HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress<1.0));
122            }
123          }
124        } while (restoredJob == null || (restoredJob.Progress < 1.0));
125
126        job = restoredJob;
127        PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
128        OnChanged();
129        OnFinished();
130      });
131      HiveLogger.Debug("HiveEngine: Starting results-polling thread");
132      t.Start();
133    }
134
135    public void RequestSnapshot() {
136      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
137      int retryCount = 0;
138      ResponseObject<SerializedJobResult> response;
139      lock (locker) {
140        HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
141        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
142        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
143          // job is finished already
144          HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
145          response = executionEngineFacade.GetLastSerializedResult(jobId, false);
146          HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
147        } else {
148          // server sent snapshot request to client
149          // poll until snapshot is ready
150          do {
151            Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
152            HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
153            response = executionEngineFacade.GetLastSerializedResult(jobId, true);
154            HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
155            retryCount++;
156            // loop while
157            // 1. problem with communication with server
158            // 2. job result not yet ready
159          } while (
160            (retryCount < MAX_SNAPSHOT_RETRIES) && (
161            !response.Success ||
162            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
163            );
164        }
165      }
166      SerializedJobResult jobResult = response.Obj;
167      if (jobResult != null) {
168        HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
169        job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobResultData);
170        PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
171      }
172      //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
173      //Exception ex = new Exception(response.Obj.Exception.Message);
174      //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
175    }
176
177    public void ExecuteStep() {
178      throw new NotSupportedException();
179    }
180
181    public void ExecuteSteps(int steps) {
182      throw new NotSupportedException();
183    }
184
185    public void Abort() {
186      abortRequested = true;
187      // RequestSnapshot();
188      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
189      executionEngineFacade.AbortJob(jobId);
190      OnChanged();
191      OnFinished();
192    }
193
194    public void Reset() {
195      abortRequested = false;
196      job.Engine.Reset();
197      jobId = Guid.NewGuid();
198      OnInitialized();
199    }
200
201    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
202      HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj =
203        new HeuristicLab.Hive.Contracts.BusinessObjects.Job();
204
205      MemoryStream memStream = new MemoryStream();
206      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
207      XmlDocument document = PersistenceManager.CreateXmlDocument();
208      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
209      XmlNode rootNode = document.CreateElement("Root");
210      document.AppendChild(rootNode);
211      rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));
212      document.Save(stream);
213      stream.Close();
214
215      HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
216        new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
217      executableJobObj.JobInfo = jobObj;
218      executableJobObj.SerializedJobData = memStream.ToArray();
219
220      DiscoveryService service = new DiscoveryService();
221      List<PluginInfo> plugins = new List<PluginInfo>();
222
223      foreach (IStorable storeable in dictionary.Values) {
224        PluginInfo pluginInfo = service.GetDeclaringPlugin(storeable.GetType());
225        if (!plugins.Contains(pluginInfo)) {
226          plugins.Add(pluginInfo);
227          foreach (var dependency in pluginInfo.Dependencies) {
228            if (!plugins.Contains(dependency)) plugins.Add(dependency);
229          }
230        }
231      }
232
233      List<HivePluginInfo> pluginsNeeded =
234        new List<HivePluginInfo>();
235      foreach (PluginInfo uniquePlugin in plugins) {
236        HivePluginInfo pluginInfo =
237          new HivePluginInfo();
238        pluginInfo.Name = uniquePlugin.Name;
239        pluginInfo.Version = uniquePlugin.Version.ToString();
240        pluginInfo.BuildDate = uniquePlugin.BuildDate;
241        pluginsNeeded.Add(pluginInfo);
242      }
243
244      jobObj.CoresNeeded = 1;
245      jobObj.PluginsNeeded = pluginsNeeded;
246      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
247      return executableJobObj;
248    }
249
250    public event EventHandler Initialized;
251    /// <summary>
252    /// Fires a new <c>Initialized</c> event.
253    /// </summary>
254    protected virtual void OnInitialized() {
255      if (Initialized != null)
256        Initialized(this, new EventArgs());
257    }
258
259    public event EventHandler<EventArgs<IOperation>> OperationExecuted;
260    /// <summary>
261    /// Fires a new <c>OperationExecuted</c> event.
262    /// </summary>
263    /// <param name="operation">The operation that has been executed.</param>
264    protected virtual void OnOperationExecuted(IOperation operation) {
265      if (OperationExecuted != null)
266        OperationExecuted(this, new EventArgs<IOperation>(operation));
267    }
268
269    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
270    /// <summary>
271    /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
272    /// </summary>
273    /// <param name="exception">The exception that was thrown.</param>
274    protected virtual void OnExceptionOccurred(Exception exception) {
275      Abort();
276      if (ExceptionOccurred != null)
277        ExceptionOccurred(this, new EventArgs<Exception>(exception));
278    }
279
280    public event EventHandler ExecutionTimeChanged;
281    /// <summary>
282    /// Fires a new <c>ExecutionTimeChanged</c> event.
283    /// </summary>
284    protected virtual void OnExecutionTimeChanged() {
285      if (ExecutionTimeChanged != null)
286        ExecutionTimeChanged(this, new EventArgs());
287    }
288
289    public event EventHandler Finished;
290    /// <summary>
291    /// Fires a new <c>Finished</c> event.
292    /// </summary>
293    protected virtual void OnFinished() {
294      if (Finished != null)
295        Finished(this, new EventArgs());
296    }
297
298    #endregion
299
300    public override IView CreateView() {
301      return new HiveEngineEditor(this);
302    }
303
304    #region IEditable Members
305
306    public IEditor CreateEditor() {
307      return new HiveEngineEditor(this);
308    }
309    #endregion
310
311    public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
312      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
313      XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
314      attr.Value = HiveServerUrl;
315      node.Attributes.Append(attr);
316      node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
317      return node;
318    }
319
320    public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
321      base.Populate(node, restoredObjects);
322      HiveServerUrl = node.Attributes["HiveServerUrl"].Value;
323      job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
324    }
325  }
326}
Note: See TracBrowser for help on using the repository browser.