#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Analysis;
using HeuristicLab.Collections;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Data;
using HeuristicLab.Optimization;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.Problems.DataAnalysis;

namespace HeuristicLab.DatastreamAnalysis {
  internal enum DatastreamAnalysisOptimizerAction { None, Prepare, Start, Stop, Pause }

  /// <summary>
  /// Replays a datastream through a sliding window and, for every window position,
  /// evaluates each configured regression ensemble model against the real target values.
  /// Results (real/estimated targets and Pearson R² qualities) are written to <see cref="Results"/>.
  /// </summary>
  [StorableClass]
  [Item("DatastreamAnalysis Optimizer", "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
  [Creatable(CreatableAttribute.Categories.Algorithms)]
  public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {

    #region properties
    public string Filename { get; set; }

    private DatastreamAnalysisOptimizerAction daoAction;

    /// <summary>
    /// This optimizer contains no nested optimizers. An empty sequence (rather than null)
    /// is returned so that callers can enumerate the result without a null check.
    /// </summary>
    public IEnumerable<IOptimizer> NestedOptimizers {
      get { return Enumerable.Empty<IOptimizer>(); }
    }

    [Storable]
    protected ILog log;
    public ILog Log {
      get { return log; }
    }

    [Storable]
    private ResultCollection results;
    public ResultCollection Results {
      get { return results; }
    }

    private CancellationTokenSource cancellationTokenSource;
    private bool stopPending;
    private DateTime lastUpdateTime;
    private bool prepared;
    private bool finished;

    [Storable]
    protected int runsCounter;

    [Storable]
    private RunCollection runs;
    public RunCollection Runs {
      get { return runs; }
      protected set {
        if (value == null) throw new ArgumentNullException();
        if (runs != value) {
          if (runs != null) DeregisterRunsEvents();
          runs = value;
          if (runs != null) RegisterRunsEvents();
        }
      }
    }

    [Storable]
    private IItemList<RegressionEnsembleModel> ensembles;

    /// <summary>
    /// The ensemble models to evaluate. Assigning a new list re-wires the collection
    /// events, raises <c>EnsemblesChanged</c> and re-prepares the optimizer.
    /// </summary>
    public IItemList<RegressionEnsembleModel> Ensembles {
      get { return ensembles; }
      set {
        if (value == null || value == ensembles) return;
        // The element type is already enforced statically by IItemList<RegressionEnsembleModel>;
        // testing the list itself against a model interface would reject every assignment.
        if (ensembles != null) DeregisterEnsembleEvents();
        ensembles = value;
        RegisterEnsembleEvents();
        OnEnsemblesChanged();
        Prepare();
      }
    }

    // VAR 1: datastream ~= problem data
    // VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
    [Storable]
    private Datastream datastream;

    /// <summary>
    /// The datastream that is replayed. Assigning a new datastream re-wires its events,
    /// raises <c>DatastreamChanged</c> and re-prepares the optimizer.
    /// </summary>
    public Datastream Datastream {
      get { return datastream; }
      set {
        if (value == null || value == datastream) return;
        if (!(value is IDatastream)) throw new ArgumentException("Invalid datastream type");
        if (datastream != null) DeregisterDatastreamEvents();
        datastream = value;
        RegisterDatastreamEvents();
        OnDatastreamChanged();
        Prepare();
      }
    }
    #endregion properties

    #region results properties
    private int ResultsSlidingWindowMovements {
      get { return ((IntValue)Results["Sliding Window Movements"].Value).Value; }
      set { ((IntValue)Results["Sliding Window Movements"].Value).Value = value; }
    }

    private DataTable ResultsQualities {
      get { return ((DataTable)Results["Qualities"].Value); }
    }

    private const string ResultsQualitiesMSE = "Mean squared error";
    private const string ResultsQualitiesPR2 = "Pearson R²";

    private DataTable ResultsTargets {
      get { return ((DataTable)Results["Targets"].Value); }
    }

    private const string ResultsTargetsReal = "Real";

    private DataTable ResultsQualitiesBars {
      get { return ((DataTable)Results["Ensembles"].Value); }
    }

    /// <summary>
    /// Clears <see cref="Results"/> and creates the result tables and rows that
    /// <see cref="PlayDatastream"/> and <see cref="Evaluate"/> write to.
    /// </summary>
    protected void SetupResults() {
      Results.Clear();

      Results.Add(new Result("Sliding Window Movements", new IntValue(0)));
      Results.Add(new Result("Qualities", new DataTable("Qualities")));
      Results.Add(new Result("Targets", new DataTable("Targets")));
      // ResultsQualitiesBars reads Results["Ensembles"], so that result must exist
      // before the row below is added to it.
      Results.Add(new Result("Ensembles", new DataTable("Ensembles")));

      ResultsQualitiesBars.Rows.Add(new DataRow("Ensembles"));
      ResultsTargets.Rows.Add(new DataRow(ResultsTargetsReal));

      foreach (var ensemble in Ensembles) {
        // one estimated-target series per ensemble
        ResultsTargets.Rows.Add(new DataRow(ensemble.Name));
        // one quality series per ensemble (the MSE series is currently disabled)
        ResultsQualities.Rows.Add(new DataRow(ensemble.Name + " - " + ResultsQualitiesPR2));
      }
    }
    #endregion

    #region constructors, cloner,...
    public DatastreamAnalysisOptimizer() : base() {
      name = "Datastream Analysis";
      description = string.Empty;
      log = new Log();
      results = new ResultCollection();
      ensembles = new ItemList<RegressionEnsembleModel>();
      datastream = new Datastream();
      runsCounter = 0;
      runs = new RunCollection();
      Initialize();
    }

    [StorableConstructor]
    protected DatastreamAnalysisOptimizer(bool deserializing) : base(deserializing) { }

    [StorableHook(HookType.AfterDeserialization)]
    private void AfterDeserialization() {
      Initialize();
    }

    protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner) : base(original, cloner) {
      name = original.name;
      description = original.description;
      log = cloner.Clone(original.log);
      results = cloner.Clone(original.results);
      // Go through the cloner so already-cloned objects are reused (identity map).
      ensembles = cloner.Clone(original.ensembles);
      datastream = cloner.Clone(original.datastream);
      runsCounter = original.runsCounter;
      runs = cloner.Clone(original.runs);
      Initialize();
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new DatastreamAnalysisOptimizer(this, cloner);
    }

    /// <summary>Registers event handlers on every non-null owned component.</summary>
    private void Initialize() {
      if (runs != null) RegisterRunsEvents();
      if (datastream != null) RegisterDatastreamEvents();
      if (ensembles != null) RegisterEnsembleEvents();
    }
    #endregion

    #region control actions
    /// <summary>
    /// True when there is at least one ensemble and a datastream whose sliding window can be evaluated.
    /// </summary>
    private bool CanPrepare() {
      return ensembles != null && ensembles.Count > 0 && datastream != null && datastream.SlidingWindowEvaluationPossible;
    }

    /// <summary>Prepares the optimizer without clearing existing runs. Does nothing when <see cref="CanPrepare"/> is false.</summary>
    public override void Prepare() {
      Prepare(false);
    }

    /// <summary>Prepares the optimizer. Does nothing when <see cref="CanPrepare"/> is false.</summary>
    /// <param name="clearRuns">When true, all collected runs are removed.</param>
    public void Prepare(bool clearRuns) {
      if (!CanPrepare()) return;
      base.Prepare();
      if (clearRuns) runs.Clear();
      OnPrepared();
    }

    /// <summary>
    /// Starts (or resumes) the replay on a background task. After a fresh preparation the
    /// result tables and the datastream state are reset first.
    /// </summary>
    public override void Start() {
      if (ensembles == null || datastream == null) return;
      base.Start();
      cancellationTokenSource = new CancellationTokenSource();
      stopPending = false;

      if (prepared) {
        SetupResults();
        Datastream.InitializeState();
      }

      Task task = Task.Factory.StartNew(Run, cancellationTokenSource.Token, cancellationTokenSource.Token);
      task.ContinueWith(t => {
        try {
          t.Wait();
        } catch (AggregateException ex) {
          try {
            // cancellation is the normal pause/stop path and is not reported
            ex.Flatten().Handle(x => x is OperationCanceledException);
          } catch (AggregateException remaining) {
            if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
            else OnExceptionOccurred(remaining);
          }
        }

        cancellationTokenSource.Dispose();
        cancellationTokenSource = null;

        // handle stop/pause
        if (stopPending || finished) {
          OnStopped();
        } else {
          OnPaused();
        }
      });
    }

    public override void Pause() {
      if (ensembles == null || datastream == null) return;
      base.Pause();
      cancellationTokenSource.Cancel();
    }

    public override void Stop() {
      if (ensembles == null || datastream == null) return;
      base.Stop();
      if (ExecutionState == ExecutionState.Paused) {
        OnStopped();
      } else {
        stopPending = true;
        cancellationTokenSource.Cancel();
      }
    }

    protected override void OnPrepared() {
      ExecutionTime = TimeSpan.Zero;
      // reset every stateful item in the object graph, except the collected runs
      foreach (IStatefulItem statefulItem in this.GetObjectGraphObjects(new HashSet<object>() { Runs }).OfType<IStatefulItem>()) {
        statefulItem.InitializeState();
      }
      results.Clear();
      prepared = true;
      finished = false;
      Log.LogMessage("Datastream analysis prepared");
      base.OnPrepared();
    }

    protected override void OnStarted() {
      Log.LogMessage("Datastream analysis started");
      base.OnStarted();
    }

    protected override void OnPaused() {
      Log.LogMessage("Datastream analysis paused");
      base.OnPaused();
    }

    /// <summary>Stores the current parameters and results as a new run, then signals the stop.</summary>
    protected override void OnStopped() {
      try {
        runsCounter++;
        var run = new Run();
        run.Filename = Filename;
        run.Name = string.Format("{0} Run {1}", Name, runsCounter);
        CollectParameterValues(run.Parameters);
        CollectResultValues(run.Results);
        runs.Add(run);
      } finally {
        Log.LogMessage("Datastream analysis stopped");
        base.OnStopped();
      }
    }
    #endregion

    #region run
    /// <summary>
    /// Task entry point. <paramref name="state"/> is the <see cref="CancellationToken"/>
    /// passed by <see cref="Start"/>. A timer accumulates <c>ExecutionTime</c> every 250 ms
    /// while the replay runs.
    /// </summary>
    private void Run(object state) {
      CancellationToken cancellationToken = (CancellationToken)state;

      OnStarted();
      lastUpdateTime = DateTime.UtcNow;
      using (var timer = new System.Timers.Timer(250)) {
        timer.AutoReset = true;
        timer.Elapsed += timer_Elapsed;
        timer.Start();
        try {
          Run(cancellationToken);
        } finally {
          timer.Elapsed -= timer_Elapsed;
          timer.Stop();
          ExecutionTime += DateTime.UtcNow - lastUpdateTime;
        }
      }

      cancellationToken.ThrowIfCancellationRequested();
    }

    // index of the first datastream row not yet written to the "Real" target series
    private int replayedIndex;

    /// <summary>
    /// Main replay loop: plays and evaluates the current window, then repeatedly moves the
    /// window (after the configured interval) while movement is possible and updates are available.
    /// Sets <c>finished</c> when the stream is exhausted.
    /// </summary>
    protected void Run(CancellationToken cancellationToken) {
      if (prepared) {
        replayedIndex = 0;
        prepared = false;
      }

      try {
        // play and evaluate initial window
        PlayDatastream();
        if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
        replayedIndex = Datastream.FitnessPartition.End;

        do {
          while (Datastream.SlidingWindowMovementPossible) {
            cancellationToken.ThrowIfCancellationRequested();

            // perform (delayed) window movement
            Thread.Sleep(Datastream.SlidingWindowMovementInterval.Value);
            Datastream.MoveSlidingWindow();
            ResultsSlidingWindowMovements++;

            // play and evaluate the moved window
            PlayDatastream();
            if (Datastream.SlidingWindowEvaluationPossible) Evaluate();
            replayedIndex = Datastream.FitnessPartition.End;
          }
        } while (Datastream.UpdateAvailable);
        finished = true;
      } catch (ArgumentOutOfRangeException) {
        throw; // `throw;` keeps the original stack trace
      } catch (OperationCanceledException) {
        throw; // pause/stop
      } catch (Exception ex) {
        // Any other failure ends the replay without faulting the task (as before),
        // but it is now recorded in the log instead of being silently discarded.
        Log.LogMessage("Datastream analysis aborted: " + ex.Message);
      }
    }

    /// <summary>
    /// Appends the real target values of all rows in [replayedIndex, FitnessPartition.End)
    /// to the "Real" target series.
    /// </summary>
    private void PlayDatastream() {
      var problemData = Datastream.ProblemData;
      var targetVarName = problemData.TargetVariable;

      for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
        var realValue = problemData.Dataset.GetDoubleValue(targetVarName, i);
        ResultsTargets.Rows[ResultsTargetsReal].Values.Add(realValue);
      }
    }

    /// <summary>
    /// Evaluates every ensemble on the current fitness partition. For each newly replayed row,
    /// it appends the window's average ensemble estimate to the ensemble's target series and the
    /// squared Pearson correlation to its quality series. R² is taken between the per-row ensemble
    /// estimates (mean over models) and the real values, and is 0 when the calculator reports an error.
    /// An ensemble without models makes <c>Average()</c> throw InvalidOperationException.
    /// </summary>
    private void Evaluate() {
      var problemData = Datastream.ProblemData;
      var dataset = problemData.Dataset;
      var targetVarName = problemData.TargetVariable;
      var partition = Datastream.FitnessPartition;

      // Materialized once: both sequences are enumerated several times below.
      var rows = Enumerable.Range(partition.Start, partition.Size).ToArray();
      var realValues = dataset.GetDoubleValues(targetVarName, rows).ToArray();

      foreach (var ensemble in Ensembles) {
        // [model][row] estimates, materialized so each model is evaluated exactly once
        // (a lazy query here would re-run every model for every row).
        var estimatedValuesPerModelPerRow = ensemble.Models
          .Select(model => model.GetEstimatedValues(dataset, rows).ToArray())
          .ToArray();

        // per row: mean estimate over all models
        var estimatedValues = Enumerable.Range(0, partition.Size)
          .Select(r => estimatedValuesPerModelPerRow.Select(e => e[r]).Average())
          .ToArray();

        // whole window: mean of the per-model mean estimates
        var averageEstimatedValue = estimatedValuesPerModelPerRow.Select(x => x.Average()).Average();

        // determine quality
        OnlineCalculatorError error;
        var pR = OnlinePearsonsRCalculator.Calculate(estimatedValues, realValues, out error);
        var pR2 = error == OnlineCalculatorError.None ? pR * pR : 0.0;

        for (int i = replayedIndex; i < partition.End; i++) {
          ResultsTargets.Rows[ensemble.Name].Values.Add(averageEstimatedValue);
          ResultsQualities.Rows[ensemble.Name + " - " + ResultsQualitiesPR2].Values.Add(pR2);
        }
      }
    }

    /// <summary>Adds the wall-clock time since the last tick to <c>ExecutionTime</c>.</summary>
    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      System.Timers.Timer timer = (System.Timers.Timer)sender;
      timer.Enabled = false;
      DateTime now = DateTime.UtcNow;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
      timer.Enabled = true;
    }

