#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2016 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Analysis;
using HeuristicLab.Collections;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Data;
using HeuristicLab.Optimization;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.Problems.DataAnalysis;

namespace HeuristicLab.DatastreamAnalysis {
  internal enum DatastreamAnalysisOptimizerAction { None, Prepare, Start, Stop, Pause }

  /// <summary>
  /// Executable optimizer that repeatedly moves the sliding window of a <see cref="Datastream"/>,
  /// replays the real target values of the stream and records per-ensemble estimates and
  /// quality values in <see cref="Results"/>.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the per-ensemble estimates and the quality values written by
  /// <see cref="Run(CancellationToken)"/> are currently random placeholders (see the TODOs there).
  /// </remarks>
  [StorableClass]
  [Item("DatastreamAnalysis Optimizer", "The main loop for evaluating ensemble models against a incoming datastream of time series fashion.")]
  [Creatable(CreatableAttribute.Categories.Algorithms)]
  public class DatastreamAnalysisOptimizer : Executable, IOptimizer, IStorableContent {

    #region properties
    /// <summary>File name associated with this content (required by IStorableContent; not persisted).</summary>
    public string Filename { get; set; }

    /// <summary>
    /// This optimizer has no nested optimizers. An empty sequence (never null) is returned so
    /// that callers can enumerate it without a null check.
    /// </summary>
    public IEnumerable<IOptimizer> NestedOptimizers {
      get { return Enumerable.Empty<IOptimizer>(); }
    }

    [Storable]
    protected ILog log;
    public ILog Log {
      get { return log; }
    }

    [Storable]
    private ResultCollection results;
    public ResultCollection Results {
      get { return results; }
    }

    // Runtime state of the current execution; none of these fields is persisted.
    private CancellationTokenSource cancellationTokenSource;
    private bool stopPending;
    private DateTime lastUpdateTime;
    private bool prepared;
    private bool finished;

    [Storable]
    protected int runsCounter;

    [Storable]
    private RunCollection runs;
    /// <summary>Runs collected each time the analysis stops.</summary>
    /// <exception cref="ArgumentNullException">When set to null.</exception>
    public RunCollection Runs {
      get { return runs; }
      protected set {
        if (value == null) throw new ArgumentNullException();
        if (runs != value) {
          if (runs != null) DeregisterRunsEvents();
          runs = value;
          if (runs != null) RegisterRunsEvents();
        }
      }
    }

    [Storable]
    private IItemList<RegressionEnsembleModel> ensembles;
    /// <summary>
    /// The ensemble models evaluated against the datastream. Assigning a different, non-null list
    /// re-wires the list events, raises <see cref="EnsemblesChanged"/> and calls <see cref="Prepare()"/>.
    /// Assigning null or the current list is ignored.
    /// </summary>
    public IItemList<RegressionEnsembleModel> Ensembles {
      get { return ensembles; }
      set {
        if (value == null || value == ensembles) return;
        // The element type is already enforced by the generic list type. The former check
        // "value is IRegressionEnsembleModel" tested the list object instead of its elements
        // and therefore rejected ordinary item lists.
        DeregisterEnsembleEvents();
        ensembles = value;
        RegisterEnsembleEvents();
        OnEnsemblesChanged();
        Prepare();
      }
    }

    // VAR 1: datastream ~= problem data, VAR 2 (TODO): datastream = external source e.g. webservice, AMQP-Queue, etc.
    [Storable]
    private Datastream datastream;
    /// <summary>
    /// The datastream whose sliding window is moved during execution. Assigning a different,
    /// non-null datastream re-wires its events, raises <see cref="DatastreamChanged"/> and calls
    /// <see cref="Prepare()"/>. Assigning null or the current datastream is ignored.
    /// </summary>
    /// <exception cref="ArgumentException">When the value does not implement IDatastream.</exception>
    public Datastream Datastream {
      get { return datastream; }
      set {
        if (value == null || value == datastream) return;
        if (!(value is IDatastream)) throw new ArgumentException("Invalid datastream type");
        DeregisterDatastreamEvents();
        datastream = value;
        RegisterDatastreamEvents();
        OnDatastreamChanged();
        Prepare();
      }
    }
    #endregion properties

