Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs @ 2910

Last change on this file since 2910 was 2846, checked in by kgrading, 14 years ago

various improvements (#828) concerning the stability of the server

File size: 12.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
27using System.IO;
28using System.Xml;
29using System.IO.Compression;
30using HeuristicLab.Common;
31using HeuristicLab.Hive.JobBase;
32using HeuristicLab.Hive.Contracts.Interfaces;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.PluginInfrastructure;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Tracing;
37
namespace HeuristicLab.Hive.Engine {
  /// <summary>
  /// Represents an engine that does not execute its operator graph locally but
  /// submits it as a job to a Hive server and polls that server for the result.
  /// </summary>
  /// <remarks>
  /// All <see cref="IEngine"/> state properties are forwarded to the engine that is
  /// wrapped by the current <c>job</c>. Once a final result has been received the
  /// wrapped job is replaced by the job restored from the server's response.
  /// </remarks>
  public class HiveEngine : ItemBase, IEngine, IEditable {
    // Delay between two polls while waiting for a requested snapshot.
    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
    // Delay between two polls while waiting for the final result.
    private const int RESULT_POLLING_INTERVAL_MS = 10000;
    // Upper bound for the number of snapshot polls in RequestSnapshot().
    private const int MAX_SNAPSHOT_RETRIES = 20;

    // Id of the most recently submitted job; this is the job that is polled and aborted.
    private Guid jobId;
    private Job job;
    // Serializes server communication between the polling thread and RequestSnapshot().
    private readonly object locker = new object();
    // Set by Abort() and read by the polling thread, hence volatile.
    private volatile bool abortRequested;

    /// <summary>
    /// Gets or sets the address that is handed to the service locator to create the
    /// execution engine facade.
    /// </summary>
    public string HiveServerUrl { get; set; }

    /// <summary>
    /// Gets or sets the textual number of times the job is submitted by <see cref="Execute"/>.
    /// Values that are not a number or are smaller than 1 are treated as 1.
    /// </summary>
    public string MultiSubmitCount { get; set; }

    /// <summary>
    /// Initializes a new instance with an empty job and the lowest thread priority.
    /// </summary>
    public HiveEngine() {
      job = new Job();
      abortRequested = false;
      Priority = ThreadPriority.Lowest;
    }

    #region IEngine Members

    /// <summary>Gets the operator graph of the wrapped engine.</summary>
    public IOperatorGraph OperatorGraph {
      get { return job.Engine.OperatorGraph; }
    }

    /// <summary>Gets the global scope of the wrapped engine.</summary>
    public IScope GlobalScope {
      get { return job.Engine.GlobalScope; }
    }

    /// <summary>Gets the execution time reported by the wrapped engine.</summary>
    public TimeSpan ExecutionTime {
      get { return job.Engine.ExecutionTime; }
    }

    /// <summary>Gets or sets the thread priority of the wrapped engine.</summary>
    public ThreadPriority Priority {
      get { return job.Engine.Priority; }
      set { job.Engine.Priority = value; }
    }

    /// <summary>Gets the <c>Running</c> flag of the wrapped engine.</summary>
    public bool Running {
      get { return job.Engine.Running; }
    }

    /// <summary>Gets the <c>Canceled</c> flag of the wrapped engine.</summary>
    public bool Canceled {
      get { return job.Engine.Canceled; }
    }

    /// <summary>Gets the <c>Terminated</c> flag of the wrapped engine.</summary>
    public bool Terminated {
      get { return job.Engine.Terminated; }
    }

    /// <summary>
    /// Serializes the current job, submits it to the Hive server
    /// <see cref="MultiSubmitCount"/> times and starts polling for the result of the
    /// job that was submitted last.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the server response does not contain the submitted job.
    /// </exception>
    public void Execute() {
      var jobObj = CreateJobObj();
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);

      // TryParse yields 0 for text that is not a number. A negative count would skip
      // the submit loop entirely and leave the polling thread waiting for a job that
      // was never submitted, so everything below 1 falls back to a single submit.
      int loops;
      if (!Int32.TryParse(MultiSubmitCount, out loops) || loops < 1) {
        loops = 1;
      }

      for (int i = 0; i < loops; i++) {
        ResponseObject<Contracts.BusinessObjects.Job> res = executionEngineFacade.AddJob(jobObj);
        if (res == null || res.Obj == null) {
          throw new InvalidOperationException(
            "HiveEngine: the server did not return the submitted job" +
            (res != null ? " (" + res.StatusMessage + ")" : string.Empty));
        }
        // only the id of the last submitted job is kept
        jobId = res.Obj.Id;
      }

      StartResultPollingThread();
    }

    /// <summary>
    /// Starts a background thread that polls the server every
    /// <c>RESULT_POLLING_INTERVAL_MS</c> milliseconds until a result with a progress
    /// of at least 1.0 is available or an abort has been requested.
    /// </summary>
    /// <remarks>
    /// When the final result arrives the wrapped job is replaced by the restored job,
    /// a view of its engine is shown and the <c>Changed</c> and <c>Finished</c>
    /// notifications are raised from the polling thread.
    /// </remarks>
    private void StartResultPollingThread() {
      Thread t = new Thread(() => {
        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
        ResponseObject<SerializedJobResult> response = null;
        Job restoredJob = null;
        // loop while
        // 1. the user doesn't request an abort
        // 2. there is a problem with server communication (success==false)
        // 3. no result for the job is available yet (response.Obj==null)
        // 4. the result that we get from the server is a snapshot and not the final result
        do {
          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
          lock (locker) {
            // an abort that arrived while sleeping makes the server round trip pointless
            if (abortRequested) return;
            HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
            response = executionEngineFacade.GetLastSerializedResult(jobId, false);
            HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
            // the abort may also have been requested during the server call
            if (abortRequested) return;
            if (response.Success && response.Obj != null) {
              HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
              restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
              HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress<1.0));
            }
          }
        } while (restoredJob == null || (restoredJob.Progress < 1.0));

        job = restoredJob;
        ControlManager.Manager.ShowControl(job.Engine.CreateView());
        OnChanged();
        OnFinished();
      });
      // A foreground thread would keep the process alive (and polling) after the
      // application has been closed.
      t.IsBackground = true;
      HiveLogger.Debug("HiveEngine: Starting results-polling thread");
      t.Start();
    }

    /// <summary>
    /// Asks the server for a snapshot of the current job and, if a result is
    /// delivered, replaces the wrapped job by it and shows a view of its engine.
    /// </summary>
    /// <remarks>
    /// If the server answers that the job is not being calculated, the last stored
    /// result is fetched once. Otherwise the server is polled every
    /// <c>SNAPSHOT_POLLING_INTERVAL_MS</c> milliseconds, at most
    /// <c>MAX_SNAPSHOT_RETRIES</c> times. The lock is held for the whole duration, so
    /// the result polling thread is paused meanwhile.
    /// </remarks>
    public void RequestSnapshot() {
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      int retryCount = 0;
      ResponseObject<SerializedJobResult> response;
      lock (locker) {
        HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
          // job is finished already
          HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
          response = executionEngineFacade.GetLastSerializedResult(jobId, false);
          HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
        } else {
          // server sent snapshot request to client
          // poll until snapshot is ready
          do {
            Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
            HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
            response = executionEngineFacade.GetLastSerializedResult(jobId, true);
            HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
            retryCount++;
            // loop while the retry budget is not used up and
            // 1. problem with communication with server
            // 2. job result not yet ready
          } while (
            (retryCount < MAX_SNAPSHOT_RETRIES) && (
            !response.Success ||
            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
            );
        }
      }
      SerializedJobResult jobResult = response.Obj;
      if (jobResult != null) {
        HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
        job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobResultData);
        ControlManager.Manager.ShowControl(job.Engine.CreateView());
      }
    }

    /// <summary>Not supported by this engine.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteStep() {
      throw new NotSupportedException();
    }

    /// <summary>Not supported by this engine.</summary>
    /// <param name="steps">Ignored.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteSteps(int steps) {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Signals the polling thread to stop, asks the server to abort the current job
    /// and raises the <c>Changed</c> and <c>Finished</c> notifications.
    /// </summary>
    public void Abort() {
      abortRequested = true;
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      executionEngineFacade.AbortJob(jobId);
      OnChanged();
      OnFinished();
    }

    /// <summary>
    /// Clears a pending abort request, resets the wrapped engine, assigns a fresh job
    /// id and raises <see cref="Initialized"/>.
    /// </summary>
    public void Reset() {
      abortRequested = false;
      job.Engine.Reset();
      jobId = Guid.NewGuid();
      OnInitialized();
    }

    /// <summary>
    /// Builds the transfer object for the current job: the GZip-compressed XML of the
    /// job plus the description of every plugin that declares one of the persisted
    /// objects (including the direct dependencies of those plugins).
    /// </summary>
    /// <returns>The serialized job that can be submitted to the server.</returns>
    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
      HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.Job();

      // persist the job into an xml document; 'dictionary' collects every persisted object
      XmlDocument document = PersistenceManager.CreateXmlDocument();
      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
      XmlNode rootNode = document.CreateElement("Root");
      document.AppendChild(rootNode);
      rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));

      byte[] serializedJobData;
      using (MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the memory stream has to stay readable after the
        // compression stream has been closed (closing flushes the compressed data)
        using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          document.Save(stream);
        }
        serializedJobData = memStream.ToArray();
      }

      HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
      executableJobObj.JobInfo = jobObj;
      executableJobObj.SerializedJobData = serializedJobData;

      // collect each declaring plugin and its dependencies exactly once
      List<IPluginDescription> plugins = new List<IPluginDescription>();
      foreach (IStorable storeable in dictionary.Values) {
        IPluginDescription pluginInfo = ApplicationManager.Manager.GetDeclaringPlugin(storeable.GetType());
        if (!plugins.Contains(pluginInfo)) {
          plugins.Add(pluginInfo);
          foreach (var dependency in pluginInfo.Dependencies) {
            if (!plugins.Contains(dependency)) plugins.Add(dependency);
          }
        }
      }

      List<HivePluginInfo> pluginsNeeded = new List<HivePluginInfo>();
      foreach (IPluginDescription uniquePlugin in plugins) {
        HivePluginInfo pluginInfo = new HivePluginInfo();
        pluginInfo.Name = uniquePlugin.Name;
        pluginInfo.Version = uniquePlugin.Version.ToString();
        pluginInfo.BuildDate = uniquePlugin.BuildDate;
        pluginsNeeded.Add(pluginInfo);
      }

      jobObj.CoresNeeded = 1;
      jobObj.PluginsNeeded = pluginsNeeded;
      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
      return executableJobObj;
    }

    // NOTE: every event below is raised through a local copy of the delegate. The
    // polling thread raises events while handlers may be detached on another thread;
    // without the copy the null check and the invocation could see different values.

    public event EventHandler Initialized;
    /// <summary>
    /// Fires a new <c>Initialized</c> event.
    /// </summary>
    protected virtual void OnInitialized() {
      EventHandler handler = Initialized;
      if (handler != null)
        handler(this, new EventArgs());
    }

    public event EventHandler<EventArgs<IOperation>> OperationExecuted;
    /// <summary>
    /// Fires a new <c>OperationExecuted</c> event.
    /// </summary>
    /// <param name="operation">The operation that has been executed.</param>
    protected virtual void OnOperationExecuted(IOperation operation) {
      EventHandler<EventArgs<IOperation>> handler = OperationExecuted;
      if (handler != null)
        handler(this, new EventArgs<IOperation>(operation));
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    /// <summary>
    /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
    /// </summary>
    /// <param name="exception">The exception that was thrown.</param>
    protected virtual void OnExceptionOccurred(Exception exception) {
      Abort();
      EventHandler<EventArgs<Exception>> handler = ExceptionOccurred;
      if (handler != null)
        handler(this, new EventArgs<Exception>(exception));
    }

    public event EventHandler ExecutionTimeChanged;
    /// <summary>
    /// Fires a new <c>ExecutionTimeChanged</c> event.
    /// </summary>
    protected virtual void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null)
        handler(this, new EventArgs());
    }

    public event EventHandler Finished;
    /// <summary>
    /// Fires a new <c>Finished</c> event.
    /// </summary>
    protected virtual void OnFinished() {
      EventHandler handler = Finished;
      if (handler != null)
        handler(this, new EventArgs());
    }

    #endregion

    /// <summary>
    /// Creates a new <see cref="HiveEngineEditor"/> for this engine.
    /// </summary>
    /// <returns>The created editor as <see cref="IView"/>.</returns>
    public override IView CreateView() {
      return new HiveEngineEditor(this);
    }

    #region IEditable Members

    /// <summary>
    /// Creates a new <see cref="HiveEngineEditor"/> for this engine.
    /// </summary>
    /// <returns>The created editor as <see cref="IEditor"/>.</returns>
    public IEditor CreateEditor() {
      return new HiveEngineEditor(this);
    }
    #endregion

    /// <summary>
    /// Saves this engine as an <see cref="XmlNode"/>: the server url is stored in the
    /// attribute <c>HiveServerUrl</c>, the wrapped job in the child node <c>Job</c>.
    /// </summary>
    /// <param name="name">The (tag)name of the <see cref="XmlNode"/>.</param>
    /// <param name="document">The <see cref="XmlDocument"/> where to save the data.</param>
    /// <param name="persistedObjects">The dictionary of all already persisted objects.</param>
    /// <returns>The saved <see cref="XmlNode"/>.</returns>
    public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
      attr.Value = HiveServerUrl;
      node.Attributes.Append(attr);
      node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
      return node;
    }

    /// <summary>
    /// Loads the server url and the wrapped job from the given <see cref="XmlNode"/>.
    /// </summary>
    /// <param name="node">The <see cref="XmlNode"/> where the engine is saved.</param>
    /// <param name="restoredObjects">The dictionary of all already restored objects.</param>
    public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      // a node without the attribute leaves the url untouched instead of failing
      // with a NullReferenceException
      XmlAttribute urlAttribute = node.Attributes["HiveServerUrl"];
      if (urlAttribute != null) {
        HiveServerUrl = urlAttribute.Value;
      }
      job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.