source: branches/HeuristicLab.DatastreamAnalysis/HeuristicLab.DatastreamAnalysis/3.4/DatastreamAnalysisOptimizer.cs @ 14538

Last change on this file since 14538 was 14538, checked in by jzenisek, 3 years ago

#2719 enhanced the optimizer's functionality

File size: 20.6 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.ComponentModel;
27using System.Diagnostics;
28using System.Drawing;
29using System.Linq;
30using System.Threading;
31using System.Threading.Tasks;
32using HeuristicLab.Analysis;
33using HeuristicLab.Collections;
34using HeuristicLab.Common;
35using HeuristicLab.Core;
36using HeuristicLab.Data;
37using HeuristicLab.Optimization;
38using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
39using HeuristicLab.Problems.DataAnalysis;
40
namespace HeuristicLab.DatastreamAnalysis {
  /// <summary>
  /// Control actions for a <see cref="DatastreamAnalysisOptimizer"/>.
  /// NOTE(review): not referenced by the optimizer in this file; confirm whether it is used elsewhere in the assembly.
  /// </summary>
  internal enum DatastreamAnalysisOptimizerAction {
    None,
    Prepare,
    Start,
    Stop,
    Pause
  }

  /// <summary>
  /// Optimizer that moves a sliding window over a <see cref="Datastream"/> and records, per window
  /// movement, the real target values next to values produced for each configured
  /// <see cref="RegressionEnsembleModel"/>, together with quality traces.
  /// </summary>
  /// <remarks>
  /// The per-ensemble estimates and the quality values written in <see cref="Run(CancellationToken)"/>
  /// are still placeholders (random values); see the TODOs there.
  /// </remarks>
  [StorableClass]
  [Item("DatastreamAnalysis Optimizer",
     "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
  [Creatable(CreatableAttribute.Categories.Algorithms)]
  public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {
    #region properties
    /// <summary>File name associated with this optimizer; copied into every run created in <see cref="OnStopped"/>.</summary>
    public string Filename { get; set; }

    /// <summary>
    /// This optimizer contains no nested optimizers. Always returns an empty sequence (never null),
    /// so consumers can enumerate it without a null check.
    /// </summary>
    public IEnumerable<IOptimizer> NestedOptimizers {
      get { return Enumerable.Empty<IOptimizer>(); }
    }

    [Storable]
    protected ILog log;
    /// <summary>Log receiving the prepared/started/paused/stopped messages.</summary>
    public ILog Log {
      get { return log; }
    }

    [Storable]
    private ResultCollection results;

    /// <summary>Results of the current analysis; (re)built by <see cref="SetupResults"/>.</summary>
    public ResultCollection Results {
      get { return results; }
    }

    private CancellationTokenSource cancellationTokenSource;
    // Set by Stop() while running so the task continuation reports "stopped" rather than "paused".
    private bool stopPending;
    // Last time ExecutionTime was advanced (updated by the timer and at the end of a run).
    private DateTime lastUpdateTime;
    // Set in OnPrepared; consumed by Start() (rebuild results) and Run() (restart the replay).
    private bool prepared;
    // Set once the datastream has been processed completely.
    private bool finished;

    [Storable]
    protected int runsCounter;

    [Storable]
    private RunCollection runs;

    /// <summary>Runs collected each time the analysis stops.</summary>
    /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
    public RunCollection Runs {
      get { return runs; }
      protected set {
        if (value == null) throw new ArgumentNullException("value");
        if (runs != value) {
          if (runs != null) DeregisterRunsEvents();
          runs = value;
          RegisterRunsEvents();
        }
      }
    }


    [Storable]
    private IItemList<RegressionEnsembleModel> ensembles;

    /// <summary>
    /// Ensemble models evaluated against the datastream. Assigning a different, non-null list
    /// re-wires the list events, raises <see cref="EnsemblesChanged"/> and calls <see cref="Prepare()"/>.
    /// Assigning null or the current list is ignored.
    /// </summary>
    public IItemList<RegressionEnsembleModel> Ensembles {
      get { return ensembles; }
      set {
        if (value == null || value == ensembles)
          return;
        // No element-type check is needed: the list's static type already fixes its element type.
        // (Testing the list itself against IRegressionEnsembleModel can never succeed.)
        if (ensembles != null) DeregisterEnsembleEvents();
        ensembles = value;
        RegisterEnsembleEvents();
        OnEnsemblesChanged();
        Prepare();
      }
    }


    // VAR 1: datastream ~= problem data, VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
    [Storable]
    private Datastream datastream;

    /// <summary>
    /// Datastream the ensembles are evaluated against. Assigning a different, non-null datastream
    /// re-wires its events, raises <see cref="DatastreamChanged"/> and calls <see cref="Prepare()"/>.
    /// Assigning null or the current datastream is ignored.
    /// </summary>
    /// <exception cref="ArgumentException">The value is not an <c>IDatastream</c>.</exception>
    public Datastream Datastream {
      get { return datastream; }
      set {
        if (value == null || value == datastream)
          return;
        if (!(value is IDatastream)) throw new ArgumentException("Invalid datastream type");
        if (datastream != null) DeregisterDatastreamEvents();
        datastream = value;
        RegisterDatastreamEvents();
        OnDatastreamChanged();
        Prepare();
      }
    }

    #endregion properties

    #region results properties
    private int ResultsSlidingWindowMovements {
      get { return ((IntValue)Results["Sliding Window Movements"].Value).Value; }
      set { ((IntValue)Results["Sliding Window Movements"].Value).Value = value; }
    }
    private double ResultsBestQuality {
      get { return ((DoubleValue)Results["Best Quality"].Value).Value; }
      set { ((DoubleValue)Results["Best Quality"].Value).Value = value; }
    }
    private DataTable ResultsQualities {
      get { return ((DataTable)Results["Qualities"].Value); }
    }

    private const string ResultsQualitiesR2 = "R²";
    private const string ResultsQualitiesPearson = "Pearson";

    private DataTable ResultsTargets {
      get { return ((DataTable) Results["Targets"].Value); }
    }

    private const string ResultsTargetsReal = "Real";

    /// <summary>
    /// Clears <see cref="Results"/> and recreates the result entries: movement counter, best quality,
    /// a "Qualities" table (R² and Pearson rows) and a "Targets" table with one row for the real
    /// values plus one row per ensemble (named after the ensemble).
    /// </summary>
    protected void SetupResults() {
      Results.Clear();

      Results.Add(new Result("Sliding Window Movements", new IntValue(0)));
      Results.Add(new Result("Best Quality", new DoubleValue(0)));
      Results.Add(new Result("Qualities", new DataTable("Qualities")));
      Results.Add(new Result("Targets", new DataTable("Targets")));

      ResultsQualities.Rows.Add(new DataRow(ResultsQualitiesR2));
      ResultsQualities.Rows.Add(new DataRow(ResultsQualitiesPearson));
      ResultsTargets.Rows.Add(new DataRow(ResultsTargetsReal));
      foreach (var ensemble in Ensembles) {
        ResultsTargets.Rows.Add(new DataRow(ensemble.Name));
      }
    }

    #endregion

    #region constructors, cloner,...

    public DatastreamAnalysisOptimizer() : base() {
      name = "Datastream Analysis";
      // Initialised so the Description setter's comparison has a non-null baseline.
      description = string.Empty;
      log = new Log();
      results = new ResultCollection();
      ensembles = new ItemList<RegressionEnsembleModel>();
      datastream = new Datastream();
      runsCounter = 0;
      runs = new RunCollection();
      Initialize();
    }

    [StorableConstructor]
    protected DatastreamAnalysisOptimizer(bool deserializing) : base(deserializing) {
    }

    [StorableHook(HookType.AfterDeserialization)]
    private void AfterDeserialization() {
      Initialize();
    }

    protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner) : base(original, cloner) {
      name = original.name;
      description = original.description;
      log = cloner.Clone(original.log);
      results = cloner.Clone(original.results);
      // Go through the cloner (not x.Clone(cloner)) so shared references are cloned only once
      // and null fields stay null instead of throwing.
      ensembles = cloner.Clone(original.ensembles);
      datastream = cloner.Clone(original.datastream);
      runsCounter = original.runsCounter;
      runs = cloner.Clone(original.runs);
      Initialize();
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new DatastreamAnalysisOptimizer(this, cloner);
    }

    // Hooks the event handlers of all non-null child objects (after construction, cloning or deserialization).
    private void Initialize() {
      if (runs != null) RegisterRunsEvents();
      if (datastream != null) RegisterDatastreamEvents();
      if (ensembles != null) RegisterEnsembleEvents();
    }
    #endregion

    #region control actions

    /// <summary>Prepares the analysis without clearing <see cref="Runs"/>; see <see cref="Prepare(bool)"/>.</summary>
    public override void Prepare() {
      Prepare(false);
    }

    /// <summary>
    /// Prepares the analysis if there is at least one ensemble and the datastream allows a sliding
    /// window evaluation; otherwise does nothing.
    /// </summary>
    /// <param name="clearRuns">Whether to clear <see cref="Runs"/> as well.</param>
    public void Prepare(bool clearRuns) {
      if (ensembles == null || ensembles.Count == 0 || datastream == null || !datastream.SlidingWindowEvaluationPossible) return;

      base.Prepare();
      if (clearRuns) runs.Clear();
      OnPrepared();
    }

    /// <summary>
    /// Starts (or resumes) the analysis on a background task. When the task ends, exceptions other than
    /// cancellation are reported via <c>OnExceptionOccurred</c>, then the optimizer is reported as
    /// stopped (stop requested or stream finished) or paused.
    /// </summary>
    public override void Start() {
      if (ensembles == null || datastream == null) return;
      base.Start();
      cancellationTokenSource = new CancellationTokenSource();
      stopPending = false;

      if (prepared) {
        SetupResults();
      }

      Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token);
      task.ContinueWith(t => {
        try {
          t.Wait();
        }
        catch (AggregateException ex) {
          try {
            ex.Flatten().Handle(x => x is OperationCanceledException);
          } catch (AggregateException remaining) {
            if(remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
            else OnExceptionOccurred(remaining);
          }
        }
        cancellationTokenSource.Dispose();
        cancellationTokenSource = null;

        // handle stop/pause
        if (stopPending || finished) {
          OnStopped();
        } else {
          OnPaused();
        }
      });

    }

    /// <summary>Requests cancellation of the running analysis; the task continuation reports the pause.</summary>
    public override void Pause() {
      if (ensembles == null || datastream == null) return;
      base.Pause();
      cancellationTokenSource.Cancel();
    }

    /// <summary>
    /// Stops the analysis: immediately when paused, otherwise by flagging the stop and cancelling
    /// the running task (the continuation then reports the stop).
    /// </summary>
    public override void Stop() {
      if (ensembles == null || datastream == null) return;
      base.Stop();
      if (ExecutionState == ExecutionState.Paused) {
        OnStopped();
      } else {
        stopPending = true;
        cancellationTokenSource.Cancel();
      }
    }

    /// <summary>Resets execution time, the state of all stateful items (except runs) and the results.</summary>
    protected override void OnPrepared() {
      ExecutionTime = TimeSpan.Zero;
      foreach (IStatefulItem statefulItem in this.GetObjectGraphObjects(new HashSet<object>() {Runs}).OfType<IStatefulItem>()) {
        statefulItem.InitializeState();
      }
      results.Clear();
      prepared = true;
      finished = false;
      Log.LogMessage("Datastream analysis prepared");
      base.OnPrepared();
    }

    protected override void OnStarted() {
      Log.LogMessage("Datastream analysis started");
      base.OnStarted();
    }

    protected override void OnPaused() {
      Log.LogMessage("Datastream analysis paused");
      base.OnPaused();
    }

    /// <summary>Records a new run (parameters and results) and then signals the stop.</summary>
    protected override void OnStopped() {
      try {
        runsCounter++;
        var run = new Run();
        run.Filename = Filename;
        run.Name = string.Format("{0} Run {1}", Name, runsCounter);
        CollectParameterValues(run.Parameters);
        CollectResultValues(run.Results);
        runs.Add(run);
      }
      finally {
        Log.LogMessage("Datastream analysis stopped");
        base.OnStopped();
      }
    }

    #endregion

    #region run
    /// <summary>
    /// Task entry point: keeps <c>ExecutionTime</c> up to date with a 250 ms timer while the
    /// analysis runs, and rethrows cancellation so the continuation can distinguish it.
    /// </summary>
    /// <param name="state">The <see cref="CancellationToken"/> passed to <c>StartNew</c>.</param>
    private void Run(object state) {
      CancellationToken cancellationToken = (CancellationToken) state;
      OnStarted();
      lastUpdateTime = DateTime.UtcNow;
      // The timer is IDisposable; dispose it once the run is over.
      using (System.Timers.Timer timer = new System.Timers.Timer(250)) {
        timer.AutoReset = true;
        timer.Elapsed += timer_Elapsed;
        timer.Start();

        try {
          Run(cancellationToken);
        }
        finally {
          timer.Elapsed -= timer_Elapsed;
          timer.Stop();
          ExecutionTime += DateTime.UtcNow - lastUpdateTime;
        }
      }

      cancellationToken.ThrowIfCancellationRequested();
    }

    // Number of datastream rows already appended to the "Real" target row.
    private int replayedIndex;

    /// <summary>
    /// Main loop: repeatedly waits the configured interval, moves the sliding window and appends
    /// the newly covered target values (and per-ensemble values) to the results, until neither a
    /// window movement nor a datastream update is possible. Sets <c>finished</c> on completion.
    /// </summary>
    /// <remarks>
    /// Exceptions (including <see cref="OperationCanceledException"/> from pause/stop) propagate to
    /// the task continuation in <see cref="Start"/>, which reports non-cancellation errors.
    /// </remarks>
    protected void Run(CancellationToken cancellationToken) {
      var problemData = Datastream.ProblemData;
      var targetVarName = problemData.TargetVariable;

      if (prepared) {
        // A fresh run replays the datastream from its first row.
        replayedIndex = 0;
        prepared = false;
      }

      Random rnd = new Random();

      do {
        do {
          cancellationToken.ThrowIfCancellationRequested();

          Thread.Sleep(Datastream.SlidingWindowMovementInterval.Value);
          Datastream.MoveSlidingWindow();

          if (Datastream.SlidingWindowEvaluationPossible) {
            foreach (var ensemble in Ensembles) {
              // TODO do the window evaluation (placeholder: noisy copy of the target value)
              double estimatedVal = problemData.Dataset.GetDoubleValue(targetVarName, replayedIndex)
                                    * (1.0 + (((double)rnd.Next(0, 30)) / 100));
              for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
                ResultsTargets.Rows[ensemble.Name].Values.Add(estimatedVal);
              }
            }
          }

          // replay datastream until FitnessPartition.End
          for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
            var val = problemData.Dataset.GetDoubleValue(targetVarName, i);
            ResultsTargets.Rows[ResultsTargetsReal].Values.Add(val);
          }
          replayedIndex = Datastream.FitnessPartition.End;

          // TODO: collect results and display them (placeholder random qualities below)
          int exp = rnd.Next(100);
          ResultsQualities.Rows[ResultsQualitiesR2].Values.Add(exp);
          ResultsQualities.Rows[ResultsQualitiesPearson].Values.Add((double)exp / 10);
          ResultsSlidingWindowMovements++;
          ResultsBestQuality = (double)exp / 42;

        } while (Datastream.SlidingWindowMovementPossible);
      } while (Datastream.UpdateAvailable);
      finished = true;
    }