    /// <summary>Adds this optimizer's name and the datastream's parameters to <paramref name="values"/>.</summary>
    public void CollectParameterValues(IDictionary<string, IItem> values) {
      values.Add("Datastream Analysis Name", new StringValue(Name));
      if (Datastream != null) {
        Datastream.CollectParameterValues(values);
        values.Add("Datastream Name", new StringValue(Datastream.Name));
      }
    }

    /// <summary>Adds the execution time and all current results to <paramref name="values"/>.</summary>
    public void CollectResultValues(IDictionary<string, IItem> values) {
      values.Add("Execution Time", new TimeSpanValue(ExecutionTime));
      Results.CollectResultValues(values);
    }
    #endregion

    #region events

    #region events registration

    // Invoked after the Ensembles property has been replaced.
    public EventHandler EnsemblesChanged;
    // Invoked after the Datastream property has been replaced.
    public EventHandler DatastreamChanged;

    protected virtual void DeregisterRunsEvents() {
      runs.CollectionReset -= Runs_CollectionReset;
    }

    protected virtual void RegisterRunsEvents() {
      runs.CollectionReset += Runs_CollectionReset;
    }

    protected virtual void RegisterDatastreamEvents() {
      datastream.Reset += Datastream_Reset;
      datastream.ProblemDataChanged += Datastream_ProblemDataChanged;
    }

