source: branches/HeuristicLab.DatastreamAnalysis/HeuristicLab.DatastreamAnalysis/3.4/DatastreamAnalysisOptimizer.cs @ 14588

Last change on this file since 14588 was 14588, checked in by jzenisek, 5 years ago

#2719 updated databar set and view

File size: 21.7 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.ComponentModel;
27using System.Diagnostics;
28using System.Drawing;
29using System.Linq;
30using System.Threading;
31using System.Threading.Tasks;
32using HeuristicLab.Analysis;
33using HeuristicLab.Collections;
34using HeuristicLab.Common;
35using HeuristicLab.Core;
36using HeuristicLab.Data;
37using HeuristicLab.Optimization;
38using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
39using HeuristicLab.Problems.DataAnalysis;
40
41namespace HeuristicLab.DatastreamAnalysis {
/// <summary>
/// Control actions that can be recorded for a <c>DatastreamAnalysisOptimizer</c>.
/// </summary>
internal enum DatastreamAnalysisOptimizerAction {
  /// <summary>No action.</summary>
  None = 0,

  /// <summary>Prepare action.</summary>
  Prepare = 1,

  /// <summary>Start action.</summary>
  Start = 2,

  /// <summary>Stop action.</summary>
  Stop = 3,

  /// <summary>Pause action.</summary>
  Pause = 4
}
49
/// <summary>
/// Replays a <see cref="Datastream"/> window by window and evaluates every configured
/// regression ensemble against the current fitness partition, recording targets and
/// qualities in <see cref="Results"/> and one <see cref="Run"/> per stop in <see cref="Runs"/>.
/// </summary>
[StorableClass]
[Item("DatastreamAnalysis Optimizer",
   "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
[Creatable(CreatableAttribute.Categories.Algorithms)]
public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {
  #region properties
  public string Filename { get; set; }

  private DatastreamAnalysisOptimizerAction daoAction;

  /// <summary>
  /// This optimizer does not maintain nested optimizers; an empty sequence is returned
  /// (never <c>null</c>) so callers can enumerate it safely.
  /// </summary>
  public IEnumerable<IOptimizer> NestedOptimizers {
    get { return Enumerable.Empty<IOptimizer>(); }
  }

  [Storable]
  protected ILog log;

  /// <summary>Log that receives the lifecycle messages written by this optimizer.</summary>
  public ILog Log {
    get { return log; }
  }

  [Storable]
  private ResultCollection results;

  /// <summary>Results of the current analysis; rebuilt by <see cref="SetupResults"/>.</summary>
  public ResultCollection Results {
    get { return results; }
  }

  private CancellationTokenSource cancellationTokenSource;
  private bool stopPending;
  private DateTime lastUpdateTime;
  private bool prepared;
  private bool finished;

  [Storable]
  protected int runsCounter;

  [Storable]
  private RunCollection runs;

  /// <summary>
  /// Collection of runs created in <see cref="OnStopped"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Thrown when the collection is set to <c>null</c>.</exception>
  public RunCollection Runs {
    get { return runs; }
    protected set {
      if (value == null) throw new ArgumentNullException();
      if (runs != value) {
        if (runs != null) DeregisterRunsEvents();
        runs = value;
        RegisterRunsEvents();
      }
    }
  }

  [Storable]
  private IItemList<RegressionEnsembleModel> ensembles;

  /// <summary>
  /// The ensembles that are evaluated against the datastream. Assigning <c>null</c> or the
  /// current list is ignored; any other list re-wires the collection events, invokes
  /// <see cref="EnsemblesChanged"/> and calls <see cref="Prepare()"/>.
  /// </summary>
  public IItemList<RegressionEnsembleModel> Ensembles {
    get { return ensembles; }
    set {
      if (value == null || value == ensembles)
        return;
      // The static type already guarantees a list of regression ensemble models. The former
      // runtime check tested the list itself against the model interface and therefore
      // rejected every assignment.
      if (ensembles != null) DeregisterEnsembleEvents();
      ensembles = value;
      RegisterEnsembleEvents();
      OnEnsemblesChanged();
      Prepare();
    }
  }

  // VAR 1: datastream ~= problem data, VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
  [Storable]
  private Datastream datastream;

  /// <summary>
  /// The datastream that is replayed. Assigning <c>null</c> or the current instance is
  /// ignored; any other instance re-wires the datastream events, invokes
  /// <see cref="DatastreamChanged"/> and calls <see cref="Prepare()"/>.
  /// </summary>
  /// <exception cref="ArgumentException">Thrown when the value is not an <c>IDatastream</c>.</exception>
  public Datastream Datastream {
    get { return datastream; }
    set {
      if (value == null || value == datastream)
        return;
      if (!(value is IDatastream)) throw new ArgumentException("Invalid datastream type");
      if (datastream != null) DeregisterDatastreamEvents();
      datastream = value;
      RegisterDatastreamEvents();
      OnDatastreamChanged();
      Prepare();
    }
  }

  #endregion properties

  #region results properties
  private int ResultsSlidingWindowMovements {
    get { return ((IntValue)Results["Sliding Window Movements"].Value).Value; }
    set { ((IntValue)Results["Sliding Window Movements"].Value).Value = value; }
  }

  private DataTable ResultsQualities {
    get { return (DataTable)Results["Qualities"].Value; }
  }

  private const string ResultsQualitiesMSE = "Mean squared error";
  private const string ResultsQualitiesPR2 = "Pearson R²";

  private DataTable ResultsTargets {
    get { return (DataTable)Results["Targets"].Value; }
  }

  private const string ResultsTargetsReal = "Real";

  private DataBarSet ResultsQualitiesBars {
    get { return (DataBarSet)Results["Ensemble Comparison"].Value; }
  }

  /// <summary>
  /// Clears <see cref="Results"/> and recreates the result entries: the movement counter,
  /// the qualities table, the targets table and the ensemble comparison bars, with one
  /// row/bar per ensemble plus the row for the real target values.
  /// </summary>
  protected void SetupResults() {
    Results.Clear();

    Results.Add(new Result("Sliding Window Movements", new IntValue(0)));
    Results.Add(new Result("Qualities", new DataTable("Pearson R²")));
    Results.Add(new Result("Targets", new DataTable("Targets")));
    Results.Add(new Result("Ensemble Comparison", new DataBarSet("Ensemble Comparison")));

    ResultsTargets.Rows.Add(new DataRow(ResultsTargetsReal));
    foreach (var ensemble in Ensembles) {
      // targets table
      ResultsTargets.Rows.Add(new DataRow(ensemble.Name));

      // qualities (series); only the Pearson R² series is recorded
      ResultsQualities.Rows.Add(new DataRow(ensemble.Name + " - " + ResultsQualitiesPR2));

      // qualities (bars); NOTE(review): meaning of the second argument 0.9 is not visible here - confirm against DataBar
      ResultsQualitiesBars.Bars.Add(new DataBar(ensemble.Name, 0.9));
    }
  }

  #endregion

  #region constructors, cloner,...
  /// <summary>
  /// Creates an optimizer with an empty ensemble list, a new datastream, an empty run
  /// collection and registered event handlers.
  /// </summary>
  public DatastreamAnalysisOptimizer() : base() {
    name = "Datastream Analysis";
    log = new Log();
    results = new ResultCollection();
    ensembles = new ItemList<RegressionEnsembleModel>();
    datastream = new Datastream();
    runsCounter = 0;
    runs = new RunCollection();
    Initialize();
  }

  [StorableConstructor]
  protected DatastreamAnalysisOptimizer(bool deserializing) : base(deserializing) {
  }

  [StorableHook(HookType.AfterDeserialization)]
  private void AfterDeserialization() {
    Initialize();
  }

  /// <summary>
  /// Cloning constructor. All storable state, including the description, is copied; the
  /// referenced items are cloned through <paramref name="cloner"/>.
  /// </summary>
  protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner) : base(original, cloner) {
    name = original.name;
    description = original.description;
    log = cloner.Clone(original.log);
    results = cloner.Clone(original.results);
    ensembles = cloner.Clone(original.ensembles);
    datastream = cloner.Clone(original.datastream);
    runsCounter = original.runsCounter;
    runs = cloner.Clone(original.runs);
    Initialize();
  }

  public override IDeepCloneable Clone(Cloner cloner) {
    return new DatastreamAnalysisOptimizer(this, cloner);
  }

  /// <summary>Registers the event handlers on every member that is currently set.</summary>
  private void Initialize() {
    if (runs != null) RegisterRunsEvents();
    if (datastream != null) RegisterDatastreamEvents();
    if (ensembles != null) RegisterEnsembleEvents();
  }
  #endregion

  #region control actions

  /// <summary>
  /// Prepares the optimizer. Does nothing when there are no ensembles, no datastream, or
  /// the datastream reports that a sliding window evaluation is not possible.
  /// </summary>
  public override void Prepare() {
    if (ensembles == null || ensembles.Count == 0 || datastream == null || !datastream.SlidingWindowEvaluationPossible) return;
    base.Prepare();
    OnPrepared();
  }

  /// <summary>
  /// Same as <see cref="Prepare()"/>, optionally clearing the run collection first.
  /// </summary>
  /// <param name="clearRuns">When <c>true</c>, <see cref="Runs"/> is cleared.</param>
  public void Prepare(bool clearRuns) {
    if (ensembles == null || ensembles.Count == 0 || datastream == null || !datastream.SlidingWindowEvaluationPossible) return;

    base.Prepare();
    if (clearRuns) runs.Clear();
    OnPrepared();
  }

  /// <summary>
  /// Starts the analysis on a new task. When the optimizer was prepared, results and
  /// datastream state are initialized first. The task continuation reports remaining
  /// exceptions and then signals stopped (stop requested or finished) or paused.
  /// </summary>
  public override void Start() {
    if (ensembles == null || datastream == null) return;
    base.Start();
    cancellationTokenSource = new CancellationTokenSource();
    stopPending = false;

    if (prepared) {
      SetupResults();
      Datastream.InitializeState();
    }

    // the token is passed twice: once as the state object handed to Run(object) and once as the task's cancellation token
    Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token);
    task.ContinueWith(t => {
      try {
        t.Wait();
      }
      catch (AggregateException ex) {
        try {
          // cancellation is expected; everything else is rethrown by Handle and reported below
          ex.Flatten().Handle(x => x is OperationCanceledException);
        }
        catch (AggregateException remaining) {
          if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
          else OnExceptionOccurred(remaining);
        }
      }
      cancellationTokenSource.Dispose();
      cancellationTokenSource = null;

      // handle stop/pause
      if (stopPending || finished) {
        OnStopped();
      } else {
        OnPaused();
      }
    });
  }

  /// <summary>Requests cancellation of the running task; the continuation then signals paused.</summary>
  public override void Pause() {
    if (ensembles == null || datastream == null) return;
    base.Pause();
    cancellationTokenSource.Cancel();
  }

  /// <summary>
  /// Stops the analysis. When paused, the stopped state is signalled directly; otherwise a
  /// stop is flagged and the running task is cancelled.
  /// </summary>
  public override void Stop() {
    if (ensembles == null || datastream == null) return;
    base.Stop();
    if (ExecutionState == ExecutionState.Paused) {
      OnStopped();
    } else {
      stopPending = true;
      cancellationTokenSource.Cancel();
    }
  }

  /// <summary>
  /// Resets the execution time, re-initializes all stateful items in the object graph
  /// (the run collection is passed as excluded object), clears the results and marks the
  /// optimizer as prepared and not finished.
  /// </summary>
  protected override void OnPrepared() {
    ExecutionTime = TimeSpan.Zero;
    foreach (IStatefulItem statefulItem in this.GetObjectGraphObjects(new HashSet<object>() { Runs }).OfType<IStatefulItem>()) {
      statefulItem.InitializeState();
    }
    results.Clear();
    prepared = true;
    finished = false;
    Log.LogMessage("Datastream analysis prepared");
    base.OnPrepared();
  }

  protected override void OnStarted() {
    Log.LogMessage("Datastream analysis started");
    base.OnStarted();
  }

  protected override void OnPaused() {
    Log.LogMessage("Datastream analysis paused");
    base.OnPaused();
  }

  /// <summary>
  /// Creates a run from the current parameter and result values and adds it to
  /// <see cref="Runs"/>; the stop is logged and forwarded to the base class even when
  /// collecting the run fails.
  /// </summary>
  protected override void OnStopped() {
    try {
      runsCounter++;
      var run = new Run();
      run.Filename = Filename;
      run.Name = string.Format("{0} Run {1}", Name, runsCounter);
      CollectParameterValues(run.Parameters);
      CollectResultValues(run.Results);
      runs.Add(run);
    }
    finally {
      Log.LogMessage("Datastream analysis stopped");
      base.OnStopped();
    }
  }

  #endregion

  #region run
  /// <summary>
  /// Task entry point. Signals started, keeps <c>ExecutionTime</c> updated through a
  /// 250 ms timer while <see cref="Run(CancellationToken)"/> executes, and finally throws
  /// if cancellation was requested.
  /// </summary>
  /// <param name="state">The <see cref="CancellationToken"/> passed as task state.</param>
  private void Run(object state) {
    CancellationToken cancellationToken = (CancellationToken)state;
    OnStarted();
    lastUpdateTime = DateTime.UtcNow;
    System.Timers.Timer timer = new System.Timers.Timer(250);
    timer.AutoReset = true;
    timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    timer.Start();

    try {
      Run(cancellationToken);
    }
    finally {
      timer.Elapsed -= new System.Timers.ElapsedEventHandler(timer_Elapsed);
      timer.Stop();
      // release the timer's resources; it is local to this invocation
      timer.Dispose();
      ExecutionTime += DateTime.UtcNow - lastUpdateTime;
    }

    cancellationToken.ThrowIfCancellationRequested();
  }

  private int replayedIndex;

  /// <summary>
  /// Replays and evaluates the initial window and then keeps moving the sliding window
  /// (with the configured delay) while movements are possible and updates are available.
  /// <c>finished</c> is only set when the loop completes without an exception.
  /// </summary>
  /// <param name="cancellationToken">Checked before every window movement.</param>
  /// <exception cref="OperationCanceledException">Thrown when cancellation was requested.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Propagated unchanged to the caller.</exception>
  protected void Run(CancellationToken cancellationToken) {
    if (prepared) {
      replayedIndex = 0;
      prepared = false;
    }

    try {
      // play and evaluate initial window
      PlayDatastream();
      if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
      replayedIndex = Datastream.FitnessPartition.End;

      do {
        while (Datastream.SlidingWindowMovementPossible) {
          cancellationToken.ThrowIfCancellationRequested();

          // perform (delayed) window movement
          Thread.Sleep(Datastream.SlidingWindowMovementInterval.Value);
          Datastream.MoveSlidingWindow();
          ResultsSlidingWindowMovements++;

          // play and evaluate the moved window
          PlayDatastream();
          if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
          replayedIndex = Datastream.FitnessPartition.End;
        }
      } while (Datastream.UpdateAvailable);
      finished = true;
    }
    catch (ArgumentOutOfRangeException) {
      // rethrow without resetting the stack trace
      throw;
    }
    catch (OperationCanceledException) {
      // rethrow without resetting the stack trace
      throw;
    }
    catch (Exception ex) {
      // As before, other failures do not propagate (the run ends unfinished), but they
      // are no longer dropped silently.
      Log.LogMessage("Datastream analysis aborted: " + ex);
    }
  }

  /// <summary>
  /// Appends the real target values for the rows from <c>replayedIndex</c> up to (excluding)
  /// the end of the fitness partition to the "Real" row of the targets table.
  /// </summary>
  private void PlayDatastream() {
    var problemData = Datastream.ProblemData;
    var targetVarName = problemData.TargetVariable;

    for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
      var realValue = problemData.Dataset.GetDoubleValue(targetVarName, i);
      ResultsTargets.Rows[ResultsTargetsReal].Values.Add(realValue);
    }
  }

  /// <summary>
  /// Evaluates every ensemble on the current fitness partition: the per-row estimate is
  /// the average over the ensemble's models, the quality is the squared Pearson R against
  /// the real values (0 when the calculator reports an error). For each newly replayed row
  /// the estimate and the quality are appended to the result tables.
  /// </summary>
  private void Evaluate() {
    var problemData = Datastream.ProblemData;
    var dataset = problemData.Dataset;
    var targetVarName = problemData.TargetVariable;

    int partitionStart = Datastream.FitnessPartition.Start;
    int partitionSize = Datastream.FitnessPartition.Size;
    int partitionEnd = Datastream.FitnessPartition.End;

    // materialize once: both sequences are consumed repeatedly below
    var rows = Enumerable.Range(partitionStart, partitionSize).ToList();
    var realValues = dataset.GetDoubleValues(targetVarName, rows).ToList();

    foreach (var ensemble in Ensembles) {
      // One array of estimates per model, computed exactly once. The former lazy sequence
      // re-ran every model's estimation for each row.
      var estimatedValuesPerModel = ensemble.Models
        .Select(model => model.GetEstimatedValues(dataset, rows).ToArray())
        .ToList();

      // ensemble estimate per row = average over the models
      var estimatedValues = Enumerable.Range(0, partitionSize)
        .Select(r => estimatedValuesPerModel.Select(e => e[r]).Average())
        .ToArray();

      // determine quality
      OnlineCalculatorError error;
      var pR = OnlinePearsonsRCalculator.Calculate(estimatedValues, realValues, out error);
      var pR2 = error == OnlineCalculatorError.None ? pR * pR : 0.0;

      // Only the rows that were not replayed before are appended; they are the last
      // replayAmount entries of the estimates.
      // NOTE(review): replayCount is negative if replayedIndex lies before the partition start - confirm Datastream rules this out.
      int replayAmount = partitionEnd - replayedIndex;
      int replayCount = estimatedValues.Length - replayAmount;

      for (int i = replayedIndex; i < partitionEnd; i++) {
        ResultsTargets.Rows[ensemble.Name].Values.Add(estimatedValues[replayCount]);
        replayCount++;

        ResultsQualities.Rows[ensemble.Name + " - " + ResultsQualitiesPR2].Values.Add(pR2);
        ResultsQualitiesBars.Bars[ensemble.Name].Value = pR2;
      }
    }
  }

  /// <summary>
  /// Adds the time elapsed since the last update to <c>ExecutionTime</c>; the timer is
  /// disabled while the update is performed.
  /// </summary>
  private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
    System.Timers.Timer timer = (System.Timers.Timer)sender;
    timer.Enabled = false;
    DateTime now = DateTime.UtcNow;
    ExecutionTime += now - lastUpdateTime;
    lastUpdateTime = now;
    timer.Enabled = true;
  }

  /// <summary>
  /// Adds the name of this analysis and, when a datastream is set, the datastream's
  /// parameter values and name to <paramref name="values"/>.
  /// </summary>
  public void CollectParameterValues(IDictionary<string, IItem> values) {
    values.Add("Datastream Analysis Name", new StringValue(Name));
    if (Datastream != null) {
      Datastream.CollectParameterValues(values);
      values.Add("Datastream Name", new StringValue(Datastream.Name));
    }
  }

  /// <summary>
  /// Adds the execution time and the values of <see cref="Results"/> to <paramref name="values"/>.
  /// </summary>
  public void CollectResultValues(IDictionary<string, IItem> values) {
    values.Add("Execution Time", new TimeSpanValue(ExecutionTime));
    Results.CollectResultValues(values);
  }
  #endregion

  #region events

  #region events registration

  // NOTE(review): these are public delegate fields, not C# events (no "event" keyword).
  // They are kept as fields so existing callers that assign or invoke them still compile.
  public EventHandler EnsemblesChanged;
  public EventHandler DatastreamChanged;

  protected virtual void DeregisterRunsEvents() {
    runs.CollectionReset -= new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
  }

  protected virtual void RegisterRunsEvents() {
    runs.CollectionReset += new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
  }

  protected virtual void RegisterDatastreamEvents() {
    datastream.Reset += new EventHandler(Datastream_Reset);
    datastream.ProblemDataChanged += new EventHandler(Datastream_ProblemDataChanged);
  }

  protected virtual void DeregisterDatastreamEvents() {
    datastream.Reset -= new EventHandler(Datastream_Reset);
    datastream.ProblemDataChanged -= new EventHandler(Datastream_ProblemDataChanged);
  }

  protected virtual void RegisterEnsembleEvents() {
    ensembles.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.ItemsMoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
  }

  protected virtual void DeregisterEnsembleEvents() {
    ensembles.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.ItemsMoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
    ensembles.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
  }
  #endregion

  #region event handling

  /// <summary>Synchronizes the run counter with the number of runs after a collection reset.</summary>
  protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
    runsCounter = runs.Count;
  }

  protected virtual void Datastream_Reset(object sender, EventArgs e) {
    Prepare();
  }

  protected virtual void Datastream_ProblemDataChanged(object sender, EventArgs e) {
    Prepare();
  }

  protected virtual void Ensembles_Reset(object sender, EventArgs e) {
    Prepare();
  }

  protected virtual void Ensembles_ItemsChanged(object sender, EventArgs e) {
    Prepare();
  }

  private void OnEnsemblesChanged() {
    var changed = EnsemblesChanged;
    if (changed != null)
      changed(this, EventArgs.Empty);
  }

  private void OnDatastreamChanged() {
    var changed = DatastreamChanged;
    if (changed != null)
      changed(this, EventArgs.Empty);
  }

  #endregion

  #endregion

  #region NamedItem

  [Storable]
  protected string name;

  /// <summary>
  /// Name of the optimizer. A <c>null</c> value is treated as the empty string. Setting a
  /// different name fires <see cref="OnNameChanging"/> and, unless cancelled, stores the
  /// name and fires <see cref="OnNameChanged"/>.
  /// </summary>
  /// <exception cref="NotSupportedException">Thrown when <see cref="CanChangeName"/> is <c>false</c>.</exception>
  public string Name {
    get { return name; }
    set {
      if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");
      string newName = value ?? string.Empty;
      // null-safe comparison: an unset backing field counts as the empty string
      if (!string.Equals(name ?? string.Empty, newName)) {
        CancelEventArgs<string> e = new CancelEventArgs<string>(newName);
        OnNameChanging(e);
        if (!e.Cancel) {
          name = newName;
          OnNameChanged();
        }
      }
    }
  }

  public virtual bool CanChangeName {
    get { return true; }
  }

  [Storable]
  protected string description;

  /// <summary>
  /// Description of the optimizer. A <c>null</c> value is treated as the empty string.
  /// Setting a different description stores it and fires <see cref="OnDescriptionChanged"/>.
  /// </summary>
  /// <exception cref="NotSupportedException">Thrown when <see cref="CanChangeDescription"/> is <c>false</c>.</exception>
  public string Description {
    get { return description; }
    set {
      if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");
      string newDescription = value ?? string.Empty;
      // The backing field is not initialized by the default constructor, so it must not
      // be dereferenced here.
      if (!string.Equals(description ?? string.Empty, newDescription)) {
        description = newDescription;
        OnDescriptionChanged();
      }
    }
  }

  public virtual bool CanChangeDescription {
    get { return true; }
  }

  /// <summary>
  /// Gets the string representation of the current instance, which is its <see cref="Name"/>.
  /// </summary>
  /// <returns>The name of the current instance.</returns>
  public override string ToString() {
    return Name;
  }

  /// <inheritdoc/>
  public event EventHandler<CancelEventArgs<string>> NameChanging;

  /// <summary>
  /// Fires a new <c>NameChanging</c> event.
  /// </summary>
  /// <param name="e">The event arguments of the changing.</param>
  protected virtual void OnNameChanging(CancelEventArgs<string> e) {
    var handler = NameChanging;
    if (handler != null) handler(this, e);
  }

  /// <inheritdoc/>
  public event EventHandler NameChanged;

  /// <summary>
  /// Fires a new <c>NameChanged</c> event and then calls <c>OnToStringChanged</c>.
  /// </summary>
  protected virtual void OnNameChanged() {
    var handler = NameChanged;
    if (handler != null) handler(this, EventArgs.Empty);
    OnToStringChanged();
  }

  /// <inheritdoc/>
  public event EventHandler DescriptionChanged;

  /// <summary>
  /// Fires a new <c>DescriptionChanged</c> event.
  /// </summary>
  protected virtual void OnDescriptionChanged() {
    var handler = DescriptionChanged;
    if (handler != null) handler(this, EventArgs.Empty);
  }

  #endregion nameditem
}
648}
Note: See TracBrowser for help on using the repository browser.