Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.DatastreamAnalysis/HeuristicLab.DatastreamAnalysis/3.4/DatastreamAnalysisOptimizer.cs @ 14547

Last change on this file since 14547 was 14547, checked in by jzenisek, 7 years ago

#2719 added type and view for representing dictionary<string,double> as bar chart (implementation in progress)

File size: 21.7 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.ComponentModel;
27using System.Diagnostics;
28using System.Drawing;
29using System.Linq;
30using System.Threading;
31using System.Threading.Tasks;
32using HeuristicLab.Analysis;
33using HeuristicLab.Collections;
34using HeuristicLab.Common;
35using HeuristicLab.Core;
36using HeuristicLab.Data;
37using HeuristicLab.Optimization;
38using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
39using HeuristicLab.Problems.DataAnalysis;
40
41namespace HeuristicLab.DatastreamAnalysis {
/// <summary>
/// Control actions (none, prepare, start, stop, pause) of the datastream
/// analysis optimizer; used as the type of its <c>daoAction</c> field.
/// </summary>
internal enum DatastreamAnalysisOptimizerAction {
  None = 0,
  Prepare = 1,
  Start = 2,
  Stop = 3,
  Pause = 4
}
49
50  [StorableClass]
51  [Item("DatastreamAnalysis Optimizer",
52     "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
53  [Creatable(CreatableAttribute.Categories.Algorithms)]
54  public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {
55    #region properties
56    public string Filename { get; set; }
57
58    private DatastreamAnalysisOptimizerAction daoAction;
59
/// <summary>
/// Gets the optimizers nested inside this optimizer. This optimizer holds
/// none, so an empty sequence is returned. The former getter-only
/// auto-property was never assigned and therefore always returned
/// <c>null</c>, which breaks any caller that enumerates the result.
/// </summary>
public IEnumerable<IOptimizer> NestedOptimizers {
  get { return Enumerable.Empty<IOptimizer>(); }
}
61
62
// Persisted log that receives the status messages written by this optimizer.
[Storable]
protected ILog log;

/// <summary>Gets the log of this optimizer.</summary>
public ILog Log {
  get {
    return log;
  }
}

// Persisted collection holding the results of the current analysis.
[Storable]
private ResultCollection results;

/// <summary>Gets the result collection of this optimizer.</summary>
public ResultCollection Results {
  get {
    return results;
  }
}
75
76    private CancellationTokenSource cancellationTokenSource;
77    private bool stopPending;
78    private DateTime lastUpdateTime;
79    private bool prepared;
80    private bool finished;
81
82    [Storable]
83    protected int runsCounter;
84
// Persisted collection of the runs recorded by OnStopped.
[Storable]
private RunCollection runs;

/// <summary>
/// Gets or (for derived classes) sets the run collection. Replacing the
/// collection moves the event subscriptions from the old to the new one.
/// </summary>
/// <exception cref="ArgumentNullException">The assigned value is <c>null</c>.</exception>
public RunCollection Runs {
  get { return runs; }
  protected set {
    if (value == null) throw new ArgumentNullException();
    if (runs == value) return;

    if (runs != null) DeregisterRunsEvents();
    runs = value;
    // value was checked against null above, so runs cannot be null here
    RegisterRunsEvents();
  }
}
101
102
// Persisted list of the ensemble models that are evaluated against the datastream.
[Storable]
private IItemList<RegressionEnsembleModel> ensembles;

/// <summary>
/// Gets or sets the list of ensemble models. Assigning <c>null</c> or the
/// current list is ignored; otherwise the event subscriptions are moved to
/// the new list, <c>EnsemblesChanged</c> is raised and the optimizer is
/// prepared again.
/// </summary>
public IItemList<RegressionEnsembleModel> Ensembles {
  get { return ensembles; }
  set {
    if (value == null || value == ensembles)
      return;
    // The former guard "value is IRegressionEnsembleModel" tested the list
    // itself against the element interface, so it threw for every list and
    // made this setter unusable. The static type already guarantees the
    // element type, hence no runtime check is required.
    if (ensembles != null) DeregisterEnsembleEvents();
    ensembles = value;
    RegisterEnsembleEvents();
    OnEnsemblesChanged();
    Prepare();
  }
}
119
120
// VAR 1: datastream ~= problem data, VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
[Storable]
private Datastream datastream;

/// <summary>
/// Gets or sets the datastream. Assigning <c>null</c> or the current
/// datastream is ignored; otherwise the event subscriptions are moved to the
/// new datastream, <c>DatastreamChanged</c> is raised and the optimizer is
/// prepared again.
/// </summary>
/// <exception cref="ArgumentException">The value is not an <c>IDatastream</c>.</exception>
public Datastream Datastream {
  get { return datastream; }
  set {
    bool nothingToDo = value == null || value == datastream;
    if (nothingToDo) return;

    var typed = value as IDatastream;
    if (typed == null) throw new ArgumentException("Invalid datastream type");

    DeregisterDatastreamEvents();
    datastream = value;
    RegisterDatastreamEvents();
    OnDatastreamChanged();
    Prepare();
  }
}
138
139    #endregion properties
140
141    #region results properties
// Counter stored in the "Sliding Window Movements" result.
private int ResultsSlidingWindowMovements {
  get {
    var counter = (IntValue)Results["Sliding Window Movements"].Value;
    return counter.Value;
  }
  set {
    var counter = (IntValue)Results["Sliding Window Movements"].Value;
    counter.Value = value;
  }
}

// Data table stored in the "Qualities" result.
private DataTable ResultsQualities {
  get {
    return (DataTable)Results["Qualities"].Value;
  }
}

// Row name suffixes for the qualities table.
private const string ResultsQualitiesMSE = "Mean squared error";
private const string ResultsQualitiesPR2 = "Pearson R²";

// Data table stored in the "Targets" result.
private DataTable ResultsTargets {
  get {
    return (DataTable)Results["Targets"].Value;
  }
}

// Name of the targets row that holds the real (observed) target values.
private const string ResultsTargetsReal = "Real";
158
// Bar set that compares the ensembles. SetupResults registers this result
// under the name "Ensemble Comparison"; the former lookup key "Ensemble"
// matched no result, so every access failed.
private DataBarSet ResultsQualitiesBars {
  get { return (DataBarSet) Results["Ensemble Comparison"].Value; }
}
162
/// <summary>
/// Clears the result collection and recreates the results used during a run:
/// the sliding window movement counter, the qualities table, the targets
/// table and the ensemble comparison bar set. A targets row is added for the
/// real values and, per ensemble, a targets row, a Pearson R² quality row
/// and a bar initialized with 0.0.
/// </summary>
protected void SetupResults() {
  Results.Clear();

  var movementCounter = new Result("Sliding Window Movements", new IntValue(0));
  var qualitiesTable = new Result("Qualities", new DataTable("Qualities"));
  var targetsTable = new Result("Targets", new DataTable("Targets"));
  var comparisonBars = new Result("Ensemble Comparison", new DataBarSet("Ensemble Comparison"));
  Results.Add(movementCounter);
  Results.Add(qualitiesTable);
  Results.Add(targetsTable);
  Results.Add(comparisonBars);

  ResultsTargets.Rows.Add(new DataRow(ResultsTargetsReal));
  foreach (var ensemble in Ensembles) {
    string ensembleName = ensemble.Name;

    // estimated targets (series)
    ResultsTargets.Rows.Add(new DataRow(ensembleName));

    // qualities (series); only the Pearson R² row is created
    ResultsQualities.Rows.Add(new DataRow(ensembleName + " - " + ResultsQualitiesPR2));

    // qualities (bars)
    ResultsQualitiesBars.Bars.Add(new StringValue(ensembleName), new DoubleValue(0.0));
  }
}
184
185    #endregion
186
187    #region constructors, cloner,...
/// <summary>
/// Creates an optimizer named "Datastream Analysis" with an empty log,
/// result collection, ensemble list and run collection and a new
/// datastream, then registers the event handlers.
/// </summary>
public DatastreamAnalysisOptimizer() {
  this.name = "Datastream Analysis";
  this.log = new Log();
  this.results = new ResultCollection();
  this.ensembles = new ItemList<RegressionEnsembleModel>();
  this.datastream = new Datastream();
  this.runsCounter = 0;
  this.runs = new RunCollection();

  Initialize();
}
198
/// <summary>Constructor used by the persistence framework.</summary>
[StorableConstructor]
protected DatastreamAnalysisOptimizer(bool deserializing)
  : base(deserializing) {
}

// Re-registers the event handlers once the stored fields have been restored.
[StorableHook(HookType.AfterDeserialization)]
private void AfterDeserialization() {
  Initialize();
}
207
/// <summary>
/// Cloning constructor: copies the name, description and run counter and
/// clones log, results, ensembles, datastream and runs via
/// <paramref name="cloner"/>, then registers the event handlers on the clones.
/// </summary>
/// <param name="original">The optimizer to copy.</param>
/// <param name="cloner">The cloner that tracks already cloned objects.</param>
protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner) : base(original, cloner) {
  name = original.name;
  // the description was previously dropped when cloning
  description = original.description;
  log = cloner.Clone(original.log);
  results = cloner.Clone(original.results);
  // Clone through the cloner, as done for the other members, instead of
  // calling Clone(cloner) on the objects directly. This avoids the hard
  // cast to ItemList<> (the Ensembles setter accepts any IItemList<>) and a
  // NullReferenceException on a missing member.
  // NOTE(review): this relies on Cloner.Clone returning already cloned
  // objects and null for null; confirm against the Cloner implementation.
  ensembles = cloner.Clone(original.ensembles);
  datastream = cloner.Clone(original.datastream);
  runsCounter = original.runsCounter;
  runs = cloner.Clone(original.runs);
  Initialize();
}
218
/// <summary>Creates a deep copy of this optimizer using the cloning constructor.</summary>
public override IDeepCloneable Clone(Cloner cloner) {
  var copy = new DatastreamAnalysisOptimizer(this, cloner);
  return copy;
}

// Registers the event handlers on every member that is present.
private void Initialize() {
  if (runs != null) {
    RegisterRunsEvents();
  }
  if (datastream != null) {
    RegisterDatastreamEvents();
  }
  if (ensembles != null) {
    RegisterEnsembleEvents();
  }
}
228    #endregion
229
230    #region control actions
231
/// <summary>
/// Prepares the optimizer. Does nothing when there are no ensembles, no
/// datastream, or the datastream reports that a sliding window evaluation is
/// not possible.
/// </summary>
public override void Prepare() {
  if (ensembles == null || ensembles.Count == 0) return;
  if (datastream == null || !datastream.SlidingWindowEvaluationPossible) return;

  base.Prepare();
  OnPrepared();
}
238
/// <summary>
/// Prepares the optimizer and optionally clears the recorded runs. Does
/// nothing when there are no ensembles, no datastream, or the datastream
/// reports that a sliding window evaluation is not possible.
/// </summary>
/// <param name="clearRuns">If <c>true</c>, the run collection is cleared.</param>
public void Prepare(bool clearRuns) {
  if (ensembles == null || ensembles.Count == 0) return;
  if (datastream == null || !datastream.SlidingWindowEvaluationPossible) return;

  base.Prepare();
  if (clearRuns) {
    runs.Clear();
  }
  OnPrepared();
}
246
/// <summary>
/// Starts the analysis on a background task. If the optimizer was prepared,
/// the results are set up and the datastream state is initialized first.
/// When the task ends, exceptions other than cancellation are reported via
/// <c>OnExceptionOccurred</c>, the cancellation token source is disposed and
/// the optimizer switches to stopped (stop requested or finished) or paused.
/// </summary>
public override void Start() {
  if (ensembles == null || datastream == null) return;
  base.Start();
  cancellationTokenSource = new CancellationTokenSource();
  stopPending = false;

  if (prepared) {
    SetupResults();
    Datastream.InitializeState();
  }

  // the token is passed twice: as state object for Run and as task token
  CancellationToken token = cancellationTokenSource.Token;
  Task worker = Task.Factory.StartNew(Run, token, token);
  worker.ContinueWith(completed => {
    try {
      completed.Wait();
    }
    catch (AggregateException aggregate) {
      try {
        // cancellation is expected; everything else is left unhandled
        aggregate.Flatten().Handle(inner => inner is OperationCanceledException);
      }
      catch (AggregateException unhandled) {
        Exception reported = unhandled.InnerExceptions.Count == 1
          ? unhandled.InnerExceptions[0]
          : unhandled;
        OnExceptionOccurred(reported);
      }
    }
    cancellationTokenSource.Dispose();
    cancellationTokenSource = null;

    // handle stop/pause
    if (!stopPending && !finished) {
      OnPaused();
    } else {
      OnStopped();
    }
  });
}
282
/// <summary>
/// Requests a pause by cancelling the running task; without a stop request
/// the task continuation then switches the optimizer to paused unless the
/// run has already finished.
/// </summary>
public override void Pause() {
  bool notConfigured = ensembles == null || datastream == null;
  if (notConfigured) return;

  base.Pause();
  cancellationTokenSource.Cancel();
}
288
/// <summary>
/// Stops the analysis. A paused optimizer is stopped immediately; otherwise
/// the stop is flagged as pending and the running task is cancelled, and the
/// task continuation completes the stop.
/// </summary>
public override void Stop() {
  if (ensembles == null || datastream == null) return;
  base.Stop();

  if (ExecutionState != ExecutionState.Paused) {
    stopPending = true;
    cancellationTokenSource.Cancel();
    return;
  }
  OnStopped();
}
299
/// <summary>
/// Resets the execution time, initializes the state of every
/// <c>IStatefulItem</c> returned by the object graph traversal, clears the
/// results, marks the optimizer as prepared and not finished, and logs the
/// preparation.
/// </summary>
protected override void OnPrepared() {
  ExecutionTime = TimeSpan.Zero;

  // NOTE(review): the set containing Runs is presumably the list of objects
  // excluded from the traversal - confirm against GetObjectGraphObjects.
  var traversalSet = new HashSet<object>();
  traversalSet.Add(Runs);
  var statefulItems = this.GetObjectGraphObjects(traversalSet).OfType<IStatefulItem>();
  foreach (IStatefulItem item in statefulItems) {
    item.InitializeState();
  }

  results.Clear();
  prepared = true;
  finished = false;
  Log.LogMessage("Datastream analysis prepared");
  base.OnPrepared();
}
311
/// <summary>Logs the start of the analysis before raising the base notification.</summary>
protected override void OnStarted() {
  const string message = "Datastream analysis started";
  Log.LogMessage(message);
  base.OnStarted();
}

/// <summary>Logs the pause of the analysis before raising the base notification.</summary>
protected override void OnPaused() {
  const string message = "Datastream analysis paused";
  Log.LogMessage(message);
  base.OnPaused();
}
321
/// <summary>
/// Records the finished analysis as a new run (named "&lt;Name&gt; Run &lt;n&gt;")
/// holding the collected parameter and result values. The stop is logged and
/// the base notification is raised even if recording the run fails.
/// </summary>
protected override void OnStopped() {
  try {
    runsCounter++;
    var finishedRun = new Run {
      Filename = Filename,
      Name = string.Format("{0} Run {1}", Name, runsCounter)
    };
    CollectParameterValues(finishedRun.Parameters);
    CollectResultValues(finishedRun.Results);
    runs.Add(finishedRun);
  }
  finally {
    Log.LogMessage("Datastream analysis stopped");
    base.OnStopped();
  }
}
337
338    #endregion
339
340    #region run
/// <summary>
/// Task entry point: raises the started notification, keeps
/// <c>ExecutionTime</c> up to date with a 250 ms timer while the analysis
/// runs, and finally throws if cancellation was requested so that the task
/// ends in the cancelled state.
/// </summary>
/// <param name="state">The <see cref="CancellationToken"/> of the task, boxed as state object.</param>
private void Run(object state) {
  CancellationToken cancellationToken = (CancellationToken) state;
  OnStarted();
  lastUpdateTime = DateTime.UtcNow;

  // System.Timers.Timer is IDisposable; the previous version only stopped
  // it and leaked the underlying timer on every start. The using block
  // disposes it after the handler was detached and the timer stopped.
  using (System.Timers.Timer timer = new System.Timers.Timer(250)) {
    timer.AutoReset = true;
    timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    timer.Start();

    try {
      Run(cancellationToken);
    }
    finally {
      timer.Elapsed -= new System.Timers.ElapsedEventHandler(timer_Elapsed);
      timer.Stop();
      // account for the time elapsed since the last timer tick
      ExecutionTime += DateTime.UtcNow - lastUpdateTime;
    }
  }

  cancellationToken.ThrowIfCancellationRequested();
}
361
// Index of the first datastream row that has not been replayed into the results yet.
private int replayedIndex;

/// <summary>
/// Main loop: replays and evaluates the initial window, then repeatedly
/// waits for the configured movement interval, moves the sliding window and
/// replays/evaluates it, for as long as movements are possible and updates
/// are available. Sets <c>finished</c> when the loop completes.
/// </summary>
/// <param name="cancellationToken">Token checked before every window movement.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
/// <exception cref="ArgumentOutOfRangeException">Propagated unchanged from the loop.</exception>
protected void Run(CancellationToken cancellationToken) {

  if (prepared) {
    // fresh run: start replaying from the beginning
    replayedIndex = 0;
    prepared = false;
  }

  try {

    // play and evaluate initial window
    PlayDatastream();
    if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
    replayedIndex = Datastream.FitnessPartition.End;

    do {
      while (Datastream.SlidingWindowMovementPossible) {
        cancellationToken.ThrowIfCancellationRequested();

        // perform (delayed) window movement
        Thread.Sleep(Datastream.SlidingWindowMovementInterval.Value);
        Datastream.MoveSlidingWindow();
        ResultsSlidingWindowMovements++;

        // play and evaluate the moved window
        PlayDatastream();
        if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
        replayedIndex = Datastream.FitnessPartition.End;
      }
    } while (Datastream.UpdateAvailable);
    finished = true;
  }
  // These two exception types are passed on to the caller. "throw;" keeps
  // the original stack trace, whereas the former "throw ex;" reset it.
  catch (ArgumentOutOfRangeException) {
    throw;
  }
  catch (OperationCanceledException) {
    throw;
  }
  catch (Exception ex) {
    // Any other exception still ends the run without propagating (as
    // before), but is now written to the log instead of vanishing silently.
    Log.LogMessage("Datastream analysis aborted: " + ex.GetType().Name + ": " + ex.Message);
  }
}
404
// Appends the real target values of all rows from replayedIndex up to (but
// excluding) the end of the fitness partition to the "Real" targets row.
private void PlayDatastream() {
  var data = Datastream.ProblemData;
  var target = data.TargetVariable;

  int row = replayedIndex;
  while (row < Datastream.FitnessPartition.End) {
    var observed = data.Dataset.GetDoubleValue(target, row);
    ResultsTargets.Rows[ResultsTargetsReal].Values.Add(observed);
    row++;
  }
}
414
// Evaluates every ensemble on the current fitness partition. Per ensemble
// the estimates of all models are averaged per row, the squared Pearson
// correlation between these row estimates and the real target values is
// computed (0.0 if the calculator reports an error), and for every row not
// yet replayed the overall average estimate and the R² value are appended
// to the result tables.
private void Evaluate() {
  var problemData = Datastream.ProblemData;
  var dataset = problemData.Dataset;
  var targetVarName = problemData.TargetVariable;
  var partition = Datastream.FitnessPartition;

  // Materialize once. The former lazy queries were re-enumerated for every
  // ensemble and, worse, for every row, which re-ran each model's estimation
  // once per row (accidentally quadratic).
  var rows = Enumerable.Range(partition.Start, partition.Size).ToList();
  var realValues = dataset.GetDoubleValues(targetVarName, rows).ToList();

  foreach (var ensemble in Ensembles) {
    // one array of estimates per model, index = row offset in the partition
    var estimatedValuesPerModelPerRow = ensemble.Models
      .Select(model => model.GetEstimatedValues(dataset, rows).ToArray())
      .ToList();

    // per row: average over all models
    var estimatedValues = Enumerable.Range(0, partition.Size)
      .Select(r => estimatedValuesPerModelPerRow.Select(e => e[r]).Average())
      .ToArray();

    // average over the per-model averages
    var averageEstimatedValue = estimatedValuesPerModelPerRow.Select(x => x.Average()).Average();

    // determine quality
    OnlineCalculatorError error;
    var pR = OnlinePearsonsRCalculator.Calculate(estimatedValues, realValues, out error);
    var pR2 = error == OnlineCalculatorError.None ? pR * pR : 0.0;

    for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
      ResultsTargets.Rows[ensemble.Name].Values.Add(averageEstimatedValue);
      ResultsQualities.Rows[ensemble.Name + " - " + ResultsQualitiesPR2].Values.Add(pR2);

      // NOTE(review): SetupResults keys the bars by ensemble.Name, whereas
      // these lookups use ensemble.ItemName and a fresh StringValue instance
      // as key - verify that DataBarSet.Bars resolves such keys. The second
      // statement also replaces the value object updated by the first one.
      ResultsQualitiesBars.Bars[new StringValue(ensemble.ItemName)].Value = pR2;
      ResultsQualitiesBars.Bars[new StringValue(ensemble.ItemName)] = new DoubleValue(pR2);
    }
  }
}
448
// Timer tick: adds the time passed since the last update to ExecutionTime.
// The timer is disabled while the update is performed and re-enabled after.
private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
  var source = (System.Timers.Timer)sender;
  source.Enabled = false;

  DateTime tick = DateTime.UtcNow;
  ExecutionTime += tick - lastUpdateTime;
  lastUpdateTime = tick;

  source.Enabled = true;
}
457
/// <summary>
/// Adds the name of this analysis and, if a datastream is set, the
/// datastream's parameter values and name to <paramref name="values"/>.
/// </summary>
/// <param name="values">The dictionary that receives the entries.</param>
public void CollectParameterValues(IDictionary<string, IItem> values) {
  values.Add("Datastream Analysis Name", new StringValue(Name));
  if (Datastream == null) return;

  Datastream.CollectParameterValues(values);
  values.Add("Datastream Name", new StringValue(Datastream.Name));
}
465
/// <summary>
/// Adds the execution time and the values of the result collection to
/// <paramref name="values"/>.
/// </summary>
/// <param name="values">The dictionary that receives the entries.</param>
public void CollectResultValues(IDictionary<string, IItem> values) {
  var executionTime = new TimeSpanValue(ExecutionTime);
  values.Add("Execution Time", executionTime);
  Results.CollectResultValues(values);
}
470    #endregion
471
472    #region events
473
474    #region events registration
475
476    public EventHandler EnsemblesChanged;
477    public EventHandler DatastreamChanged;
478
/// <summary>Detaches the reset handler from the run collection.</summary>
protected virtual void DeregisterRunsEvents() {
  runs.CollectionReset -= Runs_CollectionReset;
}

/// <summary>Attaches the reset handler to the run collection.</summary>
protected virtual void RegisterRunsEvents() {
  runs.CollectionReset += Runs_CollectionReset;
}
486
/// <summary>Attaches the reset and problem-data-changed handlers to the datastream.</summary>
protected virtual void RegisterDatastreamEvents() {
  datastream.Reset += Datastream_Reset;
  datastream.ProblemDataChanged += Datastream_ProblemDataChanged;
}

/// <summary>Detaches the reset and problem-data-changed handlers from the datastream.</summary>
protected virtual void DeregisterDatastreamEvents() {
  datastream.Reset -= Datastream_Reset;
  datastream.ProblemDataChanged -= Datastream_ProblemDataChanged;
}
496
/// <summary>
/// Attaches the handlers for added, moved, removed and replaced items and
/// for collection resets to the ensemble list.
/// </summary>
protected virtual void RegisterEnsembleEvents() {
  ensembles.ItemsAdded += Ensembles_ItemsChanged;
  ensembles.ItemsMoved += Ensembles_ItemsChanged;
  ensembles.ItemsRemoved += Ensembles_ItemsChanged;
  ensembles.ItemsReplaced += Ensembles_ItemsChanged;
  ensembles.CollectionReset += Ensembles_Reset;
}

/// <summary>
/// Detaches the handlers for added, moved, removed and replaced items and
/// for collection resets from the ensemble list.
/// </summary>
protected virtual void DeregisterEnsembleEvents() {
  ensembles.ItemsAdded -= Ensembles_ItemsChanged;
  ensembles.ItemsMoved -= Ensembles_ItemsChanged;
  ensembles.ItemsRemoved -= Ensembles_ItemsChanged;
  ensembles.ItemsReplaced -= Ensembles_ItemsChanged;
  ensembles.CollectionReset -= Ensembles_Reset;
}
512    #endregion
513
514    #region event handling
515
/// <summary>Sets the run counter to the number of runs after the run collection was reset.</summary>
protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
  int remainingRuns = runs.Count;
  runsCounter = remainingRuns;
}

