Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4120

Last change on this file since 4120 was 4120, checked in by cneumuel, 14 years ago

further improvement and stabilisation of HiveExperiment (#1115)

File size: 17.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43
44namespace HeuristicLab.Hive.Experiment {
45  /// <summary>
46  /// An experiment which contains multiple batch runs of algorithms.
47  /// </summary>
48  [Item(itemName, itemDescription)]
49  [Creatable("Testing & Analysis")]
50  [StorableClass]
51  public class HiveExperiment : NamedItem, IExecutable {
52    private const string itemName = "Hive Experiment";
53    private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive.";
54    private const int resultPollingIntervalMs = 10000;
55
56    private object locker = new object();
57    private const int maxSnapshotRetries = 20;
58    private System.Timers.Timer timer;
59    private bool pausePending, stopPending;
60    private DateTime lastUpdateTime;
61
62    [Storable]
63    private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>();
64
65    [Storable]
66    private JobItemList jobItems;
67    public JobItemList JobItems {
68      get { return jobItems; }
69    }
70   
71
72    [Storable]
73    private string serverUrl;
74    public string ServerUrl {
75      get { return serverUrl; }
76      set {
77        if (serverUrl != value) {
78          serverUrl = value;
79          OnServerUrlChanged();
80        }
81      }
82    }
83
84    [Storable]
85    private string resourceIds;
86    public string ResourceIds {
87      get { return resourceIds; }
88      set {
89        if (resourceIds != value) {
90          resourceIds = value;
91          OnResourceIdsChanged();
92        }
93      }
94    }
95
96    [Storable]
97    private HeuristicLab.Optimization.Experiment experiment;
98    public HeuristicLab.Optimization.Experiment Experiment {
99      get { return experiment; }
100      set {
101        if (experiment != value) {
102          experiment = value;
103          OnExperimentChanged();
104        }
105      }
106    }
107
108    [Storable]
109    private ILog log;
110    public ILog Log {
111      get { return log; }
112    }
113
114    [StorableConstructor]
115    public HiveExperiment(bool deserializing)
116      : base(deserializing) {
117    }
118
119    public HiveExperiment()
120      : base(itemName, itemDescription) {
121      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
122      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
123      this.log = new Log();
124      pausePending = stopPending = false;
125      jobItems = new JobItemList();
126      InitTimer();
127    }
128
129    public override IDeepCloneable Clone(Cloner cloner) {
130      log.LogMessage("I am beeing cloned");
131      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
132      clone.resourceIds = this.resourceIds;
133      clone.serverUrl = this.serverUrl;
134      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
135      clone.executionState = this.executionState;
136      clone.executionTime = this.executionTime;
137      clone.pendingOptimizers = new Dictionary<Guid, IOptimizer>();
138      foreach (var pair in this.pendingOptimizers)
139        clone.pendingOptimizers[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
140      clone.log = (ILog)cloner.Clone(log);
141      clone.stopPending = this.stopPending;
142      clone.pausePending = this.pausePending;
143      clone.jobItems = (JobItemList)cloner.Clone(jobItems);
144      return clone;
145    }
146
147    [StorableHook(HookType.AfterDeserialization)]
148    private void AfterDeserialization() {
149      InitTimer();
150      log.LogMessage("I was deserialized.");
151    }
152
153    private void InitTimer() {
154      timer = new System.Timers.Timer(100);
155      timer.AutoReset = true;
156      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
157    }
158
159    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
160      DateTime now = DateTime.Now;
161      ExecutionTime += now - lastUpdateTime;
162      lastUpdateTime = now;
163    }
164
165    public IEnumerable<string> ResourceGroups {
166      get {
167        if (!string.IsNullOrEmpty(resourceIds)) {
168          return resourceIds.Split(';');
169        } else {
170          return new List<string>();
171        }
172      }
173    }
174    #region IExecutable Members
175
176    [Storable]
177    private Core.ExecutionState executionState;
178    public ExecutionState ExecutionState {
179      get { return executionState; }
180      private set {
181        if (executionState != value) {
182          executionState = value;
183          OnExecutionStateChanged();
184        }
185      }
186    }
187
188    [Storable]
189    private TimeSpan executionTime;
190    public TimeSpan ExecutionTime {
191      get { return executionTime; }
192      private set {
193        if (executionTime != value) {
194          executionTime = value;
195          OnExecutionTimeChanged();
196        }
197      }
198    }
199
200    public void Pause() {
201      throw new NotSupportedException();
202    }
203
204    public void Prepare() {
205      if (experiment != null) {
206        experiment.Prepare();
207        this.ExecutionState = Core.ExecutionState.Prepared;
208        OnPrepared();
209      }
210    }
211
212    public void Start() {
213      OnStarted();
214      lastUpdateTime = DateTime.Now;
215      this.ExecutionState = Core.ExecutionState.Started;
216      Thread t = new Thread(() => {
217        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
218
219        pendingOptimizers = new Dictionary<Guid, IOptimizer>();
220        IEnumerable<string> groups = ResourceGroups;
221
222        foreach (IOptimizer optimizer in GetOptimizers(false)) {
223          SerializedJob serializedJob = CreateSerializedJob(optimizer);
224          ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
225          pendingOptimizers.Add(response.Obj.Id, optimizer);
226          StartResultPollingThread(response.Obj);
227
228          JobItem jobItem = new JobItem() {
229            JobDto = response.Obj,
230            LatestSnapshot = new ResponseObject<SerializedJob>() {
231              Obj = serializedJob,
232              StatusMessage = "Initial Snapshot",
233              Success = true
234            }
235          };
236          jobItems.Add(jobItem);
237
238          log.LogMessage("Sent job to server (jobId: " + response.Obj.Id + ")");
239        }
240      });
241      t.Start();
242    }
243
244    /// <summary>
245    /// Returns all optimizers in the current Experiment
246    /// </summary>
247    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
248    /// <returns></returns>
249    private IEnumerable<IOptimizer> GetOptimizers(bool flatout) {
250      if (!flatout) {
251        return experiment.Optimizers;
252      } else {
253        throw new NotImplementedException();
254      }
255    }
256
257    private void ReplaceOptimizer(IOptimizer originalOptimizer, IOptimizer newOptimizer) {
258      int originalOptimizerIndex = experiment.Optimizers.IndexOf(originalOptimizer);
259      experiment.Optimizers[originalOptimizerIndex] = newOptimizer;
260    }
261
262    public void Stop() {
263      // todo
264    }
265   
266    #endregion
267
268    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
269      IJob job = new OptimizerJob() {
270        Optimizer = optimizer
271      };
272
273      // serialize job
274      MemoryStream memStream = new MemoryStream();
275      XmlGenerator.Serialize(job, memStream);
276      byte[] jobByteArray = memStream.ToArray();
277      memStream.Dispose();
278
279      // find out which which plugins are needed for the given object
280      List<HivePluginInfoDto> pluginsNeeded = (
281        from p in GetDeclaringPlugins(optimizer.GetType())
282        select new HivePluginInfoDto() {
283          Name = p.Name,
284          Version = p.Version
285        }).ToList();
286
287      JobDto jobDto = new JobDto() {
288        CoresNeeded = 1, // [chn] how to determine real cores needed?
289        PluginsNeeded = pluginsNeeded,
290        State = State.Offline,
291        MemoryNeeded = 0,
292        UserId = Guid.Empty // [chn] set real userid here!
293      };
294
295      SerializedJob serializedJob = new SerializedJob() {
296        JobInfo = jobDto,
297        SerializedJobData = jobByteArray
298      };
299
300      return serializedJob;
301    }
302
303    private void StartResultPollingThread(JobDto job) {
304      Thread t = new Thread(() => {
305        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
306        IJob restoredObject = null;
307
308        do {
309          Thread.Sleep(resultPollingIntervalMs);
310          lock (locker) {
311            if (stopPending) return;
312           
313            ResponseObject<SerializedJob> response = executionEngineFacade.GetLastSerializedResult(job.Id, false, false);
314            log.LogMessage("Received response for job: " + response.StatusMessage + " (jobId: " + job.Id + ")");
315           
316           
317            // loop while
318            // 1. the user doesn't request an abort
319            // 2. there is a problem with server communication (success==false)
320            // 3. no result for the job is available yet (response.Obj==null)
321            // 4. the result that we get from the server is a snapshot and not the final result
322            if (response.Success && response.Obj != null && response.StatusMessage != ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
323              restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(response.Obj.SerializedJobData));
324            }
325          }
326        } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
327
328        log.LogMessage("Job finished (jobId: " + job.Id + ")");
329        // job retrieved... replace the existing optimizers with the finished one
330        IOptimizer originalOptimizer = pendingOptimizers[job.Id];
331        IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
332
333        ReplaceOptimizer(originalOptimizer, restoredOptimizer);
334        pendingOptimizers.Remove(job.Id);
335
336        if (pendingOptimizers.Count == 0) {
337          // finished
338          this.ExecutionState = Core.ExecutionState.Stopped;
339          OnStopped();
340        }
341      });
342
343      Logger.Debug("HiveEngine: Starting results-polling thread");
344      t.Start();
345    }
346   
347    #region Required Plugin Search
348    /// <summary>
349    /// Returns a list of plugins in which the type itself and all members
350    /// of the type are declared. Objectgraph is searched recursively.
351    /// </summary>
352    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
353      HashSet<Type> types = new HashSet<Type>();
354      FindTypes(type, types, "HeuristicLab.");
355      return GetDeclaringPlugins(types);
356    }
357
358    /// <summary>
359    /// Returns the plugins (including dependencies) in which the given types are declared
360    /// </summary>
361    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
362      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
363      foreach (Type t in types) {
364        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
365      }
366      return plugins;
367    }
368
369    /// <summary>
370    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
371    /// Also searches the dependencies recursively.
372    /// </summary>
373    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
374      if (!plugins.Contains(plugin)) {
375        plugins.Add(plugin);
376        foreach (IPluginDescription dependency in plugin.Dependencies) {
377          FindDeclaringPlugins(dependency, plugins);
378        }
379      }
380    }
381
382    /// <summary>
383    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
384    /// Be aware that search is not performed on attributes
385    /// </summary>
386    /// <param name="type">the type to be searched</param>
387    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
388    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
389    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
390      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
391        types.Add(type);
392
393        // constructors
394        foreach (ConstructorInfo info in type.GetConstructors()) {
395          foreach (ParameterInfo paramInfo in info.GetParameters()) {
396            FindTypes(paramInfo.ParameterType, types, namespaceStart);
397          }
398        }
399
400        // interfaces
401        foreach (Type t in type.GetInterfaces()) {
402          FindTypes(t, types, namespaceStart);
403        }
404
405        // events
406        foreach (EventInfo info in type.GetEvents()) {
407          FindTypes(info.EventHandlerType, types, namespaceStart);
408          FindTypes(info.DeclaringType, types, namespaceStart);
409        }
410
411        // properties
412        foreach (PropertyInfo info in type.GetProperties()) {
413          FindTypes(info.PropertyType, types, namespaceStart);
414        }
415
416        // fields
417        foreach (FieldInfo info in type.GetFields()) {
418          FindTypes(info.FieldType, types, namespaceStart);
419        }
420
421        // methods
422        foreach (MethodInfo info in type.GetMethods()) {
423          foreach (ParameterInfo paramInfo in info.GetParameters()) {
424            FindTypes(paramInfo.ParameterType, types, namespaceStart);
425          }
426          FindTypes(info.ReturnType, types, namespaceStart);
427        }
428      }
429    }
430    #endregion
431
432    #region Eventhandler
433
434    public event EventHandler ExecutionTimeChanged;
435    private void OnExecutionTimeChanged() {
436      EventHandler handler = ExecutionTimeChanged;
437      if (handler != null) handler(this, EventArgs.Empty);
438    }
439
440    public event EventHandler ExecutionStateChanged;
441    private void OnExecutionStateChanged() {
442      log.LogMessage("ExecutionState changed to " + executionState.ToString());
443      EventHandler handler = ExecutionStateChanged;
444      if (handler != null) handler(this, EventArgs.Empty);
445    }
446
447    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
448
449    public event EventHandler Started;
450    private void OnStarted() {
451      log.LogMessage("Started");
452      timer.Start();
453      EventHandler handler = Started;
454      if (handler != null) handler(this, EventArgs.Empty);
455    }
456
457    public event EventHandler Stopped;
458    private void OnStopped() {
459      timer.Stop();
460      log.LogMessage("Stopped");
461      EventHandler handler = Stopped;
462      if (handler != null) handler(this, EventArgs.Empty);
463    }
464
465    public event EventHandler Paused;
466    private void OnPaused() {
467      timer.Stop();
468      log.LogMessage("Paused");
469      EventHandler handler = Paused;
470      if (handler != null) handler(this, EventArgs.Empty);
471    }
472
473    public event EventHandler Prepared;
474    protected virtual void OnPrepared() {
475      log.LogMessage("Prepared");
476      EventHandler handler = Prepared;
477      if (handler != null) handler(this, EventArgs.Empty);
478    }
479
480    public event EventHandler ResourceIdsChanged;
481    protected virtual void OnResourceIdsChanged() {
482      EventHandler handler = ResourceIdsChanged;
483      if (handler != null) handler(this, EventArgs.Empty);
484    }
485
486    public event EventHandler ExperimentChanged;
487    protected virtual void OnExperimentChanged() {
488      log.LogMessage("Experiment changed");
489      EventHandler handler = ExperimentChanged;
490      if (handler != null) handler(this, EventArgs.Empty);
491    }
492
493    public event EventHandler ServerUrlChanged;
494    protected virtual void OnServerUrlChanged() {
495      EventHandler handler = ServerUrlChanged;
496      if (handler != null) handler(this, EventArgs.Empty);
497    }
498
499    #endregion
500  }
501}
Note: See TracBrowser for help on using the repository browser.