Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4135

Last change on this file since 4135 was 4135, checked in by cneumuel, 14 years ago

added HeuristicLab.Hive.Tracing (#1092)

File size: 21.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43
44namespace HeuristicLab.Hive.Experiment {
45  /// <summary>
46  /// An experiment which contains multiple batch runs of algorithms.
47  /// </summary>
48  [Item(itemName, itemDescription)]
49  [Creatable("Testing & Analysis")]
50  [StorableClass]
51  public class HiveExperiment : NamedItem, IExecutable {
52    private const string itemName = "Hive Experiment";
53    private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive.";
54    private const int resultPollingIntervalMs = 15000;
55    private const int maxSnapshotRetries = 20;
56    private object locker = new object();
57
58    private System.Timers.Timer timer;
59    private bool pausePending, stopPending;
60
61    [Storable]
62    private DateTime lastUpdateTime;
63
64    private bool isPollingResults;
65    public bool IsPollingResults {
66      get { return isPollingResults; }
67      private set {
68        if (isPollingResults != value) {
69          isPollingResults = value;
70          OnIsPollingResultsChanged();
71        }
72      }
73    }
74
75    private bool stopResultsPollingPending = false;
76
77    private IDictionary<Guid, Thread> resultPollingThreads;
78   
79    [Storable]
80    private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>();
81
82    [Storable]
83    private JobItemList jobItems;
84    public JobItemList JobItems {
85      get { return jobItems; }
86    }
87
88
89    [Storable]
90    private string serverUrl;
91    public string ServerUrl {
92      get { return serverUrl; }
93      set {
94        if (serverUrl != value) {
95          serverUrl = value;
96          OnServerUrlChanged();
97        }
98      }
99    }
100
101    [Storable]
102    private string resourceIds;
103    public string ResourceIds {
104      get { return resourceIds; }
105      set {
106        if (resourceIds != value) {
107          resourceIds = value;
108          OnResourceIdsChanged();
109        }
110      }
111    }
112
113    [Storable]
114    private HeuristicLab.Optimization.Experiment experiment;
115    public HeuristicLab.Optimization.Experiment Experiment {
116      get { return experiment; }
117      set {
118        if (experiment != value) {
119          experiment = value;
120          OnExperimentChanged();
121        }
122      }
123    }
124
125    [Storable]
126    private ILog log;
127    public ILog Log {
128      get { return log; }
129    }
130
131    [StorableConstructor]
132    public HiveExperiment(bool deserializing)
133      : base(deserializing) {
134      this.resultPollingThreads = new Dictionary<Guid, Thread>();
135      jobItems = new JobItemList();
136    }
137
138    public HiveExperiment()
139      : base(itemName, itemDescription) {
140      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
141      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
142      this.log = new Log();
143      pausePending = stopPending = false;
144      jobItems = new JobItemList();
145      isPollingResults = false;
146      resultPollingThreads = new Dictionary<Guid, Thread>();
147      InitTimer();
148    }
149
150    public override IDeepCloneable Clone(Cloner cloner) {
151      LogMessage("I am beeing cloned");
152      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
153      clone.resourceIds = this.resourceIds;
154      clone.serverUrl = this.serverUrl;
155      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
156      clone.executionState = this.executionState;
157      clone.executionTime = this.executionTime;
158      clone.pendingOptimizers = new Dictionary<Guid, IOptimizer>();
159      foreach (var pair in this.pendingOptimizers)
160        clone.pendingOptimizers[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
161      clone.log = (ILog)cloner.Clone(log);
162      clone.stopPending = this.stopPending;
163      clone.pausePending = this.pausePending;
164      clone.jobItems = (JobItemList)cloner.Clone(jobItems);
165      clone.lastUpdateTime = this.lastUpdateTime;
166      clone.isPollingResults = this.isPollingResults;
167      return clone;
168    }
169
170    [StorableHook(HookType.AfterDeserialization)]
171    private void AfterDeserialization() {
172      InitTimer();
173      this.IsPollingResults = false;
174      this.stopResultsPollingPending = false;
175      LogMessage("I was deserialized.");
176    }
177
178    private void InitTimer() {
179      timer = new System.Timers.Timer(100);
180      timer.AutoReset = true;
181      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
182    }
183
184    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
185      DateTime now = DateTime.Now;
186      ExecutionTime += now - lastUpdateTime;
187      lastUpdateTime = now;
188    }
189
190    public IEnumerable<string> ResourceGroups {
191      get {
192        if (!string.IsNullOrEmpty(resourceIds)) {
193          return resourceIds.Split(';');
194        } else {
195          return new List<string>();
196        }
197      }
198    }
199    #region IExecutable Members
200
201    [Storable]
202    private Core.ExecutionState executionState;
203    public ExecutionState ExecutionState {
204      get { return executionState; }
205      private set {
206        if (executionState != value) {
207          executionState = value;
208          OnExecutionStateChanged();
209        }
210      }
211    }
212
213    [Storable]
214    private TimeSpan executionTime;
215    public TimeSpan ExecutionTime {
216      get { return executionTime; }
217      private set {
218        if (executionTime != value) {
219          executionTime = value;
220          OnExecutionTimeChanged();
221        }
222      }
223    }
224
225    public void Pause() {
226      throw new NotSupportedException();
227    }
228
229    public void Prepare() {
230      if (experiment != null) {
231        experiment.Prepare();
232        this.ExecutionState = Core.ExecutionState.Prepared;
233        OnPrepared();
234      }
235    }
236
237    public void Start() {
238      OnStarted();
239      lastUpdateTime = DateTime.Now;
240      this.ExecutionState = Core.ExecutionState.Started;
241      Thread t = new Thread(() => {
242        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
243
244        pendingOptimizers = new Dictionary<Guid, IOptimizer>();
245        IEnumerable<string> groups = ResourceGroups;
246
247        foreach (IOptimizer optimizer in GetOptimizers(false)) {
248          SerializedJob serializedJob = CreateSerializedJob(optimizer);
249          ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
250          pendingOptimizers.Add(response.Obj.Id, optimizer);
251
252          JobItem jobItem = new JobItem() {
253            JobDto = response.Obj,
254            LatestSnapshot = new ResponseObject<SerializedJob>() {
255              Obj = serializedJob,
256              StatusMessage = "Initial Snapshot",
257              Success = true
258            }
259          };
260          jobItems.Add(jobItem);
261          jobItem.LogMessage("Job sent to Hive");
262
263          LogMessage("Sent job to Hive (jobId: " + response.Obj.Id + ")");
264        }
265       
266        // start results polling after sending sending the jobs to the server (to avoid race conflicts at the optimizers-collection)
267        StartResultPolling();
268      });
269      t.Start();
270    }
271
272    public void Stop() {
273      this.ExecutionState = Core.ExecutionState.Stopped;
274      foreach (JobItem jobItem in jobItems) {
275        AbortJob(jobItem.JobDto.Id);
276      }
277      OnStopped();
278    }
279
280    private void CreateResultPollingThreads() {
281      foreach(JobItem jobItem in JobItems) {
282        if (!resultPollingThreads.ContainsKey(jobItem.JobDto.Id)) {
283          resultPollingThreads.Add(jobItem.JobDto.Id, CreateResultPollingThread(jobItem.JobDto));
284        }
285      }
286    }
287
288    public void StartResultPolling() {
289      this.stopResultsPollingPending = false;
290      CreateResultPollingThreads();
291      foreach (Thread pollingThread in resultPollingThreads.Values) {
292        if (pollingThread.ThreadState != System.Threading.ThreadState.Running) {
293          pollingThread.Start();
294        }
295      }
296      this.IsPollingResults = true;
297    }
298
299    public void StopResultPolling() {
300      this.stopResultsPollingPending = true;
301      foreach (Thread pollingThread in resultPollingThreads.Values) {
302        pollingThread.Interrupt();
303      }
304      this.stopResultsPollingPending = false;
305    }
306
307    private JobItem GetJobItemById(Guid jobId) {
308      return jobItems.Single(x => x.JobDto.Id == jobId);
309    }
310
311    /// <summary>
312    /// Returns all optimizers in the current Experiment
313    /// </summary>
314    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
315    /// <returns></returns>
316    private IEnumerable<IOptimizer> GetOptimizers(bool flatout) {
317      if (!flatout) {
318        return experiment.Optimizers;
319      } else {
320        throw new NotImplementedException();
321      }
322    }
323
324    private void ReplaceOptimizer(IOptimizer originalOptimizer, IOptimizer newOptimizer) {
325      lock (locker) {
326        int originalOptimizerIndex = experiment.Optimizers.IndexOf(originalOptimizer);
327        experiment.Optimizers[originalOptimizerIndex] = newOptimizer;
328      }
329    }
330
331    public void AbortJob(Guid jobId) {
332      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
333      Response response = executionEngineFacade.AbortJob(jobId);
334      GetJobItemById(jobId).LogMessage("Aborting Job: " + response.StatusMessage);
335    }
336
337    #endregion
338
339    private IExecutionEngineFacade GetExecutionEngineFacade() {
340      return ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
341    }
342
343    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
344      IJob job = new OptimizerJob() {
345        Optimizer = optimizer
346      };
347
348      // serialize job
349      MemoryStream memStream = new MemoryStream();
350      XmlGenerator.Serialize(job, memStream);
351      byte[] jobByteArray = memStream.ToArray();
352      memStream.Dispose();
353
354      // find out which which plugins are needed for the given object
355      List<HivePluginInfoDto> pluginsNeeded = (
356        from p in GetDeclaringPlugins(optimizer.GetType())
357        select new HivePluginInfoDto() {
358          Name = p.Name,
359          Version = p.Version
360        }).ToList();
361
362      JobDto jobDto = new JobDto() {
363        CoresNeeded = 1, // [chn] how to determine real cores needed?
364        PluginsNeeded = pluginsNeeded,
365        State = State.Offline,
366        MemoryNeeded = 0,
367        UserId = Guid.Empty // [chn] set real userid here!
368      };
369
370      SerializedJob serializedJob = new SerializedJob() {
371        JobInfo = jobDto,
372        SerializedJobData = jobByteArray
373      };
374
375      return serializedJob;
376    }
377
378    private Thread CreateResultPollingThread(JobDto job) {
379      return new Thread(() => {
380        try {
381          GetJobItemById(job.Id).LogMessage("Starting job results polling");
382          IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
383          IJob restoredObject = null;
384
385          do {
386            Thread.Sleep(resultPollingIntervalMs);
387            if (stopPending || !this.IsPollingResults) {
388              return;
389            }
390
391            ResponseObject<JobDto> response = executionEngineFacade.GetJobById(job.Id);
392            LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")");
393            GetJobItemById(job.Id).LogMessage("Response: " + response.StatusMessage);
394
395            UpdateJobItem(response.Obj);
396
397            if (response.Obj.State == State.Abort) {
398              // job is aborted, don't poll for results anymore
399              GetJobItemById(job.Id).LogMessage("Job successfully aborted");
400              return;
401            }           
402
403            // loop while
404            // 1. the user doesn't request an abort
405            // 2. there is a problem with server communication (success==false)
406            // 3. no result for the job is available yet (response.Obj==null)
407            // 4. the result that we get from the server is a snapshot and not the final result
408            if (response.Success && response.Obj != null && response.Obj.State == State.Finished) {
409              ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(job.Id, false, false);
410              restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
411              UpdateSnapshot(jobResponse);
412            }
413          } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
414
415          LogMessage("Job finished (jobId: " + job.Id + ")");
416          GetJobItemById(job.Id).LogMessage("Job finished");
417          // job retrieved... replace the existing optimizers with the finished one
418          IOptimizer originalOptimizer = pendingOptimizers[job.Id];
419          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
420
421          ReplaceOptimizer(originalOptimizer, restoredOptimizer);
422          pendingOptimizers.Remove(job.Id);
423
424          if (pendingOptimizers.Count == 0) {
425            // finished
426            this.ExecutionState = Core.ExecutionState.Stopped;
427            OnStopped();
428          }
429        } catch (ThreadInterruptedException exception) {
430
431        } finally {
432          GetJobItemById(job.Id).LogMessage("ResultsPolling Thread stopped");
433          resultPollingThreads.Remove(job.Id);
434          if (resultPollingThreads.Count == 0) {
435            IsPollingResults = false;
436          }
437        }
438      });
439    }
440
441    private void UpdateJobItem(JobDto jobDto) {
442      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
443      jobItem.JobDto = jobDto;
444    }
445
446    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
447      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
448      jobItem.LatestSnapshot = response;
449    }
450
451    private void LogMessage(string message) {
452      // HeuristicLab.Log is not Thread-Safe, so lock every call
453      lock (locker) {
454        log.LogMessage(message);
455      }
456    }
457
458    #region Required Plugin Search
459    /// <summary>
460    /// Returns a list of plugins in which the type itself and all members
461    /// of the type are declared. Objectgraph is searched recursively.
462    /// </summary>
463    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
464      HashSet<Type> types = new HashSet<Type>();
465      FindTypes(type, types, "HeuristicLab.");
466      return GetDeclaringPlugins(types);
467    }
468
469    /// <summary>
470    /// Returns the plugins (including dependencies) in which the given types are declared
471    /// </summary>
472    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
473      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
474      foreach (Type t in types) {
475        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
476      }
477      return plugins;
478    }
479
480    /// <summary>
481    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
482    /// Also searches the dependencies recursively.
483    /// </summary>
484    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
485      if (!plugins.Contains(plugin)) {
486        plugins.Add(plugin);
487        foreach (IPluginDescription dependency in plugin.Dependencies) {
488          FindDeclaringPlugins(dependency, plugins);
489        }
490      }
491    }
492
493    /// <summary>
494    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
495    /// Be aware that search is not performed on attributes
496    /// </summary>
497    /// <param name="type">the type to be searched</param>
498    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
499    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
500    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
501      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
502        types.Add(type);
503
504        // constructors
505        foreach (ConstructorInfo info in type.GetConstructors()) {
506          foreach (ParameterInfo paramInfo in info.GetParameters()) {
507            FindTypes(paramInfo.ParameterType, types, namespaceStart);
508          }
509        }
510
511        // interfaces
512        foreach (Type t in type.GetInterfaces()) {
513          FindTypes(t, types, namespaceStart);
514        }
515
516        // events
517        foreach (EventInfo info in type.GetEvents()) {
518          FindTypes(info.EventHandlerType, types, namespaceStart);
519          FindTypes(info.DeclaringType, types, namespaceStart);
520        }
521
522        // properties
523        foreach (PropertyInfo info in type.GetProperties()) {
524          FindTypes(info.PropertyType, types, namespaceStart);
525        }
526
527        // fields
528        foreach (FieldInfo info in type.GetFields()) {
529          FindTypes(info.FieldType, types, namespaceStart);
530        }
531
532        // methods
533        foreach (MethodInfo info in type.GetMethods()) {
534          foreach (ParameterInfo paramInfo in info.GetParameters()) {
535            FindTypes(paramInfo.ParameterType, types, namespaceStart);
536          }
537          FindTypes(info.ReturnType, types, namespaceStart);
538        }
539      }
540    }
541    #endregion
542
543    #region Eventhandler
544
545    public event EventHandler ExecutionTimeChanged;
546    private void OnExecutionTimeChanged() {
547      EventHandler handler = ExecutionTimeChanged;
548      if (handler != null) handler(this, EventArgs.Empty);
549    }
550
551    public event EventHandler ExecutionStateChanged;
552    private void OnExecutionStateChanged() {
553      LogMessage("ExecutionState changed to " + executionState.ToString());
554      EventHandler handler = ExecutionStateChanged;
555      if (handler != null) handler(this, EventArgs.Empty);
556    }
557
558    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
559
560    public event EventHandler Started;
561    private void OnStarted() {
562      LogMessage("Started");
563      timer.Start();
564      EventHandler handler = Started;
565      if (handler != null) handler(this, EventArgs.Empty);
566    }
567
568    public event EventHandler Stopped;
569    private void OnStopped() {
570      timer.Stop();
571      LogMessage("Stopped");
572      EventHandler handler = Stopped;
573      if (handler != null) handler(this, EventArgs.Empty);
574    }
575
576    public event EventHandler Paused;
577    private void OnPaused() {
578      timer.Stop();
579      LogMessage("Paused");
580      EventHandler handler = Paused;
581      if (handler != null) handler(this, EventArgs.Empty);
582    }
583
584    public event EventHandler Prepared;
585    protected virtual void OnPrepared() {
586      LogMessage("Prepared");
587      EventHandler handler = Prepared;
588      if (handler != null) handler(this, EventArgs.Empty);
589    }
590
591    public event EventHandler ResourceIdsChanged;
592    protected virtual void OnResourceIdsChanged() {
593      EventHandler handler = ResourceIdsChanged;
594      if (handler != null) handler(this, EventArgs.Empty);
595    }
596
597    public event EventHandler ExperimentChanged;
598    protected virtual void OnExperimentChanged() {
599      LogMessage("Experiment changed");
600      EventHandler handler = ExperimentChanged;
601      if (handler != null) handler(this, EventArgs.Empty);
602    }
603
604    public event EventHandler ServerUrlChanged;
605    protected virtual void OnServerUrlChanged() {
606      EventHandler handler = ServerUrlChanged;
607      if (handler != null) handler(this, EventArgs.Empty);
608    }
609
610    public event EventHandler IsResultsPollingChanged;
611    private void OnIsPollingResultsChanged() {
612      if (this.IsPollingResults) {
613        LogMessage("Results Polling Started");
614        timer.Start();
615      } else {
616        LogMessage("Results Polling Stopped");
617        timer.Stop();
618      }
619      EventHandler handler = IsResultsPollingChanged;
620      if (handler != null) handler(this, EventArgs.Empty);
621    }
622    #endregion
623  }
624}
Note: See TracBrowser for help on using the repository browser.