source: branches/HeuristicLab.DatastreamAnalysis/HeuristicLab.DatastreamAnalysis/3.4/DatastreamAnalysisOptimizer.cs @ 14543

Last change on this file since 14543 was 14543, checked in by jzenisek, 3 years ago

#2719 enhanced Optimizer functionality; added results

File size: 21.6 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.ComponentModel;
27using System.Diagnostics;
28using System.Drawing;
29using System.Linq;
30using System.Threading;
31using System.Threading.Tasks;
32using HeuristicLab.Analysis;
33using HeuristicLab.Collections;
34using HeuristicLab.Common;
35using HeuristicLab.Core;
36using HeuristicLab.Data;
37using HeuristicLab.Optimization;
38using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
39using HeuristicLab.Problems.DataAnalysis;
40
41namespace HeuristicLab.DatastreamAnalysis {
42  internal enum DatastreamAnalysisOptimizerAction {
43    None,
44    Prepare,
45    Start,
46    Stop,
47    Pause
48  }
49
50  [StorableClass]
51  [Item("DatastreamAnalysis Optimizer",
52     "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
53  [Creatable(CreatableAttribute.Categories.Algorithms)]
54  public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {
55    #region properties
56    public string Filename { get; set; }
57
58    private DatastreamAnalysisOptimizerAction daoAction;
59
60    public IEnumerable<IOptimizer> NestedOptimizers { get; }
61
62
63    [Storable]
64    protected ILog log;
65    public ILog Log {
66      get { return log; }
67    }
68
69    [Storable]
70    private ResultCollection results;
71
72    public ResultCollection Results {
73      get { return results; }
74    }
75
76    private CancellationTokenSource cancellationTokenSource;
77    private bool stopPending;
78    private DateTime lastUpdateTime;
79    private bool prepared;
80    private bool finished;
81
82    [Storable]
83    protected int runsCounter;
84
85    [Storable]
86    private RunCollection runs;
87
88    public RunCollection Runs
89    {
90      get { return runs; }
91      protected set
92      {
93        if (value == null) throw new ArgumentNullException();
94        if (runs != value) {
95          if (runs != null) DeregisterRunsEvents();
96          runs = value;
97          if (runs != null) RegisterRunsEvents();
98        }
99      }
100    }
101
102
103    [Storable]
104    private IItemList<RegressionEnsembleModel> ensembles;
105
106    public IItemList<RegressionEnsembleModel> Ensembles {
107      get { return ensembles; }
108      set {
109        if (value == null || value == ensembles)
110          return;
111        if(!(value is IRegressionEnsembleModel)) throw new ArgumentException("Invaid ensemble model type");
112        DeregisterEnsembleEvents();
113        ensembles = value;
114        RegisterEnsembleEvents();
115        OnEnsemblesChanged();
116        Prepare();
117      }
118    }
119
120
121    // VAR 1: datastream ~= problem data, VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
122    [Storable]
123    private Datastream datastream;
124
125    public Datastream Datastream {
126      get { return datastream; }
127      set {
128        if (value == null || value == datastream)
129          return;
130        if(!(value is IDatastream)) throw new ArgumentException("Invalid datastream type");
131        DeregisterDatastreamEvents();
132        datastream = value;
133        RegisterDatastreamEvents();
134        OnDatastreamChanged();
135        Prepare();
136      }
137    }
138
139    #endregion properties
140
141    #region results properties
142    private int ResultsSlidingWindowMovements {
143      get { return ((IntValue)Results["Sliding Window Movements"].Value).Value; }
144      set { ((IntValue)Results["Sliding Window Movements"].Value).Value = value; }
145    }
146    private DataTable ResultsQualities {
147      get { return ((DataTable)Results["Qualities"].Value); }
148    }
149
150    private const string ResultsQualitiesMSE = "Mean squared error";
151    private const string ResultsQualitiesPR2 = "Pearson R²";
152
153    private DataTable ResultsTargets {
154      get { return ((DataTable) Results["Targets"].Value); }
155    }
156
157    private const string ResultsTargetsReal = "Real";
158
159    private DataTable ResultsQualitiesBars {
160      get { return ((DataTable) Results["Ensembles"].Value); }
161    }
162
163    protected void SetupResults() {
164      Results.Clear();
165
166      Results.Add(new Result("Sliding Window Movements", new IntValue(0)));
167      Results.Add(new Result("Qualities", new DataTable("Qualities")));
168      Results.Add(new Result("Targets", new DataTable("Targets")));
169
170      ResultsQualitiesBars.Rows.Add(new DataRow("Ensembles"));
171
172      ResultsTargets.Rows.Add(new DataRow(ResultsTargetsReal));
173      foreach (var ensemble in Ensembles) {
174        // targets table
175        ResultsTargets.Rows.Add(new DataRow(ensemble.Name));
176
177        // qualities (series)
178        //ResultsQualities.Rows.Add(new DataRow(ensemble.Name + " - " + ResultsQualitiesMSE));
179        ResultsQualities.Rows.Add(new DataRow(ensemble.Name + " - " + ResultsQualitiesPR2));
180
181        // qualities (bars)
182        //ResultsQualitiesBars.Rows.Add(new DataRow(ensemble.Name + " - " + ResultsQualitiesPR2));
183        //ResultsQualitiesBars.Rows["Ensembles"].Values.
184      }
185    }
186
187    #endregion
188
189    #region constructors, cloner,...
190    public DatastreamAnalysisOptimizer() : base() {
191      name = "Datastream Analysis";
192      log = new Log();
193      results = new ResultCollection();
194      ensembles = new ItemList<RegressionEnsembleModel>();
195      datastream = new Datastream();
196      runsCounter = 0;
197      runs = new RunCollection();
198      Initialize();
199    }
200
201    [StorableConstructor]
202    protected DatastreamAnalysisOptimizer(bool deserializing) : base(deserializing) {
203    }
204
205    [StorableHook(HookType.AfterDeserialization)]
206    private void AfterDeserialization() {
207      Initialize();
208    }
209
210    protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner) : base(original, cloner) {
211      name = original.name;
212      log = cloner.Clone(original.log);
213      results = cloner.Clone(original.results);
214      ensembles = (ItemList<RegressionEnsembleModel>) original.Ensembles.Clone(cloner);
215      datastream = (Datastream) original.Datastream.Clone(cloner);
216      runsCounter = original.runsCounter;
217      runs = cloner.Clone(original.runs);
218      Initialize();
219    }
220
221    public override IDeepCloneable Clone(Cloner cloner) {
222      return new DatastreamAnalysisOptimizer(this, cloner);
223    }
224
225    private void Initialize() {
226      if (runs != null) RegisterRunsEvents();
227      if (datastream != null) RegisterDatastreamEvents();
228      if (ensembles != null) RegisterEnsembleEvents();
229    }
230    #endregion
231
232    #region control actions
233
234    public override void Prepare() {
235      if (ensembles == null || ensembles.Count == 0 || datastream == null || !datastream.SlidingWindowEvaluationPossible) return;
236      //if (ensembles.SelectMany(x => x.Models).Count() == 0) return;
237      base.Prepare();
238      OnPrepared();
239    }
240
241    public void Prepare(bool clearRuns) {
242      if (ensembles == null || ensembles.Count == 0 || datastream == null || !datastream.SlidingWindowEvaluationPossible) return;
243
244      base.Prepare();
245      if (clearRuns) runs.Clear();
246      OnPrepared();
247    }
248
249    public override void Start() {
250      if (ensembles == null || datastream == null) return;
251      base.Start();
252      cancellationTokenSource = new CancellationTokenSource();
253      stopPending = false;
254
255      if (prepared) {
256        SetupResults();
257        Datastream.InitializeState();
258      }
259
260      Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token);
261      task.ContinueWith(t => {
262        try {
263          t.Wait();
264        }
265        catch (AggregateException ex) {
266          try {
267            ex.Flatten().Handle(x => x is OperationCanceledException);
268          } catch (AggregateException remaining) {
269            if(remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
270            else OnExceptionOccurred(remaining);
271          }
272        }
273        cancellationTokenSource.Dispose();
274        cancellationTokenSource = null;
275
276        // handle stop/pause
277        if (stopPending || finished) {
278          OnStopped();
279        } else {
280          OnPaused();
281        }
282      });
283    }
284
285    public override void Pause() {
286      if (ensembles == null || datastream == null) return;
287      base.Pause();
288      cancellationTokenSource.Cancel();
289    }
290
291    public override void Stop() {
292      if (ensembles == null || datastream == null) return;
293      base.Stop();
294      if (ExecutionState == ExecutionState.Paused) {
295        OnStopped();
296      } else {
297        stopPending = true;
298        cancellationTokenSource.Cancel();
299      }
300    }
301
302    protected override void OnPrepared() {
303      ExecutionTime = TimeSpan.Zero;
304      foreach (IStatefulItem statefulItem in this.GetObjectGraphObjects(new HashSet<object>() {Runs}).OfType<IStatefulItem>()) {
305        statefulItem.InitializeState();
306      }
307      results.Clear();
308      prepared = true;
309      finished = false;
310      Log.LogMessage("Datastream analysis prepared");
311      base.OnPrepared();
312    }
313
314    protected override void OnStarted() {
315      Log.LogMessage("Datastream analysis started");
316      base.OnStarted();
317    }
318
319    protected override void OnPaused() {
320      Log.LogMessage("Datastream analysis paused");
321      base.OnPaused();
322    }
323
324    protected override void OnStopped() {
325      try {
326        runsCounter++;
327        var run = new Run();
328        run.Filename = Filename;
329        run.Name = string.Format("{0} Run {1}", Name, runsCounter);
330        CollectParameterValues(run.Parameters);
331        CollectResultValues(run.Results);
332        runs.Add(run);
333      }
334      finally {
335        Log.LogMessage("Datastream analysis stopped");
336        base.OnStopped();
337      }
338    }
339
340    #endregion
341
342    #region run
343    private void Run(object state) {
344      CancellationToken cancellationToken = (CancellationToken) state;
345      OnStarted();
346      lastUpdateTime = DateTime.UtcNow;
347      System.Timers.Timer timer = new System.Timers.Timer(250);
348      timer.AutoReset = true;
349      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
350      timer.Start();
351
352      try {
353        Run(cancellationToken);
354      }
355      finally {
356        timer.Elapsed -= new System.Timers.ElapsedEventHandler(timer_Elapsed);
357        timer.Stop();
358        ExecutionTime += DateTime.UtcNow - lastUpdateTime;
359      }
360
361      cancellationToken.ThrowIfCancellationRequested();
362    }
363
364    private int replayedIndex;
365
366    protected void Run(CancellationToken cancellationToken) {
367
368      if (prepared) {
369        replayedIndex = 0;
370        prepared = false;
371      }
372
373      try {
374
375        // play and evaluate initial window
376        PlayDatastream();
377        if(Datastream.SlidingWindowEvaluationPossible) Evaluate();
378        replayedIndex = Datastream.FitnessPartition.End;
379
380        do {
381          while(Datastream.SlidingWindowMovementPossible) {
382            cancellationToken.ThrowIfCancellationRequested();
383
384            // perform (delayed) window movement
385            Thread.Sleep(Datastream.SlidingWindowMovementInterval.Value);
386            Datastream.MoveSlidingWindow();
387            ResultsSlidingWindowMovements++;
388
389            // play and evaluate the moved window
390            PlayDatastream();
391            if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
392            replayedIndex = Datastream.FitnessPartition.End;
393          }
394        } while (Datastream.UpdateAvailable);
395        finished = true;
396      }
397      catch (Exception ex) {
398        if (ex is ArgumentOutOfRangeException) throw ex;
399        if (ex is OperationCanceledException) throw ex;
400      }
401      finally {
402        // reset everything
403        //Prepare(true);
404      }
405    }
406
407    private void PlayDatastream() {
408      var problemData = Datastream.ProblemData;
409      var targetVarName = problemData.TargetVariable;
410
411      for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
412        var realValue = problemData.Dataset.GetDoubleValue(targetVarName, i);
413        ResultsTargets.Rows[ResultsTargetsReal].Values.Add(realValue);
414      }
415    }
416
417    private void Evaluate() {
418      var problemData = Datastream.ProblemData;
419      var dataset = problemData.Dataset;
420      var targetVarName = problemData.TargetVariable;
421
422      var realRows = Enumerable.Range(Datastream.FitnessPartition.Start, Datastream.FitnessPartition.Size).ToList();
423      var realValues = dataset.GetDoubleValues(targetVarName, realRows);
424
425      foreach (var ensemble in Ensembles) {
426
427        var rows = Enumerable.Range(Datastream.FitnessPartition.Start, Datastream.FitnessPartition.Size);
428        var estimatedValuesPerModelPerRow = ensemble.Models.Select(x => x.GetEstimatedValues(datastream.ProblemData.Dataset, rows).ToArray());
429        var estimatedValues = Enumerable.Range(0, Datastream.FitnessPartition.Size).Select(r => estimatedValuesPerModelPerRow.Select(e => e[r]).Average()).ToArray(); // per row
430        var averageEstimatedValuesPerModel = estimatedValuesPerModelPerRow.Select(x => x.Average()); // per model
431        var averageEstimatedValue = averageEstimatedValuesPerModel.Average();
432
433        // determine quality
434        var mse = Math.Pow(averageEstimatedValue - realValues.Average(), 2);
435        OnlineCalculatorError error;
436        var pR = OnlinePearsonsRCalculator.Calculate(estimatedValues, realValues, out error);
437        var pR2 = error == OnlineCalculatorError.None ? pR * pR : 0.0;
438
439        DataRow qualitiesBarsRow = ResultsQualitiesBars.Rows[ensemble.Name + " - " + ResultsQualitiesPR2];
440
441        for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
442          ResultsTargets.Rows[ensemble.Name].Values.Add(averageEstimatedValue);
443
444          //ResultsQualities.Rows[ensemble.Name + " - " + ResultsQualitiesMSE].Values.Add(mse);
445          ResultsQualities.Rows[ensemble.Name + " - " + ResultsQualitiesPR2].Values.Add(pR2);
446        }
447      }
448    }
449
450    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
451      System.Timers.Timer timer = (System.Timers.Timer)sender;
452      timer.Enabled = false;
453      DateTime now = DateTime.UtcNow;
454      ExecutionTime += now - lastUpdateTime;
455      lastUpdateTime = now;
456      timer.Enabled = true;
457    }
458
459    public void CollectParameterValues(IDictionary<string, IItem> values) {
460      values.Add("Datastream Analysis Name", new StringValue(Name));
461      if (Datastream != null) {
462        Datastream.CollectParameterValues(values);
463        values.Add("Datastream Name", new StringValue(Datastream.Name));
464      }
465    }
466
467    public void CollectResultValues(IDictionary<string, IItem> values) {
468      values.Add("Execution Time", new TimeSpanValue(ExecutionTime));
469      Results.CollectResultValues(values);
470    }
471    #endregion
472
473    #region events
474
475    #region events registration
476
477    public EventHandler EnsemblesChanged;
478    public EventHandler DatastreamChanged;
479
480    protected virtual void DeregisterRunsEvents() {
481      runs.CollectionReset -= new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
482    }
483
484    protected virtual void RegisterRunsEvents() {
485      runs.CollectionReset += new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
486    }
487
488    protected virtual void RegisterDatastreamEvents() {
489      datastream.Reset += new EventHandler(Datastream_Reset);
490      datastream.ProblemDataChanged += new EventHandler(Datastream_ProblemDataChanged);
491    }
492
493    protected virtual void DeregisterDatastreamEvents() {
494      datastream.Reset -= new EventHandler(Datastream_Reset);
495      datastream.ProblemDataChanged -= new EventHandler(Datastream_ProblemDataChanged);
496    }
497
498    protected virtual void RegisterEnsembleEvents() {
499      ensembles.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
500      ensembles.ItemsMoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
501      ensembles.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
502      ensembles.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
503      ensembles.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
504    }
505
506    protected virtual void DeregisterEnsembleEvents() {
507      ensembles.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
508      ensembles.ItemsMoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
509      ensembles.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
510      ensembles.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
511      ensembles.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
512    }
513    #endregion
514
515    #region event handling
516
517    protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
518      runsCounter = runs.Count;
519    }
520
521    protected virtual void Datastream_Reset(object sender, EventArgs e) {
522      Prepare();
523    }
524
525    protected virtual void Datastream_ProblemDataChanged(object sender, EventArgs e) {
526      Prepare();
527    }
528
529    protected virtual void Ensembles_Reset(object sender, EventArgs e) {
530      Prepare();
531    }
532
533    protected virtual void Ensembles_ItemsChanged(object sender, EventArgs e) {
534      Prepare();
535    }
536
537    private void OnEnsemblesChanged() {
538      var changed = EnsemblesChanged;
539      if (changed != null)
540        changed(this, EventArgs.Empty);
541    }
542
543    private void OnDatastreamChanged() {
544      var changed = DatastreamChanged;
545      if (changed != null)
546        changed(this, EventArgs.Empty);
547    }
548
549    #endregion
550
551    #endregion
552
553    #region NamedItem
554
555    [Storable]
556    protected string name;
557
558    /// <inheritdoc/>
559    /// <remarks>Calls <see cref="OnNameChanging"/> and also <see cref="OnNameChanged"/>
560    /// eventually in the setter.</remarks>
561    public string Name {
562      get { return name; }
563      set {
564        if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");
565        if (!(name.Equals(value) || (value == null) && (name == string.Empty))) {
566          CancelEventArgs<string> e = value == null
567            ? new CancelEventArgs<string>(string.Empty)
568            : new CancelEventArgs<string>(value);
569          OnNameChanging(e);
570          if (!e.Cancel) {
571            name = value == null ? string.Empty : value;
572            OnNameChanged();
573          }
574        }
575      }
576    }
577
578    public virtual bool CanChangeName {
579      get { return true; }
580    }
581
582    [Storable] protected string description;
583
584    public string Description {
585      get { return description; }
586      set {
587        if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");
588        if (!(description.Equals(value) || (value == null) && (description == string.Empty))) {
589          description = value == null ? string.Empty : value;
590          OnDescriptionChanged();
591        }
592      }
593    }
594
595    public virtual bool CanChangeDescription {
596      get { return true; }
597    }
598
599    /// <summary>
600    /// Gets the string representation of the current instance in the format: <c>Name: [null|Value]</c>.
601    /// </summary>
602    /// <returns>The current instance as a string.</returns>
603    public override string ToString() {
604      return Name;
605    }
606
607    /// <inheritdoc/>
608    public event EventHandler<CancelEventArgs<string>> NameChanging;
609
610    /// <summary>
611    /// Fires a new <c>NameChanging</c> event.
612    /// </summary>
613    /// <param name="e">The event arguments of the changing.</param>
614    protected virtual void OnNameChanging(CancelEventArgs<string> e) {
615      var handler = NameChanging;
616      if (handler != null) handler(this, e);
617    }
618
619    /// <inheritdoc/>
620    public event EventHandler NameChanged;
621
622    /// <summary>
623    /// Fires a new <c>NameChanged</c> event.
624    /// </summary>
625    /// <remarks>Calls <see cref="ItemBase.OnChanged"/>.</remarks>
626    protected virtual void OnNameChanged() {
627      var handler = NameChanged;
628      if (handler != null) handler(this, EventArgs.Empty);
629      OnToStringChanged();
630    }
631
632    /// <inheritdoc/>
633    public event EventHandler DescriptionChanged;
634
635    /// <summary>
636    /// Fires a new <c>DescriptionChanged</c> event.
637    /// </summary>
638    /// <remarks>Calls <see cref="ItemBase.OnChanged"/>.</remarks>
639    protected virtual void OnDescriptionChanged() {
640      var handler = DescriptionChanged;
641      if (handler != null) handler(this, EventArgs.Empty);
642    }
643
644    #endregion nameditem
645  }
646}
Note: See TracBrowser for help on using the repository browser.