    #region results properties
    // Typed accessors for the entries created by SetupResults(); they throw if SetupResults()
    // has not been called since the last Results.Clear().
    private int ResultsSlidingWindowMovements {
      get { return ((IntValue)Results["Sliding Window Movements"].Value).Value; }
      set { ((IntValue)Results["Sliding Window Movements"].Value).Value = value; }
    }

    private double ResultsBestQuality {
      get { return ((DoubleValue)Results["Best Quality"].Value).Value; }
      set { ((DoubleValue)Results["Best Quality"].Value).Value = value; }
    }

    private DataTable ResultsQualities {
      get { return ((DataTable)Results["Qualities"].Value); }
    }

    private const string ResultsQualitiesR2 = "R²";
    private const string ResultsQualitiesPearson = "Pearson";

    private DataTable ResultsTargets {
      get { return ((DataTable)Results["Targets"].Value); }
    }

    private const string ResultsTargetsReal = "Real";

    /// <summary>
    /// Replaces all results with fresh entries: a movement counter, a best-quality value, a
    /// quality table (R² and Pearson rows) and a target table with one "Real" row plus one row
    /// per ensemble, named after the ensemble.
    /// </summary>
    protected void SetupResults() {
      Results.Clear();
      Results.Add(new Result("Sliding Window Movements", new IntValue(0)));
      Results.Add(new Result("Best Quality", new DoubleValue(0)));
      Results.Add(new Result("Qualities", new DataTable("Qualities")));
      Results.Add(new Result("Targets", new DataTable("Targets")));

      ResultsQualities.Rows.Add(new DataRow(ResultsQualitiesR2));
      ResultsQualities.Rows.Add(new DataRow(ResultsQualitiesPearson));

      ResultsTargets.Rows.Add(new DataRow(ResultsTargetsReal));
      foreach (var ensemble in Ensembles) {
        ResultsTargets.Rows.Add(new DataRow(ensemble.Name));
      }
    }
    #endregion

    #region constructors, cloner,...
    public DatastreamAnalysisOptimizer()
      : base() {
      name = "Datastream Analysis";
      // Initialized so that the Description getter never returns null.
      description = string.Empty;
      log = new Log();
      results = new ResultCollection();
      ensembles = new ItemList<RegressionEnsembleModel>();
      datastream = new Datastream();
      runsCounter = 0;
      runs = new RunCollection();
      Initialize();
    }

    [StorableConstructor]
    protected DatastreamAnalysisOptimizer(bool deserializing) : base(deserializing) { }

    [StorableHook(HookType.AfterDeserialization)]
    private void AfterDeserialization() {
      // The persisted state may lack a description; normalize it so Description is never null.
      if (description == null) description = string.Empty;
      Initialize();
    }

    protected DatastreamAnalysisOptimizer(DatastreamAnalysisOptimizer original, Cloner cloner)
      : base(original, cloner) {
      name = original.name;
      description = original.description;
      log = cloner.Clone(original.log);
      results = cloner.Clone(original.results);
      // Cloning through the cloner keeps object identities consistent within one clone operation.
      ensembles = cloner.Clone(original.ensembles);
      datastream = cloner.Clone(original.datastream);
      runsCounter = original.runsCounter;
      runs = cloner.Clone(original.runs);
      Initialize();
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new DatastreamAnalysisOptimizer(this, cloner);
    }

    /// <summary>Subscribes to the events of the runs, the datastream and the ensemble list (if present).</summary>
    private void Initialize() {
      if (runs != null) RegisterRunsEvents();
      if (datastream != null) RegisterDatastreamEvents();
      if (ensembles != null) RegisterEnsembleEvents();
    }
    #endregion

    #region control actions
    /// <summary>Prepares the analysis without clearing the collected runs.</summary>
    public override void Prepare() {
      Prepare(false);
    }

    /// <summary>
    /// Prepares the analysis. Silently does nothing when no ensembles or no datastream are set,
    /// or when the datastream reports that sliding-window evaluation is not possible.
    /// </summary>
    /// <param name="clearRuns">If true, the collected runs are cleared as well.</param>
    public void Prepare(bool clearRuns) {
      if (ensembles == null || ensembles.Count == 0 || datastream == null || !datastream.SlidingWindowEvaluationPossible) return;

      base.Prepare();
      if (clearRuns) runs.Clear();
      OnPrepared();
    }

