using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Collections;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Core.Networks;
using HeuristicLab.Data;
using HeuristicLab.Optimization;
using HeuristicLab.Parameters;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;

namespace HeuristicLab.Networks.IntegratedOptimization {
  /// <summary>
  /// Abstract base class for orchestrator nodes. It owns a parameter collection and a
  /// result collection, offers helpers for creating orchestration and evaluation message
  /// ports, and forwards every message received on one of its message ports to
  /// <see cref="ProcessMessage"/>. Derived classes implement <see cref="Prepare"/>,
  /// <see cref="Start"/>, <see cref="Pause"/> and <see cref="Stop"/>.
  /// </summary>
  [Item("OrchestratorNode", "An orchestrator node.")]
  [StorableClass]
  public abstract class OrchestratorNode : Node, IOrchestratorNode {
    #region Constants
    // Name of the node parameter that holds the orchestration message.
    private const string OrchestrationMessageParameterName = "OrchestrationMessage";
    // Suffixes appended to a solver name to build the names of its ports.
    protected const string OrchestrationPortNameSuffix = "OrchestrationPort";
    protected const string EvaluationPortNameSuffix = "EvaluationPort";
    #endregion

    [Storable]
    private ParameterCollection parameters;
    /// <summary>Mutable parameter collection, available to derived classes.</summary>
    protected ParameterCollection Parameters {
      get { return parameters; }
    }

    // Not storable: created lazily on first access, hence null after construction,
    // cloning and deserialization.
    private ReadOnlyKeyedItemCollection<string, IParameter> readOnlyParameters;
    /// <summary>
    /// Read-only view of the parameters exposed through <see cref="IOrchestratorNode"/>.
    /// The wrapper is created on first access and cached afterwards.
    /// </summary>
    IKeyedItemCollection<string, IParameter> IOrchestratorNode.Parameters {
      get {
        if (readOnlyParameters == null) readOnlyParameters = parameters.AsReadOnly();
        return readOnlyParameters;
      }
    }

    [Storable]
    private ResultCollection results;
    /// <summary>Result collection of this node.</summary>
    public ResultCollection Results {
      get { return results; }
    }

    #region Construction, Cloning & Persistence
    /// <summary>Constructor used by the persistence framework.</summary>
    [StorableConstructor]
    protected OrchestratorNode(bool deserializing) : base(deserializing) { }

    /// <summary>
    /// Cloning constructor: clones the results and parameters of <paramref name="original"/>
    /// and attaches the message handler to the ports of the new instance.
    /// </summary>
    protected OrchestratorNode(OrchestratorNode original, Cloner cloner)
      : base(original, cloner) {
      results = cloner.Clone(original.results);
      parameters = cloner.Clone(original.parameters);
      RegisterEvents();
    }

    /// <summary>Creates a node with the default name "OrchestratorNode".</summary>
    protected OrchestratorNode() : this("OrchestratorNode") { }

    /// <summary>Creates a node with the given <paramref name="name"/>.</summary>
    protected OrchestratorNode(string name)
      : base(name) {
      Initialize();
    }

    /// <summary>
    /// Creates a node with the given <paramref name="name"/> and <paramref name="description"/>.
    /// </summary>
    protected OrchestratorNode(string name, string description)
      : base(name, description) {
      Initialize();
    }

    /// <summary>
    /// Shared constructor logic: creates empty result and parameter collections and adds
    /// the orchestration message parameter.
    /// </summary>
    private void Initialize() {
      results = new ResultCollection();
      parameters = new ParameterCollection();
      // NOTE(review): the enum type argument OrchestrationMessage is inferred from the
      // parameter name - confirm against the project's enum declaration.
      parameters.Add(new ValueParameter<EnumValue<OrchestrationMessage>>(
        OrchestrationMessageParameterName, new EnumValue<OrchestrationMessage>()));
    }

    /// <summary>Re-attaches the port event handlers after deserialization.</summary>
    [StorableHook(HookType.AfterDeserialization)]
    private void AfterDeserialization() {
      RegisterEvents();
    }

    /// <summary>Registers the port event handlers for every port currently in Ports.</summary>
    private void RegisterEvents() {
      foreach (var p in Ports) RegisterPortEvents(p);
    }
    #endregion

    #region Port Creation
    /// <summary>
    /// Adds a message port named <paramref name="solverName"/> + "OrchestrationPort" with
    /// two output parameters ("OrchestrationMessage", "Problem") and one input parameter
    /// ("Results").
    /// </summary>
    /// <typeparam name="T">Problem type carried by the "Problem" port parameter.</typeparam>
    /// <param name="solverName">Prefix of the port name.</param>
    protected void AddOrchestrationPort<T>(string solverName) where T : class, IProblem {
      var orchestrationPort = new MessagePort(solverName + OrchestrationPortNameSuffix);
      orchestrationPort.Parameters.Add(new PortParameter<EnumValue<OrchestrationMessage>>("OrchestrationMessage") { Type = PortParameterType.Output });
      orchestrationPort.Parameters.Add(new PortParameter<T>("Problem") { Type = PortParameterType.Output });
      // NOTE(review): ResultCollection as the type of the "Results" parameter is inferred - confirm.
      orchestrationPort.Parameters.Add(new PortParameter<ResultCollection>("Results") { Type = PortParameterType.Input });
      Ports.Add(orchestrationPort);
    }