/// <summary>Prepares the optimizer again after the datastream was reset.</summary>
protected virtual void Datastream_Reset(object sender, EventArgs e) {
  Prepare();
}

/// <summary>Prepares the optimizer again after the problem data of the datastream changed.</summary>
protected virtual void Datastream_ProblemDataChanged(object sender, EventArgs e) {
  Prepare();
}

/// <summary>Prepares the optimizer again after the ensemble list was reset.</summary>
protected virtual void Ensembles_Reset(object sender, EventArgs e) {
  Prepare();
}

/// <summary>Prepares the optimizer again after items of the ensemble list were added, moved, removed or replaced.</summary>
protected virtual void Ensembles_ItemsChanged(object sender, EventArgs e) {
  Prepare();
}
535
// Invokes the EnsemblesChanged delegate, if one is assigned.
private void OnEnsemblesChanged() {
  EventHandler handler = EnsemblesChanged;
  if (handler == null) return;
  handler(this, EventArgs.Empty);
}

// Invokes the DatastreamChanged delegate, if one is assigned.
private void OnDatastreamChanged() {
  EventHandler handler = DatastreamChanged;
  if (handler == null) return;
  handler(this, EventArgs.Empty);
}
547
548    #endregion
549
550    #endregion
551
552    #region NamedItem
553
// Persisted name of this optimizer.
[Storable]
protected string name;