    /// <summary>
    /// Starts (or resumes) the analysis loop on a background task. After a fresh Prepare the
    /// results are rebuilt first. When the task ends, the optimizer is stopped if a stop was
    /// requested or the stream was fully processed; otherwise it is paused.
    /// </summary>
    public override void Start() {
      if (ensembles == null || datastream == null) return;
      base.Start();
      cancellationTokenSource = new CancellationTokenSource();
      stopPending = false;

      if (prepared) {
        SetupResults();
      }

      var token = cancellationTokenSource.Token;
      // The token is passed both as task state (consumed by Run(object)) and as the task's cancellation token.
      Task task = Task.Factory.StartNew(Run, token, token);
      task.ContinueWith(t => {
        try {
          t.Wait();
        } catch (AggregateException ex) {
          try {
            // Cancellation is the normal way Pause/Stop end the task; report everything else.
            ex.Flatten().Handle(x => x is OperationCanceledException);
          } catch (AggregateException remaining) {
            if (remaining.InnerExceptions.Count == 1) OnExceptionOccurred(remaining.InnerExceptions[0]);
            else OnExceptionOccurred(remaining);
          }
        }

        cancellationTokenSource.Dispose();
        cancellationTokenSource = null;

        // handle stop/pause
        if (stopPending || finished) {
          OnStopped();
        } else {
          OnPaused();
        }
      });
    }

    /// <summary>Requests the running analysis loop to pause.</summary>
    public override void Pause() {
      if (ensembles == null || datastream == null) return;
      base.Pause();
      // The Start continuation nulls the field when the task ends; read it once to avoid a race.
      var cts = cancellationTokenSource;
      if (cts != null) cts.Cancel();
    }

    /// <summary>
    /// Stops the analysis. A paused analysis is stopped immediately; a running one is cancelled
    /// and stopped by the Start continuation.
    /// </summary>
    public override void Stop() {
      if (ensembles == null || datastream == null) return;
      base.Stop();
      if (ExecutionState == ExecutionState.Paused) {
        OnStopped();
      } else {
        stopPending = true;
        // The Start continuation nulls the field when the task ends; read it once to avoid a race.
        var cts = cancellationTokenSource;
        if (cts != null) cts.Cancel();
      }
    }

    protected override void OnPrepared() {
      ExecutionTime = TimeSpan.Zero;
      // Reset all stateful items reachable from this optimizer, except those below Runs.
      foreach (IStatefulItem statefulItem in this.GetObjectGraphObjects(new HashSet<object>() { Runs }).OfType<IStatefulItem>()) {
        statefulItem.InitializeState();
      }
      results.Clear();
      prepared = true;
      finished = false;
      Log.LogMessage("Datastream analysis prepared");
      base.OnPrepared();
    }

    protected override void OnStarted() {
      Log.LogMessage("Datastream analysis started");
      base.OnStarted();
    }

    protected override void OnPaused() {
      Log.LogMessage("Datastream analysis paused");
      base.OnPaused();
    }

    /// <summary>Records the current parameters and results as a new run, then signals the stop.</summary>
    protected override void OnStopped() {
      try {
        runsCounter++;
        var run = new Run();
        run.Filename = Filename;
        run.Name = string.Format("{0} Run {1}", Name, runsCounter);
        CollectParameterValues(run.Parameters);
        CollectResultValues(run.Results);
        runs.Add(run);
      } finally {
        Log.LogMessage("Datastream analysis stopped");
        base.OnStopped();
      }
    }
    #endregion

    #region run
    /// <summary>
    /// Task entry point: runs the analysis loop while a timer updates ExecutionTime every 250 ms.
    /// </summary>
    /// <param name="state">The boxed <see cref="CancellationToken"/> of the current execution.</param>
    /// <exception cref="OperationCanceledException">When the execution was paused or stopped.</exception>
    private void Run(object state) {
      CancellationToken cancellationToken = (CancellationToken)state;

      OnStarted();
      lastUpdateTime = DateTime.UtcNow;
      using (System.Timers.Timer timer = new System.Timers.Timer(250)) {
        timer.AutoReset = true;
        timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
        timer.Start();

        try {
          Run(cancellationToken);
        } finally {
          timer.Elapsed -= new System.Timers.ElapsedEventHandler(timer_Elapsed);
          timer.Stop();
          ExecutionTime += DateTime.UtcNow - lastUpdateTime;
        }
      }

      cancellationToken.ThrowIfCancellationRequested();
    }

    // Index of the first target value that has not yet been replayed into the "Real" row.
    private int replayedIndex;

