Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs @ 3018

Last change on this file since 3018 was 3018, checked in by kgrading, 14 years ago

added Priority and resource restricted scheduling (#907)

File size: 13.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
27using System.IO;
28using System.Xml;
29using System.IO.Compression;
30using HeuristicLab.Common;
31using HeuristicLab.Hive.JobBase;
32using HeuristicLab.Hive.Contracts.Interfaces;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.PluginInfrastructure;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Tracing;
37
namespace HeuristicLab.Hive.Engine {
  /// <summary>
  /// Represents an engine that does not run its operator graph locally but
  /// serializes the wrapped job, submits it to a hive server and polls the
  /// server for the result.
  /// </summary>
  public class HiveEngine : ItemBase, IEngine, IEditable {
    // Delay between two polls while waiting for a requested snapshot.
    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
    // Delay between two polls while waiting for the final result.
    private const int RESULT_POLLING_INTERVAL_MS = 10000;
    // Upper bound on snapshot polls in RequestSnapshot before giving up.
    private const int MAX_SNAPSHOT_RETRIES = 20;

    // Id of the most recently submitted job as reported by the server.
    private Guid jobId;
    // The job (and its inner engine) that is sent to / restored from the hive.
    private Job job;
    // Serializes result polling and snapshot requests against each other.
    private readonly object locker = new object();
    // Set by Abort() and read by the polling thread, hence volatile.
    private volatile bool abortRequested;

    /// <summary>
    /// Address that is handed to <c>ServiceLocator.CreateExecutionEngineFacade</c>.
    /// </summary>
    public string HiveServerUrl { get; set; }

    /// <summary>
    /// Number of times the job is submitted by <see cref="Execute"/>, given as text.
    /// Values that cannot be parsed as an integer, or that are smaller than one,
    /// result in a single submission.
    /// </summary>
    public string MultiSubmitCount { get; set; }

    /// <summary>
    /// Semicolon separated list of GUIDs that are added to the job's
    /// <c>AssignedResourceIds</c> on <see cref="Execute"/>. Null or empty means
    /// that no resource ids are assigned.
    /// </summary>
    public string RessourceIds { get; set; }

    /// <summary>
    /// Creates a new engine with an empty job and the lowest thread priority.
    /// </summary>
    public HiveEngine() {
      job = new Job();
      abortRequested = false;
      Priority = ThreadPriority.Lowest;
    }

    #region IEngine Members

    /// <summary>Gets the operator graph of the wrapped job's engine.</summary>
    public IOperatorGraph OperatorGraph {
      get { return job.Engine.OperatorGraph; }
    }

    /// <summary>Gets the global scope of the wrapped job's engine.</summary>
    public IScope GlobalScope {
      get { return job.Engine.GlobalScope; }
    }

    /// <summary>Gets the execution time of the wrapped job's engine.</summary>
    public TimeSpan ExecutionTime {
      get { return job.Engine.ExecutionTime; }
    }

    /// <summary>Gets or sets the priority of the wrapped job's engine.</summary>
    public ThreadPriority Priority {
      get { return job.Engine.Priority; }
      set { job.Engine.Priority = value; }
    }

    /// <summary>Gets the <c>Running</c> flag of the wrapped job's engine.</summary>
    public bool Running {
      get { return job.Engine.Running; }
    }

    /// <summary>Gets the <c>Canceled</c> flag of the wrapped job's engine.</summary>
    public bool Canceled {
      get { return job.Engine.Canceled; }
    }

    /// <summary>Gets the <c>Terminated</c> flag of the wrapped job's engine.</summary>
    public bool Terminated {
      get { return job.Engine.Terminated; }
    }

    /// <summary>
    /// Serializes the job, submits it to the hive server (one or more times, see
    /// <see cref="MultiSubmitCount"/>) and starts a thread that polls for the
    /// final result. Only the id of the last submitted job is remembered and polled.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The server response to a submission did not contain a job.
    /// </exception>
    public void Execute() {
      SerializedJob jobObj = CreateJobObj();
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);

      AssignResources(jobObj);

      int submitCount = GetSubmitCount();
      for (int i = 0; i < submitCount; i++) {
        ResponseObject<Contracts.BusinessObjects.JobDto> res = executionEngineFacade.AddJob(jobObj);
        if (res == null || res.Obj == null) {
          // Without a job in the response there is no id that could be polled.
          string status = res == null ? "no response" : res.StatusMessage;
          throw new InvalidOperationException("HiveEngine: the hive server did not return a job for the submission: " + status);
        }
        jobId = res.Obj.Id;
      }

      StartResultPollingThread();
    }

    /// <summary>
    /// Parses <see cref="RessourceIds"/> and adds every valid GUID to the
    /// assigned resource ids of <paramref name="jobObj"/>. Entries that cannot
    /// be added are skipped (best effort) and logged.
    /// </summary>
    private void AssignResources(SerializedJob jobObj) {
      // IsNullOrEmpty also covers a property that was never set (null).
      if (String.IsNullOrEmpty(RessourceIds)) return;

      foreach (string sid in RessourceIds.Split(';')) {
        // A trailing or doubled ';' yields blank entries; nothing to add for those.
        if (sid.Trim().Length == 0) continue;
        try {
          Guid gid = new Guid(sid);
          jobObj.JobInfo.AssignedResourceIds.Add(gid);
        }
        catch (Exception ex) {
          // Deliberately best effort: one bad id must not prevent the submission.
          HiveLogger.Debug("HiveEngine: Ignoring resource id '" + sid + "': " + ex.Message);
        }
      }
    }

    /// <summary>
    /// Returns the number of submissions requested by <see cref="MultiSubmitCount"/>,
    /// falling back to 1 for unparsable values and values below one.
    /// </summary>
    private int GetSubmitCount() {
      int count;
      if (!Int32.TryParse(MultiSubmitCount, out count) || count < 1)
        return 1;
      return count;
    }

    /// <summary>
    /// Starts the thread that polls the server for the final result of the job.
    /// </summary>
    private void StartResultPollingThread() {
      Thread t = new Thread(new ThreadStart(PollForFinalResult));
      // Threads are foreground threads by default; a foreground thread would keep
      // the process alive for as long as the polling loop runs.
      t.IsBackground = true;
      HiveLogger.Debug("HiveEngine: Starting results-polling thread");
      t.Start();
    }

    /// <summary>
    /// Body of the polling thread. Loops until the user requests an abort or a
    /// result with progress of at least 1.0 has been restored, then replaces the
    /// current job with the restored one, shows its view and fires
    /// <c>Changed</c> and <c>Finished</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): exceptions thrown by the server call or by restoring the job
    /// are not handled here and therefore escape on this thread.
    /// </remarks>
    private void PollForFinalResult() {
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      ResponseObject<SerializedJob> response = null;
      Job restoredJob = null;
      do {
        Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
        lock (locker) {
          HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
          response = executionEngineFacade.GetLastSerializedResult(jobId, false);
          HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
          // keep looping while
          // 1. the user doesn't request an abort
          // 2. there is a problem with server communication (success==false)
          // 3. no result for the job is available yet (response.Obj==null)
          // 4. the result that we get from the server is a snapshot and not the final result
          if (abortRequested) return;
          if (response.Success && response.Obj != null) {
            HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
            restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobData);
            HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress<1.0));
          }
        }
      } while (restoredJob == null || (restoredJob.Progress < 1.0));

      job = restoredJob;
      ControlManager.Manager.ShowControl(job.Engine.CreateView());
      OnChanged();
      OnFinished();
    }

    /// <summary>
    /// Asks the server for a snapshot of the job. If the job is not being
    /// calculated, the last stored result is fetched directly; otherwise the
    /// server is polled up to <c>MAX_SNAPSHOT_RETRIES</c> times until a snapshot
    /// is available. A received result replaces the current job and is shown.
    /// </summary>
    public void RequestSnapshot() {
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      int retryCount = 0;
      ResponseObject<SerializedJob> response;
      lock (locker) {
        HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
          // job is finished already
          HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
          response = executionEngineFacade.GetLastSerializedResult(jobId, false);
          HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
        } else {
          // server sent snapshot request to client
          // poll until snapshot is ready
          do {
            Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
            HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
            response = executionEngineFacade.GetLastSerializedResult(jobId, true);
            HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
            retryCount++;
            // keep looping (up to the retry limit) while
            // 1. problem with communication with server
            // 2. job result not yet ready
          } while (
            (retryCount < MAX_SNAPSHOT_RETRIES) && (
            !response.Success ||
            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
            );
        }
      }
      SerializedJob jobResult = response.Obj;
      if (jobResult != null) {
        HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
        job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobData);
        ControlManager.Manager.ShowControl(job.Engine.CreateView());
      }
      // NOTE(review): when no result was received (jobResult == null) nothing is
      // reported to the caller; the ExceptionOccurred event is not raised here.
    }

    /// <summary>Not supported by this engine.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteStep() {
      throw new NotSupportedException();
    }

    /// <summary>Not supported by this engine.</summary>
    /// <param name="steps">Ignored.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void ExecuteSteps(int steps) {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Flags the abort for the polling thread, asks the server to abort the
    /// job and fires <c>Changed</c> and <c>Finished</c>.
    /// </summary>
    public void Abort() {
      abortRequested = true;
      // RequestSnapshot();
      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
      executionEngineFacade.AbortJob(jobId);
      OnChanged();
      OnFinished();
    }

    /// <summary>
    /// Clears the abort flag, resets the wrapped engine, assigns a fresh job id
    /// and fires <c>Initialized</c>.
    /// </summary>
    public void Reset() {
      abortRequested = false;
      job.Engine.Reset();
      jobId = Guid.NewGuid();
      OnInitialized();
    }

    /// <summary>
    /// Builds the object that is submitted to the server: the current job
    /// persisted as gzip-compressed XML, plus job info listing the plugins that
    /// declare the persisted objects (and those plugins' dependencies).
    /// </summary>
    /// <returns>The serialized job including its job info.</returns>
    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
      HeuristicLab.Hive.Contracts.BusinessObjects.JobDto jobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.JobDto();

      XmlDocument document = PersistenceManager.CreateXmlDocument();
      Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
      XmlNode rootNode = document.CreateElement("Root");
      document.AppendChild(rootNode);
      rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));

      byte[] serializedJobData;
      using (MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the gzip stream must be closed (flushing its trailer)
        // before the buffer is read, without closing the memory stream.
        using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          document.Save(stream);
        }
        serializedJobData = memStream.ToArray();
      }

      HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
        new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
      executableJobObj.JobInfo = jobObj;
      executableJobObj.SerializedJobData = serializedJobData;

      // Collect each declaring plugin once, together with its direct dependencies.
      List<IPluginDescription> plugins = new List<IPluginDescription>();
      foreach (IStorable storeable in dictionary.Values) {
        IPluginDescription declaringPlugin = ApplicationManager.Manager.GetDeclaringPlugin(storeable.GetType());
        if (!plugins.Contains(declaringPlugin)) {
          plugins.Add(declaringPlugin);
          foreach (var dependency in declaringPlugin.Dependencies) {
            if (!plugins.Contains(dependency)) plugins.Add(dependency);
          }
        }
      }

      List<HivePluginInfoDto> pluginsNeeded =
        new List<HivePluginInfoDto>();
      foreach (IPluginDescription uniquePlugin in plugins) {
        HivePluginInfoDto pluginInfo =
          new HivePluginInfoDto();
        pluginInfo.Name = uniquePlugin.Name;
        pluginInfo.Version = uniquePlugin.Version.ToString();
        pluginInfo.BuildDate = uniquePlugin.BuildDate;
        pluginsNeeded.Add(pluginInfo);
      }

      jobObj.CoresNeeded = 1;
      jobObj.PluginsNeeded = pluginsNeeded;
      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
      return executableJobObj;
    }

    public event EventHandler Initialized;
    /// <summary>
    /// Fires a new <c>Initialized</c> event.
    /// </summary>
    protected virtual void OnInitialized() {
      // Copy first: a subscriber may be removed between the check and the call.
      EventHandler handler = Initialized;
      if (handler != null)
        handler(this, new EventArgs());
    }

    public event EventHandler<EventArgs<IOperation>> OperationExecuted;
    /// <summary>
    /// Fires a new <c>OperationExecuted</c> event.
    /// </summary>
    /// <param name="operation">The operation that has been executed.</param>
    protected virtual void OnOperationExecuted(IOperation operation) {
      EventHandler<EventArgs<IOperation>> handler = OperationExecuted;
      if (handler != null)
        handler(this, new EventArgs<IOperation>(operation));
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    /// <summary>
    /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
    /// </summary>
    /// <param name="exception">The exception that was thrown.</param>
    protected virtual void OnExceptionOccurred(Exception exception) {
      Abort();
      EventHandler<EventArgs<Exception>> handler = ExceptionOccurred;
      if (handler != null)
        handler(this, new EventArgs<Exception>(exception));
    }

    public event EventHandler ExecutionTimeChanged;
    /// <summary>
    /// Fires a new <c>ExecutionTimeChanged</c> event.
    /// </summary>
    protected virtual void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null)
        handler(this, new EventArgs());
    }

    public event EventHandler Finished;
    /// <summary>
    /// Fires a new <c>Finished</c> event.
    /// </summary>
    protected virtual void OnFinished() {
      // Raised from the polling thread as well, so the local copy matters here.
      EventHandler handler = Finished;
      if (handler != null)
        handler(this, new EventArgs());
    }

    #endregion

    /// <summary>Creates a <c>HiveEngineEditor</c> for this engine.</summary>
    public override IView CreateView() {
      return new HiveEngineEditor(this);
    }

    #region IEditable Members

    /// <summary>Creates a <c>HiveEngineEditor</c> for this engine.</summary>
    public IEditor CreateEditor() {
      return new HiveEngineEditor(this);
    }
    #endregion

    /// <summary>
    /// Persists the engine: the server url as the <c>HiveServerUrl</c> attribute
    /// and the job as the <c>Job</c> child node.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <see cref="MultiSubmitCount"/> and <see cref="RessourceIds"/>
    /// are not written here and are therefore not restored by <c>Populate</c>.
    /// </remarks>
    public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
      XmlNode node = base.GetXmlNode(name, document, persistedObjects);
      XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
      attr.Value = HiveServerUrl;
      node.Attributes.Append(attr);
      node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
      return node;
    }

    /// <summary>
    /// Restores the server url and the job from <paramref name="node"/>. A
    /// missing <c>HiveServerUrl</c> attribute leaves the url unchanged.
    /// </summary>
    public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
      base.Populate(node, restoredObjects);
      XmlAttribute urlAttribute = node.Attributes["HiveServerUrl"];
      if (urlAttribute != null)
        HiveServerUrl = urlAttribute.Value;
      job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.