    // Advances ExecutionTime by the time elapsed since the last update; the timer is disabled
    // while updating so ticks do not overlap.
    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      System.Timers.Timer timer = (System.Timers.Timer)sender;
      timer.Enabled = false;
      DateTime now = DateTime.UtcNow;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
      timer.Enabled = true;
    }

    /// <summary>Adds this optimizer's name and (if set) the datastream's parameters and name to <paramref name="values"/>.</summary>
    public void CollectParameterValues(IDictionary<string, IItem> values) {
      values.Add("Datastream Analysis Name", new StringValue(Name));
      if (Datastream != null) {
        Datastream.CollectParameterValues(values);
        values.Add("Datastream Name", new StringValue(Datastream.Name));
      }
    }

    /// <summary>Adds the execution time and all current results to <paramref name="values"/>.</summary>
    public void CollectResultValues(IDictionary<string, IItem> values) {
      values.Add("Execution Time", new TimeSpanValue(ExecutionTime));
      Results.CollectResultValues(values);
    }
    #endregion

    #region events

    #region events registration

    /// <summary>Raised after <see cref="Ensembles"/> was replaced.</summary>
    public event EventHandler EnsemblesChanged;
    /// <summary>Raised after <see cref="Datastream"/> was replaced.</summary>
    public event EventHandler DatastreamChanged;

    protected virtual void DeregisterRunsEvents() {
      runs.CollectionReset -= new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
    }

    protected virtual void RegisterRunsEvents() {
      runs.CollectionReset += new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
    }

    protected virtual void RegisterDatastreamEvents() {
      datastream.Reset += new EventHandler(Datastream_Reset);
      datastream.ProblemDataChanged += new EventHandler(Datastream_ProblemDataChanged);
    }

    protected virtual void DeregisterDatastreamEvents() {
      datastream.Reset -= new EventHandler(Datastream_Reset);
      datastream.ProblemDataChanged -= new EventHandler(Datastream_ProblemDataChanged);
    }

    protected virtual void RegisterEnsembleEvents() {
      ensembles.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsMoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
    }

    protected virtual void DeregisterEnsembleEvents() {
      ensembles.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsMoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
    }
    #endregion

    #region event handling

    // Keeps run numbering consistent with the collection after a reset (e.g. Clear()).
    protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
      runsCounter = runs.Count;
    }

    protected virtual void Datastream_Reset(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Datastream_ProblemDataChanged(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Ensembles_Reset(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Ensembles_ItemsChanged(object sender, EventArgs e) {
      Prepare();
    }

    private void OnEnsemblesChanged() {
      var changed = EnsemblesChanged;
      if (changed != null)
        changed(this, EventArgs.Empty);
    }

    private void OnDatastreamChanged() {
      var changed = DatastreamChanged;
      if (changed != null)
        changed(this, EventArgs.Empty);
    }

    #endregion

    #endregion

    #region NamedItem

    [Storable]
    protected string name;

    /// <inheritdoc/>
    /// <remarks>
    /// A null value is stored as <see cref="string.Empty"/>. On an actual change, raises
    /// <see cref="NameChanging"/> (which may cancel it) and then <see cref="NameChanged"/>.
    /// </remarks>
    /// <exception cref="NotSupportedException"><see cref="CanChangeName"/> is false.</exception>
    public string Name {
      get { return name; }
      set {
        if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");
        string newName = value ?? string.Empty;
        if (name != newName) {
          CancelEventArgs<string> e = new CancelEventArgs<string>(newName);
          OnNameChanging(e);
          if (!e.Cancel) {
            name = newName;
            OnNameChanged();
          }
        }
      }
    }

    public virtual bool CanChangeName {
      get { return true; }
    }

    [Storable] protected string description;

    /// <summary>Free-text description; a null value is stored as <see cref="string.Empty"/>.</summary>
    /// <exception cref="NotSupportedException"><see cref="CanChangeDescription"/> is false.</exception>
    public string Description {
      get { return description; }
      set {
        if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");
        string newDescription = value ?? string.Empty;
        // Null-safe comparison: description may be null for instances deserialized from older files.
        if (description != newDescription) {
          description = newDescription;
          OnDescriptionChanged();
        }
      }
    }

    public virtual bool CanChangeDescription {
      get { return true; }
    }

    /// <summary>
    /// Returns the <see cref="Name"/> of this optimizer.
    /// </summary>
    /// <returns>The current name.</returns>
    public override string ToString() {
      return Name;
    }

    /// <inheritdoc/>
    public event EventHandler<CancelEventArgs<string>> NameChanging;

    /// <summary>
    /// Fires a new <c>NameChanging</c> event.
    /// </summary>
    /// <param name="e">The event arguments of the changing.</param>
    protected virtual void OnNameChanging(CancelEventArgs<string> e) {
      var handler = NameChanging;
      if (handler != null) handler(this, e);
    }

    /// <inheritdoc/>
    public event EventHandler NameChanged;

    /// <summary>
    /// Fires a new <c>NameChanged</c> event.
    /// </summary>
    /// <remarks>Also calls <c>OnToStringChanged</c>, because <see cref="ToString"/> returns the name.</remarks>
    protected virtual void OnNameChanged() {
      var handler = NameChanged;
      if (handler != null) handler(this, EventArgs.Empty);
      OnToStringChanged();
    }

    /// <inheritdoc/>
    public event EventHandler DescriptionChanged;

    /// <summary>
    /// Fires a new <c>DescriptionChanged</c> event.
    /// </summary>
    protected virtual void OnDescriptionChanged() {
      var handler = DescriptionChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    #endregion nameditem
  }
}
Note: See TracBrowser for help on using the repository browser.