    /// <summary>
    /// Analysis loop: waits for the configured interval, moves the sliding window, records
    /// per-ensemble estimates (placeholder values) and replays the real target values up to the
    /// end of the fitness partition. Continues while the window can move or updates are available,
    /// then marks the analysis as finished. Exceptions propagate to the caller.
    /// </summary>
    /// <exception cref="OperationCanceledException">When the token is cancelled (pause/stop).</exception>
    protected void Run(CancellationToken cancellationToken) {
      // init algorithm
      var problemData = Datastream.ProblemData;
      var targetVarName = problemData.TargetVariable;

      if (prepared) {
        // A freshly prepared execution replays the datastream from its first row.
        replayedIndex = 0;
        prepared = false;
      }

      Random rnd = new Random();

      do {
        do {
          cancellationToken.ThrowIfCancellationRequested();
          // Wait for the configured interval, but wake up immediately when paused or stopped
          // instead of sleeping through the cancellation and moving the window once more.
          cancellationToken.WaitHandle.WaitOne(Datastream.SlidingWindowMovementInterval.Value);
          cancellationToken.ThrowIfCancellationRequested();

          Datastream.MoveSlidingWindow();

          if (Datastream.SlidingWindowEvaluationPossible) {
            foreach (var ensemble in Ensembles) {
              // TODO do the window evaluation
              // Placeholder estimate: the real value at replayedIndex scaled by a random factor in [1.00, 1.29].
              double estimatedVal = problemData.Dataset.GetDoubleValue(targetVarName, replayedIndex) * (1.0 + (((double)rnd.Next(0, 30)) / 100));
              for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
                ResultsTargets.Rows[ensemble.Name].Values.Add(estimatedVal);
              }
            }
          }

          // replay datastream until FitnessPartition.End
          for (int i = replayedIndex; i < Datastream.FitnessPartition.End; i++) {
            var val = problemData.Dataset.GetDoubleValue(targetVarName, i);
            ResultsTargets.Rows[ResultsTargetsReal].Values.Add(val);
          }
          replayedIndex = Datastream.FitnessPartition.End;

          // TODO: collect results and display them
          // Placeholder quality values derived from a random number.
          int exp = rnd.Next(100);
          ResultsQualities.Rows[ResultsQualitiesR2].Values.Add(exp);
          ResultsQualities.Rows[ResultsQualitiesPearson].Values.Add((double)exp / 10);
          ResultsSlidingWindowMovements++;
          ResultsBestQuality = (double)exp / 42;
        } while (Datastream.SlidingWindowMovementPossible);
      } while (Datastream.UpdateAvailable);

      finished = true;
    }