/// <inheritdoc/>
/// <remarks>
/// A <c>null</c> value is treated as the empty string. If the name actually
/// changes, <see cref="OnNameChanging"/> is called first; unless a handler
/// cancels the change, the name is stored and <see cref="OnNameChanged"/>
/// is called.
/// </remarks>
/// <exception cref="NotSupportedException"><see cref="CanChangeName"/> is <c>false</c>.</exception>
public string Name {
  get { return name; }
  set {
    if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");

    bool unchanged = name.Equals(value) || (value == null && name == string.Empty);
    if (unchanged) return;

    string requested = value ?? string.Empty;
    var changing = new CancelEventArgs<string>(requested);
    OnNameChanging(changing);
    if (changing.Cancel) return;

    name = requested;
    OnNameChanged();
  }
}

/// <summary>Gets whether the name may be changed; always <c>true</c> here.</summary>
public virtual bool CanChangeName {
  get {
    return true;
  }
}
580
// Persisted description; not assigned by the default constructor, so it may be null.
[Storable] protected string description;

/// <summary>
/// Gets or sets the description. A <c>null</c> value is stored as the empty
/// string; <see cref="OnDescriptionChanged"/> is called only if the text
/// actually changes.
/// </summary>
/// <exception cref="NotSupportedException"><see cref="CanChangeDescription"/> is <c>false</c>.</exception>
public string Description {
  get { return description; }
  set {
    if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");

    // The backing field starts out as null; the former description.Equals(...)
    // call therefore threw a NullReferenceException on the first assignment.
    // A missing description is treated like an empty one.
    string current = description ?? string.Empty;
    string requested = value ?? string.Empty;
    if (current.Equals(requested)) return;

    description = requested;
    OnDescriptionChanged();
  }
}