    protected virtual void DeregisterDatastreamEvents() {
      datastream.Reset -= Datastream_Reset;
      datastream.ProblemDataChanged -= Datastream_ProblemDataChanged;
    }

    protected virtual void RegisterEnsembleEvents() {
      ensembles.ItemsAdded += Ensembles_ItemsChanged;
      ensembles.ItemsMoved += Ensembles_ItemsChanged;
      ensembles.ItemsRemoved += Ensembles_ItemsChanged;
      ensembles.ItemsReplaced += Ensembles_ItemsChanged;
      ensembles.CollectionReset += Ensembles_Reset;
    }

    protected virtual void DeregisterEnsembleEvents() {
      ensembles.ItemsAdded -= Ensembles_ItemsChanged;
      ensembles.ItemsMoved -= Ensembles_ItemsChanged;
      ensembles.ItemsRemoved -= Ensembles_ItemsChanged;
      ensembles.ItemsReplaced -= Ensembles_ItemsChanged;
      ensembles.CollectionReset -= Ensembles_Reset;
    }
    #endregion

    #region event handling

    protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
      // keep run numbering consistent with the remaining runs
      runsCounter = runs.Count;
    }

    protected virtual void Datastream_Reset(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Datastream_ProblemDataChanged(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Ensembles_Reset(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Ensembles_ItemsChanged(object sender, EventArgs e) {
      Prepare();
    }

    private void OnEnsemblesChanged() {
      var changed = EnsemblesChanged;
      if (changed != null)
        changed(this, EventArgs.Empty);
    }

    private void OnDatastreamChanged() {
      var changed = DatastreamChanged;
      if (changed != null)
        changed(this, EventArgs.Empty);
    }

    #endregion

    #endregion

    #region NamedItem

    [Storable]
    protected string name;

    /// <summary>
    /// Gets or sets the name. The setter raises <see cref="NameChanging"/> (which may cancel
    /// the change) and, if not cancelled, <see cref="NameChanged"/>. Null is stored as the empty string.
    /// </summary>
    /// <exception cref="NotSupportedException">Thrown when <see cref="CanChangeName"/> is false.</exception>
    public string Name {
      get { return name; }
      set {
        if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");
        // static string.Equals also copes with a null backing field
        if (!(string.Equals(name, value) || (value == null) && (name == string.Empty))) {
          CancelEventArgs<string> e = value == null ? new CancelEventArgs<string>(string.Empty) : new CancelEventArgs<string>(value);
          OnNameChanging(e);
          if (!e.Cancel) {
            name = value == null ? string.Empty : value;
            OnNameChanged();
          }
        }
      }
    }

    public virtual bool CanChangeName {
      get { return true; }
    }

    [Storable]
    protected string description;

    /// <summary>
    /// Gets or sets the description. Null is stored as the empty string; a change raises
    /// <see cref="DescriptionChanged"/>.
    /// </summary>
    /// <exception cref="NotSupportedException">Thrown when <see cref="CanChangeDescription"/> is false.</exception>
    public string Description {
      get { return description; }
      set {
        if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");
        // static string.Equals avoids an NRE when the field has not been initialized
        if (!(string.Equals(description, value) || (value == null) && (description == string.Empty))) {
          description = value == null ? string.Empty : value;
          OnDescriptionChanged();
        }
      }
    }

    public virtual bool CanChangeDescription {
      get { return true; }
    }

    /// <summary>Returns the <see cref="Name"/> of this optimizer.</summary>
    /// <returns>The current name.</returns>
    public override string ToString() {
      return Name;
    }

    /// <inheritdoc/>
    public event EventHandler<CancelEventArgs<string>> NameChanging;

    /// <summary>Raises <see cref="NameChanging"/>.</summary>
    /// <param name="e">Carries the proposed name; handlers may set <c>Cancel</c>.</param>
    protected virtual void OnNameChanging(CancelEventArgs<string> e) {
      var handler = NameChanging;
      if (handler != null) handler(this, e);
    }

    /// <inheritdoc/>
    public event EventHandler NameChanged;

    /// <summary>Raises <see cref="NameChanged"/> and then <c>OnToStringChanged</c>.</summary>
    protected virtual void OnNameChanged() {
      var handler = NameChanged;
      if (handler != null) handler(this, EventArgs.Empty);
      OnToStringChanged();
    }

    /// <inheritdoc/>
    public event EventHandler DescriptionChanged;

    /// <summary>Raises <see cref="DescriptionChanged"/>.</summary>
    protected virtual void OnDescriptionChanged() {
      var handler = DescriptionChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion nameditem
  }
}