Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4121

Last change on this file since 4121 was 4121, checked in by cneumuel, 14 years ago

Stabilization of Hive, Improvement HiveExperiment GUI (#1115)

File size: 18.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43
44namespace HeuristicLab.Hive.Experiment {
  /// <summary>
  /// An experiment whose optimizers are serialized and sent as jobs to a Hive server;
  /// each optimizer is replaced by the restored result once its job has finished.
  /// </summary>
48  [Item(itemName, itemDescription)]
49  [Creatable("Testing & Analysis")]
50  [StorableClass]
51  public class HiveExperiment : NamedItem, IExecutable {
52    private const string itemName = "Hive Experiment";
53    private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive.";
54    private const int resultPollingIntervalMs = 15000;
55
56    private object locker = new object();
57    private const int maxSnapshotRetries = 20;
58    private System.Timers.Timer timer;
59    private bool pausePending, stopPending;
60    private DateTime lastUpdateTime;
61
62    [Storable]
63    private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>();
64
65    [Storable]
66    private JobItemList jobItems;
67    public JobItemList JobItems {
68      get { return jobItems; }
69    }
70   
71
72    [Storable]
73    private string serverUrl;
74    public string ServerUrl {
75      get { return serverUrl; }
76      set {
77        if (serverUrl != value) {
78          serverUrl = value;
79          OnServerUrlChanged();
80        }
81      }
82    }
83
84    [Storable]
85    private string resourceIds;
86    public string ResourceIds {
87      get { return resourceIds; }
88      set {
89        if (resourceIds != value) {
90          resourceIds = value;
91          OnResourceIdsChanged();
92        }
93      }
94    }
95
96    [Storable]
97    private HeuristicLab.Optimization.Experiment experiment;
98    public HeuristicLab.Optimization.Experiment Experiment {
99      get { return experiment; }
100      set {
101        if (experiment != value) {
102          experiment = value;
103          OnExperimentChanged();
104        }
105      }
106    }
107
108    [Storable]
109    private ILog log;
110    public ILog Log {
111      get { return log; }
112    }
113
114    [StorableConstructor]
115    public HiveExperiment(bool deserializing)
116      : base(deserializing) {
117    }
118
119    public HiveExperiment()
120      : base(itemName, itemDescription) {
121      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
122      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
123      this.log = new Log();
124      pausePending = stopPending = false;
125      jobItems = new JobItemList();
126      InitTimer();
127    }
128
129    public override IDeepCloneable Clone(Cloner cloner) {
130      LogMessage("I am beeing cloned");
131      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
132      clone.resourceIds = this.resourceIds;
133      clone.serverUrl = this.serverUrl;
134      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
135      clone.executionState = this.executionState;
136      clone.executionTime = this.executionTime;
137      clone.pendingOptimizers = new Dictionary<Guid, IOptimizer>();
138      foreach (var pair in this.pendingOptimizers)
139        clone.pendingOptimizers[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
140      clone.log = (ILog)cloner.Clone(log);
141      clone.stopPending = this.stopPending;
142      clone.pausePending = this.pausePending;
143      clone.jobItems = (JobItemList)cloner.Clone(jobItems);
144      return clone;
145    }
146
147    [StorableHook(HookType.AfterDeserialization)]
148    private void AfterDeserialization() {
149      InitTimer();
150      LogMessage("I was deserialized.");
151    }
152
153    private void InitTimer() {
154      timer = new System.Timers.Timer(100);
155      timer.AutoReset = true;
156      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
157    }
158
159    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
160      DateTime now = DateTime.Now;
161      ExecutionTime += now - lastUpdateTime;
162      lastUpdateTime = now;
163    }
164
165    public IEnumerable<string> ResourceGroups {
166      get {
167        if (!string.IsNullOrEmpty(resourceIds)) {
168          return resourceIds.Split(';');
169        } else {
170          return new List<string>();
171        }
172      }
173    }
174    #region IExecutable Members
175
176    [Storable]
177    private Core.ExecutionState executionState;
178    public ExecutionState ExecutionState {
179      get { return executionState; }
180      private set {
181        if (executionState != value) {
182          executionState = value;
183          OnExecutionStateChanged();
184        }
185      }
186    }
187
188    [Storable]
189    private TimeSpan executionTime;
190    public TimeSpan ExecutionTime {
191      get { return executionTime; }
192      private set {
193        if (executionTime != value) {
194          executionTime = value;
195          OnExecutionTimeChanged();
196        }
197      }
198    }
199
200    public void Pause() {
201      throw new NotSupportedException();
202    }
203
204    public void Prepare() {
205      if (experiment != null) {
206        experiment.Prepare();
207        this.ExecutionState = Core.ExecutionState.Prepared;
208        OnPrepared();
209      }
210    }
211
212    public void Start() {
213      OnStarted();
214      lastUpdateTime = DateTime.Now;
215      this.ExecutionState = Core.ExecutionState.Started;
216      Thread t = new Thread(() => {
217        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
218
219        pendingOptimizers = new Dictionary<Guid, IOptimizer>();
220        IEnumerable<string> groups = ResourceGroups;
221
222        foreach (IOptimizer optimizer in GetOptimizers(false)) {
223          SerializedJob serializedJob = CreateSerializedJob(optimizer);
224          ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
225          pendingOptimizers.Add(response.Obj.Id, optimizer);
226
227          JobItem jobItem = new JobItem() {
228            JobDto = response.Obj,
229            LatestSnapshot = new ResponseObject<SerializedJob>() {
230              Obj = serializedJob,
231              StatusMessage = "Initial Snapshot",
232              Success = true
233            }
234          };
235          jobItems.Add(jobItem);
236
237          LogMessage("Sent job to server (jobId: " + response.Obj.Id + ")");
238        }
239
240        // start results polling after sending sending the jobs to the server (to avoid race conflicts at the optimizers-collection)
241        foreach (JobItem jobItem in jobItems) {
242          StartResultPollingThread(jobItem.JobDto);
243        }
244      });
245      t.Start();
246    }
247
248    /// <summary>
249    /// Returns all optimizers in the current Experiment
250    /// </summary>
251    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
252    /// <returns></returns>
253    private IEnumerable<IOptimizer> GetOptimizers(bool flatout) {
254      if (!flatout) {
255        return experiment.Optimizers;
256      } else {
257        throw new NotImplementedException();
258      }
259    }
260
261    private void ReplaceOptimizer(IOptimizer originalOptimizer, IOptimizer newOptimizer) {
262      lock (locker) {
263        int originalOptimizerIndex = experiment.Optimizers.IndexOf(originalOptimizer);
264        experiment.Optimizers[originalOptimizerIndex] = newOptimizer;
265      }
266    }
267
268    public void Stop() {
269      // todo
270    }
271   
272    #endregion
273
274    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
275      IJob job = new OptimizerJob() {
276        Optimizer = optimizer
277      };
278
279      // serialize job
280      MemoryStream memStream = new MemoryStream();
281      XmlGenerator.Serialize(job, memStream);
282      byte[] jobByteArray = memStream.ToArray();
283      memStream.Dispose();
284
285      // find out which which plugins are needed for the given object
286      List<HivePluginInfoDto> pluginsNeeded = (
287        from p in GetDeclaringPlugins(optimizer.GetType())
288        select new HivePluginInfoDto() {
289          Name = p.Name,
290          Version = p.Version
291        }).ToList();
292
293      JobDto jobDto = new JobDto() {
294        CoresNeeded = 1, // [chn] how to determine real cores needed?
295        PluginsNeeded = pluginsNeeded,
296        State = State.Offline,
297        MemoryNeeded = 0,
298        UserId = Guid.Empty // [chn] set real userid here!
299      };
300
301      SerializedJob serializedJob = new SerializedJob() {
302        JobInfo = jobDto,
303        SerializedJobData = jobByteArray
304      };
305
306      return serializedJob;
307    }
308
309    private void StartResultPollingThread(JobDto job) {
310      Thread t = new Thread(() => {
311        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
312        IJob restoredObject = null;
313
314        do {
315          Thread.Sleep(resultPollingIntervalMs);
316          //lock (locker) { [chn] try without locking for better performance
317            if (stopPending) return;
318
319            ResponseObject<JobDto> response = executionEngineFacade.GetJobById(job.Id);
320            LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")");
321
322            if (response.Obj != null) {
323              UpdateJobItem(response.Obj);
324            }
325           
326            // loop while
327            // 1. the user doesn't request an abort
328            // 2. there is a problem with server communication (success==false)
329            // 3. no result for the job is available yet (response.Obj==null)
330            // 4. the result that we get from the server is a snapshot and not the final result
331            if (response.Success && response.Obj != null && response.Obj.State == State.Finished) {
332              ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(job.Id, false, false);
333              restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
334              UpdateSnapshot(jobResponse);
335            }
336          //}
337        } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
338
339        LogMessage("Job finished (jobId: " + job.Id + ")");
340        // job retrieved... replace the existing optimizers with the finished one
341        IOptimizer originalOptimizer = pendingOptimizers[job.Id];
342        IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
343
344        ReplaceOptimizer(originalOptimizer, restoredOptimizer);
345        pendingOptimizers.Remove(job.Id);
346
347        if (pendingOptimizers.Count == 0) {
348          // finished
349          this.ExecutionState = Core.ExecutionState.Stopped;
350          OnStopped();
351        }
352      });
353      t.Start();
354    }
355
356    private void UpdateJobItem(JobDto jobDto) {
357      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
358      jobItem.JobDto = jobDto;
359    }
360
361    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
362      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
363      jobItem.LatestSnapshot = response;
364    }
365
366    private void LogMessage(string message) {
367      // HeuristicLab.Log is not Thread-Safe, so lock every call
368      lock (locker) {
369        log.LogMessage(message);
370      }
371    }
372   
373    #region Required Plugin Search
374    /// <summary>
375    /// Returns a list of plugins in which the type itself and all members
376    /// of the type are declared. Objectgraph is searched recursively.
377    /// </summary>
378    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
379      HashSet<Type> types = new HashSet<Type>();
380      FindTypes(type, types, "HeuristicLab.");
381      return GetDeclaringPlugins(types);
382    }
383
384    /// <summary>
385    /// Returns the plugins (including dependencies) in which the given types are declared
386    /// </summary>
387    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
388      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
389      foreach (Type t in types) {
390        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
391      }
392      return plugins;
393    }
394
395    /// <summary>
396    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
397    /// Also searches the dependencies recursively.
398    /// </summary>
399    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
400      if (!plugins.Contains(plugin)) {
401        plugins.Add(plugin);
402        foreach (IPluginDescription dependency in plugin.Dependencies) {
403          FindDeclaringPlugins(dependency, plugins);
404        }
405      }
406    }
407
408    /// <summary>
409    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
410    /// Be aware that search is not performed on attributes
411    /// </summary>
412    /// <param name="type">the type to be searched</param>
413    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
414    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
415    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
416      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
417        types.Add(type);
418
419        // constructors
420        foreach (ConstructorInfo info in type.GetConstructors()) {
421          foreach (ParameterInfo paramInfo in info.GetParameters()) {
422            FindTypes(paramInfo.ParameterType, types, namespaceStart);
423          }
424        }
425
426        // interfaces
427        foreach (Type t in type.GetInterfaces()) {
428          FindTypes(t, types, namespaceStart);
429        }
430
431        // events
432        foreach (EventInfo info in type.GetEvents()) {
433          FindTypes(info.EventHandlerType, types, namespaceStart);
434          FindTypes(info.DeclaringType, types, namespaceStart);
435        }
436
437        // properties
438        foreach (PropertyInfo info in type.GetProperties()) {
439          FindTypes(info.PropertyType, types, namespaceStart);
440        }
441
442        // fields
443        foreach (FieldInfo info in type.GetFields()) {
444          FindTypes(info.FieldType, types, namespaceStart);
445        }
446
447        // methods
448        foreach (MethodInfo info in type.GetMethods()) {
449          foreach (ParameterInfo paramInfo in info.GetParameters()) {
450            FindTypes(paramInfo.ParameterType, types, namespaceStart);
451          }
452          FindTypes(info.ReturnType, types, namespaceStart);
453        }
454      }
455    }
456    #endregion
457
458    #region Eventhandler
459
460    public event EventHandler ExecutionTimeChanged;
461    private void OnExecutionTimeChanged() {
462      EventHandler handler = ExecutionTimeChanged;
463      if (handler != null) handler(this, EventArgs.Empty);
464    }
465
466    public event EventHandler ExecutionStateChanged;
467    private void OnExecutionStateChanged() {
468      LogMessage("ExecutionState changed to " + executionState.ToString());
469      EventHandler handler = ExecutionStateChanged;
470      if (handler != null) handler(this, EventArgs.Empty);
471    }
472
473    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
474
475    public event EventHandler Started;
476    private void OnStarted() {
477      LogMessage("Started");
478      timer.Start();
479      EventHandler handler = Started;
480      if (handler != null) handler(this, EventArgs.Empty);
481    }
482
483    public event EventHandler Stopped;
484    private void OnStopped() {
485      timer.Stop();
486      LogMessage("Stopped");
487      EventHandler handler = Stopped;
488      if (handler != null) handler(this, EventArgs.Empty);
489    }
490
491    public event EventHandler Paused;
492    private void OnPaused() {
493      timer.Stop();
494      LogMessage("Paused");
495      EventHandler handler = Paused;
496      if (handler != null) handler(this, EventArgs.Empty);
497    }
498
499    public event EventHandler Prepared;
500    protected virtual void OnPrepared() {
501      LogMessage("Prepared");
502      EventHandler handler = Prepared;
503      if (handler != null) handler(this, EventArgs.Empty);
504    }
505
506    public event EventHandler ResourceIdsChanged;
507    protected virtual void OnResourceIdsChanged() {
508      EventHandler handler = ResourceIdsChanged;
509      if (handler != null) handler(this, EventArgs.Empty);
510    }
511
512    public event EventHandler ExperimentChanged;
513    protected virtual void OnExperimentChanged() {
514      LogMessage("Experiment changed");
515      EventHandler handler = ExperimentChanged;
516      if (handler != null) handler(this, EventArgs.Empty);
517    }
518
519    public event EventHandler ServerUrlChanged;
520    protected virtual void OnServerUrlChanged() {
521      EventHandler handler = ServerUrlChanged;
522      if (handler != null) handler(this, EventArgs.Empty);
523    }
524
525    #endregion
526  }
527}
Note: See TracBrowser for help on using the repository browser.