/// <summary>Gets whether the description may be changed; always <c>true</c> here.</summary>
public virtual bool CanChangeDescription {
  get { return true; }
}
597
/// <summary>
/// Gets the string representation of the current instance, which is its <see cref="Name"/>.
/// </summary>
/// <returns>The name of the current instance.</returns>
public override string ToString() {
  string text = Name;
  return text;
}
605
/// <inheritdoc/>
public event EventHandler<CancelEventArgs<string>> NameChanging;

/// <summary>
/// Fires a new <c>NameChanging</c> event.
/// </summary>
/// <param name="e">The event arguments of the changing; handlers may cancel the change.</param>
protected virtual void OnNameChanging(CancelEventArgs<string> e) {
  EventHandler<CancelEventArgs<string>> subscribers = NameChanging;
  if (subscribers == null) return;
  subscribers(this, e);
}

/// <inheritdoc/>
public event EventHandler NameChanged;

/// <summary>
/// Fires a new <c>NameChanged</c> event.
/// </summary>
/// <remarks>Calls <c>OnToStringChanged</c> afterwards.</remarks>
protected virtual void OnNameChanged() {
  EventHandler subscribers = NameChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
  OnToStringChanged();
}

/// <inheritdoc/>
public event EventHandler DescriptionChanged;

/// <summary>
/// Fires a new <c>DescriptionChanged</c> event.
/// </summary>
protected virtual void OnDescriptionChanged() {
  EventHandler subscribers = DescriptionChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
642
643    #endregion nameditem
644  }
645}
Note: See TracBrowser for help on using the repository browser.