    /// <summary>
    /// Adds a message port named <paramref name="solverName"/> + "EvaluationPort" with an
    /// input parameter for the solution and an input/output parameter for its quality.
    /// </summary>
    /// <typeparam name="T">Item type carried by the solution port parameter.</typeparam>
    /// <param name="solverName">Prefix of the port name.</param>
    /// <param name="solutionName">Name of the solution port parameter.</param>
    /// <param name="qualityName">Name of the quality port parameter.</param>
    protected void AddEvaluationPort<T>(string solverName, string solutionName, string qualityName) where T : class, IItem {
      var evaluationPort = new MessagePort(solverName + EvaluationPortNameSuffix);
      evaluationPort.Parameters.Add(new PortParameter<T>(solutionName) { Type = PortParameterType.Input });
      // NOTE(review): DoubleValue as the type of the quality parameter is inferred - confirm.
      evaluationPort.Parameters.Add(new PortParameter<DoubleValue>(qualityName) { Type = PortParameterType.Input | PortParameterType.Output });
      Ports.Add(evaluationPort);
    }
    #endregion

    /// <summary>
    /// Called for every message received on one of this node's message ports.
    /// The default implementation does nothing.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <param name="port">The port on which the message was received.</param>
    /// <param name="token">Cancellation token delivered together with the message.</param>
    protected virtual void ProcessMessage(IMessage message, IMessagePort port, CancellationToken token) { }

    #region Orchestration Control
    public abstract void Prepare();
    /// <summary>Runs <see cref="Prepare"/> via <c>Task.Run</c>.</summary>
    public async Task PrepareAsync() {
      await Task.Run(() => Prepare());
    }

    public abstract void Start();
    /// <summary>Runs <see cref="Start"/> via <c>Task.Run</c>.</summary>
    public async Task StartAsync() {
      await Task.Run(() => Start());
    }

    public abstract void Pause();
    /// <summary>Runs <see cref="Pause"/> via <c>Task.Run</c>.</summary>
    public async Task PauseAsync() {
      await Task.Run(() => Pause());
    }

    public abstract void Stop();
    /// <summary>Runs <see cref="Stop"/> via <c>Task.Run</c>.</summary>
    public async Task StopAsync() {
      await Task.Run(() => Stop());
    }
    #endregion

    #region Events
    // The overrides below keep the message handler subscription in sync with the
    // contents of the port collection: new ports are subscribed, removed ports are
    // unsubscribed.
    protected override void Ports_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IPort> e) {
      base.Ports_ItemsAdded(sender, e);
      foreach (var p in e.Items) RegisterPortEvents(p);
    }

    protected override void Ports_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IPort> e) {
      base.Ports_ItemsRemoved(sender, e);
      foreach (var p in e.Items) DeregisterPortEvents(p);
    }

    protected override void Ports_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IPort> e) {
      base.Ports_ItemsReplaced(sender, e);
      foreach (var p in e.OldItems) DeregisterPortEvents(p);
      foreach (var p in e.Items) RegisterPortEvents(p);
    }

    protected override void Ports_CollectionReset(object sender, CollectionItemsChangedEventArgs<IPort> e) {
      base.Ports_CollectionReset(sender, e);
      foreach (var p in e.OldItems) DeregisterPortEvents(p);
      foreach (var p in e.Items) RegisterPortEvents(p);
    }

    // Forwards a received message, the receiving port and the accompanying
    // cancellation token to ProcessMessage.
    private void MessagePort_MessageReceived(object sender, EventArgs<IMessage, CancellationToken> e) {
      ProcessMessage(e.Value, (IMessagePort)sender, e.Value2);
    }
    #endregion

    #region Port Events
    /// <summary>
    /// Subscribes to MessageReceived if <paramref name="port"/> is an
    /// <see cref="IMessagePort"/>; other port kinds are ignored.
    /// </summary>
    protected virtual void RegisterPortEvents(IPort port) {
      var mp = port as IMessagePort;
      if (mp != null)
        mp.MessageReceived += MessagePort_MessageReceived;
    }

    /// <summary>
    /// Unsubscribes from MessageReceived if <paramref name="port"/> is an
    /// <see cref="IMessagePort"/>; other port kinds are ignored.
    /// </summary>
    protected virtual void DeregisterPortEvents(IPort port) {
      var mp = port as IMessagePort;
      if (mp != null)
        mp.MessageReceived -= MessagePort_MessageReceived;
    }
    #endregion
  }
}