using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DistributedGA.Core;
using DistributedGA.Core.Domain;
using DistributedGA.Core.Implementation;
using DistributedGA.Core.Interface;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Data;
using HeuristicLab.Optimization;
using HeuristicLab.Parameters;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;

namespace DistributedGA.Hive {
  /// <summary>
  /// Optimizer that starts a peer through <see cref="PeerNetworkMessageHandler"/>,
  /// publishes a fixed number of test messages to the peer network and writes
  /// everything it receives into the "Log" parameter. Each execution is recorded
  /// as one <see cref="Run"/> in <see cref="Runs"/>.
  /// </summary>
  [StorableClass]
  [Creatable(Category = "Haslinger's Very Special XO", Priority = 1000)]
  public class P2PTask : ParameterizedNamedItem, IOptimizer {
    // Time given to the peer after Init before the known peers are listed.
    private const int NetworkSettleMilliseconds = 1000 * 20;
    // Pause before each publish/receive round.
    private const int PublishIntervalMilliseconds = 1000 * 10;
    // Number of publish/receive rounds per execution.
    private const int PublishRounds = 10;

    [Storable]
    private DateTime startTime;

    [Storable]
    private RunCollection runCollection;

    #region Constructors and Cloning
    /// <summary>Constructor used by the persistence framework during deserialization.</summary>
    [StorableConstructor]
    protected P2PTask(bool deserializing) : base(deserializing) { }

    /// <summary>Copy constructor used by <see cref="Clone(Cloner)"/>.</summary>
    protected P2PTask(P2PTask original, Cloner cloner)
      : base(original, cloner) {
      startTime = original.startTime;
      runCollection = cloner.Clone(original.runCollection);
    }

    /// <summary>
    /// Creates a task with default values for the "LanIpPrefix",
    /// "ContactServerURL" and "Log" parameters.
    /// </summary>
    public P2PTask() {
      Name = "P2PTask";
      Description = "P2PTask";
      runCollection = new RunCollection();

      Parameters.Add(new ValueParameter<StringValue>("LanIpPrefix", "", new StringValue("10.")));
      Parameters.Add(new ValueParameter<StringValue>("ContactServerURL", "", new StringValue("net.tcp://10.42.1.150:9090/DistributedGA.ContactServer/ContactService")));
      Parameters.Add(new ValueParameter<Log>("Log", "", new Log()));
    }

    [StorableHook(HookType.AfterDeserialization)]
    protected virtual void AfterDeserialization() { }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new P2PTask(this, cloner);
    }
    #endregion

    #region ITask Members
    /// <summary>Current execution state; changes are announced via <see cref="ExecutionStateChanged"/>.</summary>
    public ExecutionState ExecutionState { get; private set; }

    /// <summary>Time elapsed since the last start, refreshed after every publish round.</summary>
    public TimeSpan ExecutionTime { get; private set; }

    public void Prepare() {
      Prepare(true);
    }

    /// <summary>
    /// Switches to the Prepared state.
    /// </summary>
    /// <param name="clearRuns">Ignored; already recorded runs are kept.</param>
    public void Prepare(bool clearRuns) {
      // clearRuns is ignored
      ExecutionState = HeuristicLab.Core.ExecutionState.Prepared;
      OnExecutionStateChanged();
      OnPrepared();
    }

    private CancellationTokenSource cts;
    private ManualResetEvent stoppedEvent;

    /// <summary>
    /// Starts the peer session on a background task and returns immediately.
    /// </summary>
    public void Start() {
      // Both objects are created before the task is launched so that a
      // Pause()/Stop() issued right after Start() never sees null fields.
      var tokenSource = new CancellationTokenSource();
      var finished = new ManualResetEvent(false);
      cts = tokenSource;
      stoppedEvent = finished;

      Task.Factory.StartNew(() => Execute(tokenSource.Token, finished));
    }

    /// <summary>
    /// Body of the background task: runs the peer session, records the run and
    /// finally switches to the Stopped state.
    /// </summary>
    private void Execute(CancellationToken token, ManualResetEvent finished) {
      try {
        startTime = DateTime.Now;
        ExecutionState = HeuristicLab.Core.ExecutionState.Started;
        OnExecutionStateChanged();
        OnStarted();

        var log = (Log)Parameters["Log"].ActualValue;
        try {
          Communicate(log, token);
        } catch (Exception ex) {
          log.LogMessage(ex.Message);
          log.LogMessage("press any key to continue...");
          // Besides logging, surface the failure to subscribers.
          OnExceptionOccurred(ex);
        }

        var run = new Run();
        run.Results.Add("Execution Time", new TimeSpanValue(ExecutionTime));
        run.Results.Add("Log", log);
        runCollection.Add(run);
      } finally {
        // Always release waiters in Pause()/Stop(), even if something above threw.
        finished.Set();
      }

      ExecutionState = HeuristicLab.Core.ExecutionState.Stopped;
      OnExecutionStateChanged();
      OnStopped();
    }

