Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Engine/3.3/HiveEngine.cs @ 4107

Last change on this file since 4107 was 4107, checked in by cneumuel, 14 years ago

migration from 3.2 to 3.3 completed. Hive Server and Client are now executable and as functional as they were in 3.2. (#1096)

File size: 15.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
27using System.IO;
28using System.Xml;
29using System.IO.Compression;
30using System.Linq;
31using HeuristicLab.Common;
32using HeuristicLab.Hive.JobBase;
33using HeuristicLab.Hive.Contracts.Interfaces;
34using HeuristicLab.Hive.Contracts;
35using HeuristicLab.PluginInfrastructure;
36using HeuristicLab.Hive.Contracts.BusinessObjects;
37using HeuristicLab.Tracing;
38using HeuristicLab.Persistence.Default.Xml;
39using System.Reflection;
40
41namespace HeuristicLab.Hive.Engine {
42  /// <summary>
43  /// Represents an engine that executes its operator-graph on the hive.
44  /// in parallel.
45  /// </summary>
46  public class HiveEngine : HeuristicLab.Core.Engine {
47    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
48    private const int RESULT_POLLING_INTERVAL_MS = 10000;
49    private const int MAX_SNAPSHOT_RETRIES = 20;
50    private Guid jobId;
51    private Job job;
52    private object locker = new object();
53    private volatile bool abortRequested;
54    private ThreadPriority priority;
55
56    public string HiveServerUrl { get; set; }
57    public string MultiSubmitCount { get; set; }
58    public string RessourceIds { get; set; }
59
60    public HiveEngine() {
61      // <debug>
62      HiveServerUrl = "net.tcp://10.42.1.153:9000/ExecutionEngine/ExecutionEngineFacade";
63      RessourceIds = "MyGroup";
64      // </debug>
65
66      job = new Job();
67      abortRequested = false;
68      Priority = ThreadPriority.Lowest;
69    }
70
71    //public OperatorGraph OperatorGraph {
72    //  get { return job.Engine.OperatorGraph; }
73    //}
74
75    //public IScope GlobalScope {
76    //  get { return job.Engine.GlobalScope; }
77    //}
78
79    public TimeSpan ExecutionTime {
80      get { return job.Engine.ExecutionTime; }
81    }
82
83    public ThreadPriority Priority {
84      get { return this.priority; }
85      set { this.priority = value; }
86    }
87
88    //public bool Running {
89    //  get { return job.Engine.ExecutionState == Core.ExecutionState.Started; }
90    //}
91
92    //public bool Canceled {
93    //  get { return job.Engine.Canceled; }
94    //}
95
96    //public bool Terminated {
97    //  get { return job.Engine.Terminated; }
98    //}
99
100    public ExecutionState ExecutionState {
101      get { return job.Engine.ExecutionState; }
102    }
103
104    protected override void ProcessNextOperation() {
105      var jobObj = CreateJobObj();
106      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
107
108      List<string> groups = new List<string>();
109      if (!String.Empty.Equals(RessourceIds)) {
110        groups.AddRange(RessourceIds.Split(';'));
111      }
112
113      /*if(!String.Empty.Equals(RessourceIds)) {
114        String[] ids = RessourceIds.Split(';');
115        foreach (string sid in ids) {       
116          try {
117            System.Guid gid = new Guid(sid);
118            jobObj.JobInfo.AssignedResourceIds.Add(gid);
119          } catch(Exception ex) {
120           
121          }   
122        }
123      } */
124
125      int loops = 1;
126
127      Int32.TryParse(MultiSubmitCount, out loops);
128      if (loops == 0)
129        loops = 1;
130
131      for (int i = 0; i < loops; i++) {
132        ResponseObject<Contracts.BusinessObjects.JobDto> res = executionEngineFacade.AddJobWithGroupStrings(jobObj, groups);
133        jobId = res.Obj.Id;
134      }
135
136      StartResultPollingThread();
137    }
138
139    private void StartResultPollingThread() {
140      // start a backgroud thread to poll the final result of the job
141      Thread t = new Thread(() => {
142        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
143        ResponseObject<SerializedJob> response = null;
144        Job restoredJob = null;
145        do {
146          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
147          lock (locker) {
148            Logger.Debug("HiveEngine: Results-polling - GetLastResult");
149            response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
150            Logger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
151            // loop while
152            // 1. the user doesn't request an abort
153            // 2. there is a problem with server communication (success==false)
154            // 3. no result for the job is available yet (response.Obj==null)
155            // 4. the result that we get from the server is a snapshot and not the final result
156            if (abortRequested) return;
157            if (response.Success && response.Obj != null && response.StatusMessage != ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
158              Logger.Debug("HiveEngine: Results-polling - Got result!");
159              restoredJob = XmlParser.Deserialize<Job>(new MemoryStream(response.Obj.SerializedJobData));
160              Logger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress < 1.0));
161            }
162          }
163        } while (restoredJob == null || (restoredJob.Progress < 1.0));
164
165        job = restoredJob;
166
167        //ControlManager.Manager.ShowControl(job.Engine.CreateView());
168        //OnChanged();
169
170        //OnFinished();
171      });
172      Logger.Debug("HiveEngine: Starting results-polling thread");
173      t.Start();
174    }
175
176    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
177      HeuristicLab.Hive.Contracts.BusinessObjects.JobDto jobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.JobDto();
178
179      // create xml document with <Root> as root-node and the persisted job as child
180      MemoryStream memStream = new MemoryStream();
181      XmlGenerator.Serialize(job, memStream);
182
183      // convert memStream to byte-array
184      HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj = new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
185      executableJobObj.JobInfo = jobObj;
186      executableJobObj.SerializedJobData = memStream.ToArray();
187
188      // find out which which plugins are needed to recreate the type
189      List<HivePluginInfoDto> pluginsNeeded = (
190        from p in GetDeclaringPlugins(job.GetType())
191        select new HivePluginInfoDto() {
192          Name = p.Name,
193          Version = p.Version,
194          BuildDate = p.BuildDate,
195        }).ToList();
196
197      jobObj.CoresNeeded = 1;
198      jobObj.PluginsNeeded = pluginsNeeded;
199      jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
200      return executableJobObj;
201    }
202
203    /// <summary>
204    /// Returns a list of plugins in which the type itself and all members
205    /// of the type are declared. Objectgraph is searched recursively.
206    /// </summary>
207    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
208      HashSet<Type> types = new HashSet<Type>();
209      FindTypes(type, types, "HeuristicLab.");
210      IEnumerable<IPluginDescription> plugins = from t in types
211                                                select ApplicationManager.Manager.GetDeclaringPlugin(t);
212      return plugins;
213    }
214
215    /// <summary>
216    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
217    /// </summary>
218    /// <param name="type">the type to be searched</param>
219    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
220    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
221    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
222
223      // search is not performed on attributes
224
225      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
226        types.Add(type);
227
228        // interfaces
229        foreach (Type t in type.GetInterfaces()) {
230          FindTypes(t, types, namespaceStart);
231        }
232
233        // events
234        foreach (EventInfo info in type.GetEvents()) {
235          FindTypes(info.EventHandlerType, types, namespaceStart);
236          FindTypes(info.DeclaringType, types, namespaceStart);
237        }
238
239        // properties
240        foreach (PropertyInfo info in type.GetProperties()) {
241          FindTypes(info.PropertyType, types, namespaceStart);
242        }
243
244        // fields
245        foreach (FieldInfo info in type.GetFields()) {
246          FindTypes(info.FieldType, types, namespaceStart);
247        }
248
249        // constructors : maybe constructors them out (?)
250        foreach (ConstructorInfo info in type.GetConstructors()) {
251          foreach (ParameterInfo paramInfo in info.GetParameters()) {
252            FindTypes(paramInfo.ParameterType, types, namespaceStart);
253          }
254        }
255
256        // methods
257        foreach (MethodInfo info in type.GetMethods()) {
258          foreach (ParameterInfo paramInfo in info.GetParameters()) {
259            FindTypes(paramInfo.ParameterType, types, namespaceStart);
260          }
261          FindTypes(info.ReturnType, types, namespaceStart);
262        }
263      }
264    }
265
266    //public event EventHandler Initialized;
267    ///// <summary>
268    ///// Fires a new <c>Initialized</c> event.
269    ///// </summary>
270    //protected virtual void OnInitialized() {
271    //  if (Initialized != null)
272    //    Initialized(this, new EventArgs());
273    //}
274
275    //public event EventHandler<EventArgs<IOperation>> OperationExecuted;
276    ///// <summary>
277    ///// Fires a new <c>OperationExecuted</c> event.
278    ///// </summary>
279    ///// <param name="operation">The operation that has been executed.</param>
280    //protected virtual void OnOperationExecuted(IOperation operation) {
281    //  if (OperationExecuted != null)
282    //    OperationExecuted(this, new EventArgs<IOperation>(operation));
283    //}
284
285    //public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
286    ///// <summary>
287    ///// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
288    ///// </summary>
289    ///// <param name="exception">The exception that was thrown.</param>
290    //protected virtual void OnExceptionOccurred(Exception exception) {
291    //  Abort();
292    //  if (ExceptionOccurred != null)
293    //    ExceptionOccurred(this, new EventArgs<Exception>(exception));
294    //}
295
296    //public event EventHandler ExecutionTimeChanged;
297    ///// <summary>
298    ///// Fires a new <c>ExecutionTimeChanged</c> event.
299    ///// </summary>
300    //protected virtual void OnExecutionTimeChanged() {
301    //  if (ExecutionTimeChanged != null)
302    //    ExecutionTimeChanged(this, new EventArgs());
303    //}
304
305    //public event EventHandler Finished;
306    ///// <summary>
307    ///// Fires a new <c>Finished</c> event.
308    ///// </summary>
309    //protected virtual void OnFinished() {
310    //  if (Finished != null)
311    //    Finished(this, new EventArgs());
312    //}
313
314    //public void Abort() {
315    //  abortRequested = true;
316    //  // RequestSnapshot();
317    //  IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
318    //  executionEngineFacade.AbortJob(jobId);
319    //  //OnChanged();
320    //  OnFinished();
321    //}
322
323    //public void Reset() {
324    //  abortRequested = false;
325
326    //  throw new NotImplementedException("[chn] how to reset Engine?");
327    //  //job.Engine.Reset();
328
329    //  jobId = Guid.NewGuid();
330    //  OnInitialized();
331    //}
332
333    //public void RequestSnapshot() {
334    //  IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
335    //  int retryCount = 0;
336    //  ResponseObject<SerializedJob> response;
337    //  lock (locker) {
338    //    Logger.Debug("HiveEngine: Abort - RequestSnapshot");
339    //    Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
340    //    if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
341    //      // job is finished already
342    //      Logger.Debug("HiveEngine: Abort - GetLastResult(false)");
343    //      response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
344    //      Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
345    //    } else {
346    //      // server sent snapshot request to client
347    //      // poll until snapshot is ready
348    //      do {
349    //        Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
350    //        Logger.Debug("HiveEngine: Abort - GetLastResult(true)");
351    //        response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
352    //        Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
353    //        retryCount++;
354    //        // loop while
355    //        // 1. problem with communication with server
356    //        // 2. job result not yet ready
357    //      } while (
358    //        (retryCount < MAX_SNAPSHOT_RETRIES) && (
359    //        !response.Success ||
360    //        response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
361    //        );
362    //    }
363    //  }
364    //  SerializedJob jobResult = response.Obj;
365    //  if (jobResult != null) {
366    //    Logger.Debug("HiveEngine: Results-polling - Got result!");
367
368    //    //job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobData);
369    //    //ControlManager.Manager.ShowControl(job.Engine.CreateView());
370    //    throw new NotImplementedException("TODO[chn]use persistency-3.3");
371    //  }
372    //  //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
373    //  //Exception ex = new Exception(response.Obj.Exception.Message);
374    //  //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
375    //}
376
377    //public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
378    //  XmlNode node = base.GetXmlNode(name, document, persistedObjects);
379    //  XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
380    //  attr.Value = HiveServerUrl;
381    //  node.Attributes.Append(attr);
382    //  node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
383    //  return node;
384    //}
385
386    //public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
387    //  base.Populate(node, restoredObjects);
388    //  HiveServerUrl = node.Attributes["HiveServerUrl"].Value;
389    //  job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
390    //}
391
392
393  }
394}
Note: See TracBrowser for help on using the repository browser.