source: branches/HeuristicLab.DatastreamAnalysis/HeuristicLab.DatastreamAnalysis/3.4/DatastreamAnalysisOptimizer.cs @ 14536

Last change on this file since 14536 was 14536, checked in by jzenisek, 3 years ago

#2719 added datastream type; updated the optimizer view and control functionality

File size: 14.1 KB
Line 
1#region License Information
2
3/* HeuristicLab
4 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
5 *
6 * This file is part of HeuristicLab.
7 *
8 * HeuristicLab is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * HeuristicLab is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#endregion
23
24using System;
25using System.Collections.Generic;
26using System.Diagnostics;
27using System.Drawing;
28using System.Linq;
29using System.Threading;
30using System.Threading.Tasks;
31using HeuristicLab.Analysis;
32using HeuristicLab.Collections;
33using HeuristicLab.Common;
34using HeuristicLab.Core;
35using HeuristicLab.Data;
36using HeuristicLab.Optimization;
37using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
38using HeuristicLab.Problems.DataAnalysis;
39
40namespace HeuristicLab.DatastreamAnalysis {
41  internal enum DatastreamAnalysisOptimizerAction {
42    None,
43    Prepare,
44    Start,
45    Stop,
46    Pause
47  }
48
49  [StorableClass]
50  [Item("DatastreamAnalysis Optimizer",
51     "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
52  [Creatable(CreatableAttribute.Categories.Algorithms)]
53  public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {
54    #region properties
55    public string Filename { get; set; }
56
57    private DatastreamAnalysisOptimizerAction daoAction;
58
59    public IEnumerable<IOptimizer> NestedOptimizers { get; }
60
61
62    [Storable]
63    protected ILog log;
64    public ILog Log {
65      get { return log; }
66    }
67
68    [Storable]
69    private ResultCollection results;
70
71    public ResultCollection Results {
72      get { return results; }
73    }
74
75    private CancellationTokenSource cancellationTokenSource;
76    private bool stopPending;
77    private DateTime lastUpdateTime;
78
79    [Storable]
80    protected int runsCounter;
81
82    [Storable]
83    private RunCollection runs;
84
85    public RunCollection Runs
86    {
87      get { return runs; }
88      protected set
89      {
90        if (value == null) throw new ArgumentNullException();
91        if (runs != value) {
92          if (runs != null) DeregisterRunsEvents();
93          runs = value;
94          if (runs != null) RegisterRunsEvents();
95        }
96      }
97    }
98
99
100    [Storable]
101    private IItemList<RegressionEnsembleModel> ensembles;
102
103    public IItemList<RegressionEnsembleModel> Ensembles {
104      get { return ensembles; }
105      set {
106        if (value == null || value == ensembles)
107          return;
108
109        ensembles = value;
110      }
111    }
112
113
114    // VAR 1: datastream ~= problem data, VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
115    [Storable]
116    private Datastream datastream;
117
118    public Datastream Datastream {
119      get { return datastream; }
120      set {
121        if (value == null || value == datastream)
122          return;
123
124        datastream = value;
125      }
126    }
127
128    #endregion properties
129
130    #region ResultsProperties
131    private double ResultsBestQuality
132    {
133      get { return ((DoubleValue)Results["Best Quality"].Value).Value; }
134      set { ((DoubleValue)Results["Best Quality"].Value).Value = value; }
135    }
136    private DataTable ResultsQualities
137    {
138      get { return ((DataTable)Results["Qualities"].Value); }
139    }
140    #endregion
141
142    #region constructors, cloner,...
143
144    public DatastreamAnalysisOptimizer() {
145      name = "Datastream Analysis";
146      log = new Log();
147      results = new ResultCollection();
148      ensembles = new ItemList<RegressionEnsembleModel>();
149      datastream = new Datastream();
150      runsCounter = 0;
151      runs = new RunCollection();
152      Initialize();
153    }
154
155    [StorableConstructor]
156    protected DatastreamAnalysisOptimizer(bool deserializing) : base(deserializing) {
157    }
158
159    [StorableHook(HookType.AfterDeserialization)]
160    private void AfterDeserialization() {
161      // nothing to do in here
162    }
163
164    protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner) : base(original, cloner) {
165      name = original.name;
166      log = cloner.Clone(original.log);
167      results = cloner.Clone(original.results);
168      ensembles = (ItemList<RegressionEnsembleModel>) original.Ensembles.Clone(cloner);
169      datastream = (Datastream) original.Datastream.Clone(cloner);
170      runsCounter = original.runsCounter;
171      runs = cloner.Clone(original.runs);
172      Initialize();
173    }
174
175    public override IDeepCloneable Clone(Cloner cloner) {
176      return new DatastreamAnalysisOptimizer(this, cloner);
177    }
178
179    private void Initialize() {
180      if (runs != null) RegisterRunsEvents();
181    }
182    #endregion
183
184    #region control actions
185
186    public override void Prepare() {
187      if (ensembles == null || datastream == null) return;
188      Prepare(true);
189    }
190
191    public void Prepare(bool clearRuns) {
192      if (ensembles == null || datastream == null) return;
193      base.Prepare();
194      if (clearRuns) results.Clear();
195      OnPrepared();
196    }
197
198    public override void Start() {
199      if (ensembles == null || datastream == null) return;
200      base.Start();
201      cancellationTokenSource = new CancellationTokenSource();
202      stopPending = false;
203
204      Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token);
205      task.ContinueWith(t => {
206        try {
207          t.Wait();
208        }
209        catch (AggregateException ex) {
210          try {
211            ex.Flatten().Handle(x => x is OperationCanceledException);
212          } catch (AggregateException remaining) {
213            if(remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
214            else OnExceptionOccurred(remaining);
215          }
216        }
217        cancellationTokenSource.Dispose();
218        cancellationTokenSource = null;
219
220        // handle stop/pause
221        if(stopPending) { }
222        if(!Datastream.SlidingWindowMovementPossible) OnStopped();
223        else OnPaused();
224      });
225
226    }
227
228    public override void Pause() {
229      if (ensembles == null || datastream == null) return;
230      base.Pause();
231      cancellationTokenSource.Cancel();
232    }
233
234    public override void Stop() {
235      if (ensembles == null || datastream == null) return;
236      base.Stop();
237      if (ExecutionState == ExecutionState.Paused) {
238        OnStopped();
239      } else {
240        stopPending = true;
241        cancellationTokenSource.Cancel();
242      }
243    }
244
245    protected override void OnPrepared() {
246      Log.LogMessage("Datastream analysis prepared");
247      base.OnPrepared();
248    }
249
250    protected override void OnStarted() {
251      Log.LogMessage("Datastream analysis started");
252      base.OnStarted();
253    }
254
255    protected override void OnPaused() {
256      Log.LogMessage("Datastream analysis paused");
257      base.OnPaused();
258    }
259
260    protected override void OnStopped() {
261      Log.LogMessage("Datastream analysis stopped");
262      base.OnStopped();
263    }
264
265    #endregion
266
267
268    private void Run(object state) {
269      CancellationToken cancellationToken = (CancellationToken) state;
270      OnStarted();
271      lastUpdateTime = DateTime.UtcNow;
272      System.Timers.Timer timer = new System.Timers.Timer(250);
273      timer.AutoReset = true;
274      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
275      timer.Start();
276
277      try {
278        Run(cancellationToken);
279      }
280      finally {
281        timer.Elapsed -= new System.Timers.ElapsedEventHandler(timer_Elapsed);
282        timer.Stop();
283        ExecutionTime += DateTime.UtcNow - lastUpdateTime;
284      }
285
286      cancellationToken.ThrowIfCancellationRequested();
287    }
288
289    protected void Run(CancellationToken cancellationToken) {
290
291      // setup results
292      var slidingWindowMovements = new IntValue(0);
293      Results.Add(new Result("Sliding Window Movements", slidingWindowMovements));
294
295      var qtable = new DataTable("Qualities");
296      qtable.Rows.Add(new DataRow("R²"));
297      qtable.Rows.Add(new DataRow("Pearson"));
298      Results.Add(new Result("Qualities", qtable));
299
300      var curLoss = new DoubleValue();
301      Results.Add(new Result("R²", curLoss));
302      Results.Add(new Result("Best Quality", curLoss));
303
304
305      // init algorithm
306      var problemData = Datastream.ProblemData;
307      var targetVarName = problemData.TargetVariable;
308      var activeVariables = problemData.AllowedInputVariables;
309      Random rnd = new Random();
310
311      try {
312        while (Datastream.SlidingWindowMovementPossible) {
313          cancellationToken.ThrowIfCancellationRequested();
314
315          Task.Delay(Datastream.SlidingWindowMovementInterval.Value);
316
317          Datastream.MoveSlidingWindow();
318
319          // TODO do the window evaluation
320
321          // TODO: collect results and display them
322
323          curLoss.Value = rnd.Next(100);
324          qtable.Rows["R²"].Values.Add(curLoss.Value);
325          qtable.Rows["Pearson"].Values.Add(curLoss.Value % 10);
326
327          slidingWindowMovements.Value++;
328        }
329
330        // TODO: collect runs (c.f. goal seeking)
331      }
332      catch (Exception ex) {
333        if (ex is ArgumentOutOfRangeException) throw ex;
334        if (ex is OperationCanceledException) throw ex;
335      }
336      finally {
337        // reset everything
338        //Prepare(true);
339      }
340    }
341
342    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
343      System.Timers.Timer timer = (System.Timers.Timer)sender;
344      timer.Enabled = false;
345      DateTime now = DateTime.UtcNow;
346      ExecutionTime += now - lastUpdateTime;
347      lastUpdateTime = now;
348      timer.Enabled = true;
349    }
350
351    public void CollectResultValues(IDictionary<string, IItem> values) {
352      values.Add("Execution Time", new TimeSpanValue(ExecutionTime));
353      Results.CollectResultValues(values);
354    }
355
356    #region events
357
358    public EventHandler EnsemblesChanged;
359    public EventHandler DatastreamChanged;
360
361    protected virtual void DeregisterRunsEvents() {
362      runs.CollectionReset -= new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
363    }
364
365    protected virtual void RegisterRunsEvents() {
366      runs.CollectionReset += new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
367    }
368
369    protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
370      runsCounter = runs.Count;
371    }
372
373    #region event handling
374
375    private void OnEnsemblesChanged() {
376      var changed = EnsemblesChanged;
377      if (changed != null)
378        changed(this, EventArgs.Empty);
379    }
380
381    private void OnDatastreamChanged() {
382      var changed = DatastreamChanged;
383      if (changed != null)
384        changed(this, EventArgs.Empty);
385    }
386
387    #endregion
388
389    #endregion
390
391    #region nameditem
392
393    [Storable] protected string name;
394
395    /// <inheritdoc/>
396    /// <remarks>Calls <see cref="OnNameChanging"/> and also <see cref="OnNameChanged"/>
397    /// eventually in the setter.</remarks>
398    public string Name {
399      get { return name; }
400      set {
401        if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");
402        if (!(name.Equals(value) || (value == null) && (name == string.Empty))) {
403          CancelEventArgs<string> e = value == null
404            ? new CancelEventArgs<string>(string.Empty)
405            : new CancelEventArgs<string>(value);
406          OnNameChanging(e);
407          if (!e.Cancel) {
408            name = value == null ? string.Empty : value;
409            OnNameChanged();
410          }
411        }
412      }
413    }
414
415    public virtual bool CanChangeName {
416      get { return true; }
417    }
418
419    [Storable] protected string description;
420
421    public string Description {
422      get { return description; }
423      set {
424        if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");
425        if (!(description.Equals(value) || (value == null) && (description == string.Empty))) {
426          description = value == null ? string.Empty : value;
427          OnDescriptionChanged();
428        }
429      }
430    }
431
432    public virtual bool CanChangeDescription {
433      get { return true; }
434    }
435
436    /// <summary>
437    /// Gets the string representation of the current instance in the format: <c>Name: [null|Value]</c>.
438    /// </summary>
439    /// <returns>The current instance as a string.</returns>
440    public override string ToString() {
441      return Name;
442    }
443
444    /// <inheritdoc/>
445    public event EventHandler<CancelEventArgs<string>> NameChanging;
446
447    /// <summary>
448    /// Fires a new <c>NameChanging</c> event.
449    /// </summary>
450    /// <param name="e">The event arguments of the changing.</param>
451    protected virtual void OnNameChanging(CancelEventArgs<string> e) {
452      var handler = NameChanging;
453      if (handler != null) handler(this, e);
454    }
455
456    /// <inheritdoc/>
457    public event EventHandler NameChanged;
458
459    /// <summary>
460    /// Fires a new <c>NameChanged</c> event.
461    /// </summary>
462    /// <remarks>Calls <see cref="ItemBase.OnChanged"/>.</remarks>
463    protected virtual void OnNameChanged() {
464      var handler = NameChanged;
465      if (handler != null) handler(this, EventArgs.Empty);
466      OnToStringChanged();
467    }
468
469    /// <inheritdoc/>
470    public event EventHandler DescriptionChanged;
471
472    /// <summary>
473    /// Fires a new <c>DescriptionChanged</c> event.
474    /// </summary>
475    /// <remarks>Calls <see cref="ItemBase.OnChanged"/>.</remarks>
476    protected virtual void OnDescriptionChanged() {
477      var handler = DescriptionChanged;
478      if (handler != null) handler(this, EventArgs.Empty);
479    }
480
481    #endregion nameditem
482  }
483}
Note: See TracBrowser for help on using the repository browser.