Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Engine/3.3/HiveEngine.cs @ 4111

Last change on this file since 4111 was 4111, checked in by cneumuel, 14 years ago

added experiment plugins (#1115)

File size: 14.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Text;
25using HeuristicLab.Core;
26using System.Threading;
27using System.IO;
28using System.Xml;
29using System.IO.Compression;
30using System.Linq;
31using HeuristicLab.Common;
32using HeuristicLab.Hive.JobBase;
33using HeuristicLab.Hive.Contracts.Interfaces;
34using HeuristicLab.Hive.Contracts;
35using HeuristicLab.PluginInfrastructure;
36using HeuristicLab.Hive.Contracts.BusinessObjects;
37using HeuristicLab.Tracing;
38using HeuristicLab.Persistence.Default.Xml;
39using System.Reflection;
40
41namespace HeuristicLab.Hive.Engine {
42  /// <summary>
  /// Represents an engine that executes its operator graph in parallel on the hive.
45  /// </summary>
46  public class HiveEngine : HeuristicLab.Core.Engine {
47    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
48    private const int RESULT_POLLING_INTERVAL_MS = 10000;
49    private const int MAX_SNAPSHOT_RETRIES = 20;
50    private Guid jobId;
51    private Job job;
52    private object locker = new object();
53    private volatile bool abortRequested;
54    private ThreadPriority priority;
55
56    public string HiveServerUrl { get; set; }
57    public string MultiSubmitCount { get; set; }
58    public string RessourceIds { get; set; }
59
60    public HiveEngine() {
61      // <debug>
62      HiveServerUrl = "net.tcp://10.42.1.153:9000/ExecutionEngine/ExecutionEngineFacade";
63      RessourceIds = "MyGroup";
64      // </debug>
65
66      job = new Job();
67      abortRequested = false;
68      Priority = ThreadPriority.Lowest;
69    }
70
71    //public OperatorGraph OperatorGraph {
72    //  get { return currentJob.Engine.; }
73    //}
74
75    //public IScope GlobalScope {
76    //  get { return job.Engine.GlobalScope; }
77    //}
78
79    public ThreadPriority Priority {
80      get { return this.priority; }
81      set { this.priority = value; }
82    }
83
84    public new void Prepare(IOperation initialOperation) {
85      this.job.Prepare(initialOperation);
86      OnPrepared();
87    }
88
89    public override void Start() {
90      SerializedJob jobObj = CreateSerializedJob();
91      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
92
93      List<string> groups = new List<string>();
94      if (!String.IsNullOrEmpty(RessourceIds)) {
95        groups.AddRange(RessourceIds.Split(';'));
96      }
97
98      ResponseObject<JobDto> res = executionEngineFacade.AddJobWithGroupStrings(jobObj, groups);
99      jobId = res.Obj.Id;
100
101      StartResultPollingThread();
102    }
103
104    private void StartResultPollingThread() {
105      // start a backgroud thread to poll the final result of the job
106      Thread t = new Thread(() => {
107        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
108        ResponseObject<SerializedJob> response = null;
109        Job restoredJob = null;
110        do {
111          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
112          lock (locker) {
113            Logger.Debug("HiveEngine: Results-polling - GetLastResult");
114            response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
115            Logger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
116            // loop while
117            // 1. the user doesn't request an abort
118            // 2. there is a problem with server communication (success==false)
119            // 3. no result for the job is available yet (response.Obj==null)
120            // 4. the result that we get from the server is a snapshot and not the final result
121            if (abortRequested) return;
122            if (response.Success && response.Obj != null && response.StatusMessage != ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
123              Logger.Debug("HiveEngine: Results-polling - Got result!");
124              restoredJob = XmlParser.Deserialize<Job>(new MemoryStream(response.Obj.SerializedJobData));
125              Logger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress < 1.0));
126            }
127          }
128        } while (restoredJob == null || (restoredJob.Progress < 1.0));
129
130        job = restoredJob;
131
132        // [chn] how to show the View in 3.3?
133        //ControlManager.Manager.ShowControl(job.Engine.CreateView());
134        //OnChanged();
135
136        //OnFinished();
137      });
138      Logger.Debug("HiveEngine: Starting results-polling thread");
139      t.Start();
140    }
141
142    public override void Pause() {
143      base.Pause();
144      if (job != null) {
145        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
146        executionEngineFacade.AbortJob(jobId);
147      }
148    }
149
150    public override void Stop() {
151      base.Stop();
152      if (job != null) {
153        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
154        executionEngineFacade.AbortJob(jobId);
155      }
156    }
157
158    /// <summary>
159    /// Creates a SerializedJob Object which contains the currentJob as binary array
160    /// and some metainfo about that job.
161    /// </summary>
162    private SerializedJob CreateSerializedJob() {
163      JobDto jobDto = new JobDto();
164
165      // serialize current job
166      MemoryStream memStream = new MemoryStream();
167      XmlGenerator.Serialize(job, memStream);
168
169      // convert memStream to byte-array
170      SerializedJob executableJobObj = new SerializedJob();
171      executableJobObj.JobInfo = jobDto;
172      executableJobObj.SerializedJobData = memStream.ToArray();
173
174      // find out which which plugins are needed to recreate the type
175      List<HivePluginInfoDto> pluginsNeeded = (
176        from p in GetDeclaringPlugins(job.GetType())
177        select new HivePluginInfoDto() {
178          Name = p.Name,
179          Version = p.Version,
180          BuildDate = p.BuildDate,
181        }).ToList();
182
183      jobDto.CoresNeeded = 1;
184      jobDto.PluginsNeeded = pluginsNeeded;
185      jobDto.State = State.offline;
186      return executableJobObj;
187    }
188
189
190    //public event EventHandler Initialized;
191    ///// <summary>
192    ///// Fires a new <c>Initialized</c> event.
193    ///// </summary>
194    //protected virtual void OnInitialized() {
195    //  if (Initialized != null)
196    //    Initialized(this, new EventArgs());
197    //}
198
199    //public event EventHandler<EventArgs<IOperation>> OperationExecuted;
200    ///// <summary>
201    ///// Fires a new <c>OperationExecuted</c> event.
202    ///// </summary>
203    ///// <param name="operation">The operation that has been executed.</param>
204    //protected virtual void OnOperationExecuted(IOperation operation) {
205    //  if (OperationExecuted != null)
206    //    OperationExecuted(this, new EventArgs<IOperation>(operation));
207    //}
208
209    //public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
210    ///// <summary>
211    ///// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
212    ///// </summary>
213    ///// <param name="exception">The exception that was thrown.</param>
214    //protected virtual void OnExceptionOccurred(Exception exception) {
215    //  Abort();
216    //  if (ExceptionOccurred != null)
217    //    ExceptionOccurred(this, new EventArgs<Exception>(exception));
218    //}
219
220    //public event EventHandler ExecutionTimeChanged;
221    ///// <summary>
222    ///// Fires a new <c>ExecutionTimeChanged</c> event.
223    ///// </summary>
224    //protected virtual void OnExecutionTimeChanged() {
225    //  if (ExecutionTimeChanged != null)
226    //    ExecutionTimeChanged(this, new EventArgs());
227    //}
228
229    //public event EventHandler Finished;
230    ///// <summary>
231    ///// Fires a new <c>Finished</c> event.
232    ///// </summary>
233    //protected virtual void OnFinished() {
234    //  if (Finished != null)
235    //    Finished(this, new EventArgs());
236    //}
237
238    //public void Abort() {
239    //  abortRequested = true;
240    //  // RequestSnapshot();
241    //  IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
242    //  executionEngineFacade.AbortJob(jobId);
243    //  //OnChanged();
244    //  OnFinished();
245    //}
246
247    public void RequestSnapshot() {
248      if (job != null) {
249        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
250        int retryCount = 0;
251        ResponseObject<SerializedJob> response;
252        lock (locker) {
253          Logger.Debug("HiveEngine: Abort - RequestSnapshot");
254          Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
255          if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
256            // job is finished already
257            Logger.Debug("HiveEngine: Abort - GetLastResult(false)");
258            response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
259            Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
260          } else {
261            // server sent snapshot request to client
262            // poll until snapshot is ready
263            do {
264              Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
265              Logger.Debug("HiveEngine: Abort - GetLastResult(true)");
266              response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
267              Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
268              retryCount++;
269              // loop while
270              // 1. problem with communication with server
271              // 2. job result not yet ready
272            } while (
273              (retryCount < MAX_SNAPSHOT_RETRIES) && (
274              !response.Success ||
275              response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
276              );
277          }
278
279        }
280        SerializedJob jobResult = response.Obj;
281        if (jobResult != null) {
282          Logger.Debug("HiveEngine: Results-polling - Got result!");
283
284          job = XmlParser.Deserialize<Job>(new MemoryStream(jobResult.SerializedJobData));
285
286          throw new NotImplementedException("[chn] how to create a view in 3.3 and why should i do this here? shouldnt the caller of this method receive a result and decide what to do?");
287          //ControlManager.Manager.ShowControl(job.Engine.CreateView());
288        }
289      }
290      //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
291      //Exception ex = new Exception(response.Obj.Exception.Message);
292      //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
293    }
294
295    protected override void ProcessNextOperation() {
296      throw new NotSupportedException();
297    }
298
299    #region Required Plugin Search
300    /// <summary>
301    /// Returns a list of plugins in which the type itself and all members
302    /// of the type are declared. Objectgraph is searched recursively.
303    /// </summary>
304    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
305      HashSet<Type> types = new HashSet<Type>();
306      FindTypes(type, types, "HeuristicLab.");
307      return GetDeclaringPlugins(types);
308    }
309
310    /// <summary>
311    /// Returns the plugins (including dependencies) in which the given types are declared
312    /// </summary>
313    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
314      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
315      foreach (Type t in types) {
316        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
317      }
318      return plugins;
319    }
320
321    /// <summary>
322    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
323    /// Also searches the dependencies recursively.
324    /// </summary>
325    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
326      if (!plugins.Contains(plugin)) {
327        plugins.Add(plugin);
328        foreach (IPluginDescription dependency in plugin.Dependencies) {
329          FindDeclaringPlugins(dependency, plugins);
330        }
331      }
332    }
333
334    /// <summary>
335    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
336    /// Be aware that search is not performed on attributes
337    /// </summary>
338    /// <param name="type">the type to be searched</param>
339    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
340    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
341    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
342      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
343        types.Add(type);
344
345        // constructors
346        foreach (ConstructorInfo info in type.GetConstructors()) {
347          foreach (ParameterInfo paramInfo in info.GetParameters()) {
348            FindTypes(paramInfo.ParameterType, types, namespaceStart);
349          }
350        }
351
352        // interfaces
353        foreach (Type t in type.GetInterfaces()) {
354          FindTypes(t, types, namespaceStart);
355        }
356
357        // events
358        foreach (EventInfo info in type.GetEvents()) {
359          FindTypes(info.EventHandlerType, types, namespaceStart);
360          FindTypes(info.DeclaringType, types, namespaceStart);
361        }
362
363        // properties
364        foreach (PropertyInfo info in type.GetProperties()) {
365          FindTypes(info.PropertyType, types, namespaceStart);
366        }
367
368        // fields
369        foreach (FieldInfo info in type.GetFields()) {
370          FindTypes(info.FieldType, types, namespaceStart);
371        }
372
373        // methods
374        foreach (MethodInfo info in type.GetMethods()) {
375          foreach (ParameterInfo paramInfo in info.GetParameters()) {
376            FindTypes(paramInfo.ParameterType, types, namespaceStart);
377          }
378          FindTypes(info.ReturnType, types, namespaceStart);
379        }
380      }
381    }
382    #endregion
383  }
384}
Note: See TracBrowser for help on using the repository browser.