    /// <summary>Adds the wall-clock time elapsed since the last update to ExecutionTime.</summary>
    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      System.Timers.Timer timer = (System.Timers.Timer)sender;
      timer.Enabled = false;
      DateTime now = DateTime.UtcNow;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
      timer.Enabled = true;
    }

    /// <summary>Adds the optimizer name and the datastream's parameters and name to <paramref name="values"/>.</summary>
    public void CollectParameterValues(IDictionary<string, IItem> values) {
      values.Add("Datastream Analysis Name", new StringValue(Name));
      if (Datastream != null) {
        Datastream.CollectParameterValues(values);
        values.Add("Datastream Name", new StringValue(Datastream.Name));
      }
    }

    /// <summary>Adds the execution time and all current results to <paramref name="values"/>.</summary>
    public void CollectResultValues(IDictionary<string, IItem> values) {
      values.Add("Execution Time", new TimeSpanValue(ExecutionTime));
      Results.CollectResultValues(values);
    }
    #endregion

    #region events

    #region events registration

    /// <summary>Raised after a different ensemble list has been assigned to <see cref="Ensembles"/>.</summary>
    public event EventHandler EnsemblesChanged;
    /// <summary>Raised after a different datastream has been assigned to <see cref="Datastream"/>.</summary>
    public event EventHandler DatastreamChanged;

    protected virtual void DeregisterRunsEvents() {
      runs.CollectionReset -= new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
    }

    protected virtual void RegisterRunsEvents() {
      runs.CollectionReset += new CollectionItemsChangedEventHandler<IRun>(Runs_CollectionReset);
    }

    protected virtual void RegisterDatastreamEvents() {
      datastream.Reset += new EventHandler(Datastream_Reset);
      datastream.ProblemDataChanged += new EventHandler(Datastream_ProblemDataChanged);
    }

    protected virtual void DeregisterDatastreamEvents() {
      datastream.Reset -= new EventHandler(Datastream_Reset);
      datastream.ProblemDataChanged -= new EventHandler(Datastream_ProblemDataChanged);
    }

    protected virtual void RegisterEnsembleEvents() {
      ensembles.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsMoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
    }

    protected virtual void DeregisterEnsembleEvents() {
      ensembles.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsMoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_ItemsChanged);
      ensembles.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<RegressionEnsembleModel>>(Ensembles_Reset);
    }
    #endregion

    #region event handling

    // Keeps run numbering consistent with the number of runs after the collection was reset.
    protected virtual void Runs_CollectionReset(object sender, CollectionItemsChangedEventArgs<IRun> e) {
      runsCounter = runs.Count;
    }

    protected virtual void Datastream_Reset(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Datastream_ProblemDataChanged(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Ensembles_Reset(object sender, EventArgs e) {
      Prepare();
    }

    protected virtual void Ensembles_ItemsChanged(object sender, EventArgs e) {
      Prepare();
    }

    private void OnEnsemblesChanged() {
      var changed = EnsemblesChanged;
      if (changed != null)
        changed(this, EventArgs.Empty);
    }

    private void OnDatastreamChanged() {
      var changed = DatastreamChanged;
      if (changed != null)
        changed(this, EventArgs.Empty);
    }

    #endregion

    #endregion

    #region NamedItem

    [Storable]
    protected string name;

    /// <summary>Gets or sets the name; a null value is stored as the empty string.</summary>
    /// <remarks>
    /// Raises <see cref="NameChanging"/> first; if not cancelled, the name is changed and
    /// <see cref="NameChanged"/> is raised.
    /// </remarks>
    /// <exception cref="NotSupportedException">When <see cref="CanChangeName"/> is false.</exception>
    public string Name {
      get { return name; }
      set {
        if (!CanChangeName) throw new NotSupportedException("Name cannot be changed.");
        string newName = value ?? string.Empty;
        if (name != newName) {
          CancelEventArgs<string> e = new CancelEventArgs<string>(newName);
          OnNameChanging(e);
          if (!e.Cancel) {
            name = newName;
            OnNameChanged();
          }
        }
      }
    }

    public virtual bool CanChangeName {
      get { return true; }
    }

    [Storable]
    protected string description;

    /// <summary>
    /// Gets or sets the description; a null value is stored as the empty string.
    /// Raises <see cref="DescriptionChanged"/> when the value actually changes.
    /// </summary>
    /// <exception cref="NotSupportedException">When <see cref="CanChangeDescription"/> is false.</exception>
    public string Description {
      get { return description; }
      set {
        if (!CanChangeDescription) throw new NotSupportedException("Description cannot be changed.");
        // Compare with the operator so that a null backing field does not cause a NullReferenceException.
        string newDescription = value ?? string.Empty;
        if (description != newDescription) {
          description = newDescription;
          OnDescriptionChanged();
        }
      }
    }

    public virtual bool CanChangeDescription {
      get { return true; }
    }

    /// <summary>Returns the name of this optimizer.</summary>
    /// <returns>The current <see cref="Name"/>.</returns>
    public override string ToString() {
      return Name;
    }

    /// <inheritdoc/>
    public event EventHandler<CancelEventArgs<string>> NameChanging;

    /// <summary>Raises <see cref="NameChanging"/>.</summary>
    /// <param name="e">The event arguments; handlers may set Cancel to veto the change.</param>
    protected virtual void OnNameChanging(CancelEventArgs<string> e) {
      var handler = NameChanging;
      if (handler != null) handler(this, e);
    }

    /// <inheritdoc/>
    public event EventHandler NameChanged;

    /// <summary>Raises <see cref="NameChanged"/> and then calls OnToStringChanged.</summary>
    protected virtual void OnNameChanged() {
      var handler = NameChanged;
      if (handler != null) handler(this, EventArgs.Empty);
      OnToStringChanged();
    }

    /// <inheritdoc/>
    public event EventHandler DescriptionChanged;

    /// <summary>Raises <see cref="DescriptionChanged"/>.</summary>
    protected virtual void OnDescriptionChanged() {
      var handler = DescriptionChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    #endregion nameditem
  }
}