    /// <summary>
    /// Initializes the peer, logs the peers currently known and then performs up
    /// to <see cref="PublishRounds"/> publish/receive rounds or until cancellation.
    /// </summary>
    private void Communicate(Log log, CancellationToken token) {
      log.LogMessage("Starting peer...");
      IMessageHandler h = new PeerNetworkMessageHandler();

      var lanIpPrefix = ((StringValue)Parameters["LanIpPrefix"].ActualValue).Value;
      var contactServerUri = ((StringValue)Parameters["ContactServerURL"].ActualValue).Value;
      h.Init(lanIpPrefix, contactServerUri, "TEST", 10000, 100);

      PeerInfo pi = h.GetPeerInfo();
      log.LogMessage(string.Format("Peer is hostet at IP: {0} and port: {1}", pi.IpAddress, pi.Port));

      // Waiting on the token's handle instead of sleeping lets a cancellation
      // end the wait immediately.
      token.WaitHandle.WaitOne(NetworkSettleMilliseconds);

      log.LogMessage("Current peers within network:");
      foreach (var item in h.GetCurrentNetwork()) {
        log.LogMessage(string.Format("Peer at {0}:{1}", item.IpAddress, item.Port));
      }

      int i = 0;
      while (i < PublishRounds && !token.IsCancellationRequested) {
        i++;
        // WaitOne returns true when cancellation was requested during the wait;
        // in that case the round is skipped and the loop condition ends the loop.
        bool cancelled = token.WaitHandle.WaitOne(PublishIntervalMilliseconds);
        if (!cancelled) {
          var message = CreateMessage(pi, i);
          Console.WriteLine("Publishing messages...");
          h.PublishDataToNetwork(message);
          Console.WriteLine("Messages published.");
          Console.WriteLine("Recieved messages:");
          foreach (var item in h.GetDataFromNetwork()) {
            log.LogMessage(string.Format("Message:{0}", GetString(item)));
          }
        }
        ExecutionTime = DateTime.Now - startTime;
        OnExecutionTimeChanged();
      }
    }

    /// <summary>
    /// Requests cancellation of the current execution (if any) and blocks until
    /// its background task has signalled completion.
    /// </summary>
    private void CancelAndWait() {
      var tokenSource = cts;
      var finished = stoppedEvent;
      if (tokenSource != null) {
        tokenSource.Cancel();
      }
      // Null when Start() was never called; there is nothing to wait for then.
      if (finished != null) {
        finished.WaitOne();
      }
    }

    /// <summary>
    /// Cancels the running execution, waits for it to finish and switches to
    /// the Paused state.
    /// </summary>
    public void Pause() {
      CancelAndWait();
      ExecutionState = HeuristicLab.Core.ExecutionState.Paused;
      OnExecutionStateChanged();
      OnPaused();
    }

    /// <summary>
    /// Cancels the running execution, waits for it to finish and switches to
    /// the Stopped state.
    /// </summary>
    public void Stop() {
      CancelAndWait();
      ExecutionState = HeuristicLab.Core.ExecutionState.Stopped;
      OnExecutionStateChanged();
      OnStopped();
    }

    public event EventHandler Started;
    protected virtual void OnStarted() {
      EventHandler handler = Started;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }

    public event EventHandler Stopped;
    protected virtual void OnStopped() {
      EventHandler handler = Stopped;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }

    public event EventHandler Paused;
    protected virtual void OnPaused() {
      EventHandler handler = Paused;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }

    public event EventHandler Prepared;
    protected virtual void OnPrepared() {
      EventHandler handler = Prepared;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }

    public event EventHandler ComputeInParallelChanged;
    protected virtual void OnComputeInParallelChanged() {
      EventHandler handler = ComputeInParallelChanged;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }
    #endregion

    #region Events
    public event EventHandler ExecutionTimeChanged;
    protected virtual void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }

    public event EventHandler ExecutionStateChanged;
    protected virtual void OnExecutionStateChanged() {
      EventHandler handler = ExecutionStateChanged;
      if (handler != null) {
        handler(this, EventArgs.Empty);
      }
    }

    /// <summary>Raised when the peer session fails with an exception.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    protected virtual void OnExceptionOccurred(Exception exception) {
      EventHandler<EventArgs<Exception>> handler = ExceptionOccurred;
      if (handler != null) {
        handler(this, new EventArgs<Exception>(exception));
      }
    }
    #endregion

    public override string ToString() {
      return Name;
    }

    /// <summary>
    /// Builds the two test payloads published in one round; each names the
    /// sending peer and the iteration number.
    /// </summary>
    private static byte[][] CreateMessage(PeerInfo pi, int iterationNumber) {
      string msg1 = string.Concat("Message 1 from Peer ", pi.IpAddress, ":", pi.Port, " at iteration ", iterationNumber);
      string msg2 = string.Concat("Message 2 from Peer ", pi.IpAddress, ":", pi.Port, " at iteration ", iterationNumber);
      return new byte[][] { GetBytes(msg1), GetBytes(msg2) };
    }

    /// <summary>Encodes a string as UTF-16 (little endian), two bytes per char.</summary>
    static byte[] GetBytes(string str) {
      return System.Text.Encoding.Unicode.GetBytes(str);
    }

    /// <summary>
    /// Decodes UTF-16 (little endian) bytes into a string. Unlike a raw block
    /// copy this does not throw on an odd number of bytes.
    /// </summary>
    static string GetString(byte[] bytes) {
      return System.Text.Encoding.Unicode.GetString(bytes);
    }

    /// <summary>This optimizer has no nested optimizers.</summary>
    public System.Collections.Generic.IEnumerable<IOptimizer> NestedOptimizers {
      get { return Enumerable.Empty<IOptimizer>(); }
    }

    /// <summary>Runs recorded so far, one per completed execution.</summary>
    public RunCollection Runs {
      get { return runCollection; }
    }
  }
}