using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using DistributedGA.Core.Domain;
using DistributedGA.Core.Interface;

namespace DistributedGA.Core.Implementation
{
    /// <summary>
    /// Message handler that publishes solutions to the peers reported by an
    /// <see cref="IPeerListManager"/> and buffers solutions received through an
    /// <see cref="IMessageService"/> until they are fetched with
    /// <see cref="GetDataFromNetwork"/>.
    /// </summary>
    public class PeerNetworkMessageHandler : IMessageHandler
    {
        // Information about this peer instance.
        private PeerInfo ownInstance = null;

        // Supplies the list of peers to contact and forwards log messages to the server.
        private IPeerListManager peerListManager;

        // Sends populations to peers.
        private IMessageSender sender = null;

        // Two queues are used so that incoming solutions can be collected in one queue
        // while the other one is drained by GetDataFromNetwork. The lock guards the
        // swap of the two references as well as the enqueue of incoming data.
        private readonly object activeQueueLocker = new object();
        private ConcurrentQueue<SolutionInfo> writeQueue;
        private ConcurrentQueue<SolutionInfo> readQueue;

        // Hosts the service through which populations from other peers are received.
        private IMessageService host = null;

        /// <summary>
        /// Builds the own peer information, starts the receiving service and
        /// initializes the peer list manager and the message sender.
        /// </summary>
        /// <remarks>
        /// Exceptions are caught and passed to <see cref="AddError"/>. Failures that
        /// happen before the peer list manager has been created cannot be reported,
        /// because <see cref="AddError"/> needs it to send the log message.
        /// </remarks>
        public void Init()
        {
            try
            {
                // TODO: get own peerinfo
                ownInstance = new PeerInfo()
                {
                    IpAddress = GetExternalIpAddress(),
                    Port = 0,
                    ProblemInstance = "TestProblem"
                };

                writeQueue = new ConcurrentQueue<SolutionInfo>();
                readQueue = new ConcurrentQueue<SolutionInfo>();

                host = new WcfMessageService();
                // Init returns the port on which the service is hosted.
                ownInstance.Port = host.Init(ownInstance.IpAddress);
                host.OnDataRecieved += OnPopulationRecieved;

                peerListManager = new WcfPeerListManager();
                peerListManager.Init(ownInstance);

                sender = new WcfMessageSender();
                sender.Init(ownInstance);
            }
            catch (Exception ex)
            {
                AddError("PeerNetworkMessageHandler.Init", ex);
            }
        }

        /// <summary>
        /// Unsubscribes from the receiving service and disposes it. Safe to call when
        /// <see cref="Init"/> failed or when the handler was already disposed.
        /// </summary>
        public void Dispose()
        {
            try
            {
                if (host != null)
                {
                    // Detach first so the service no longer references this handler.
                    host.OnDataRecieved -= OnPopulationRecieved;
                    host.Dispose();
                    host = null;
                }
            }
            catch (Exception ex)
            {
                AddError("PeerNetworkMessageHandler.Dispose", ex);
            }
        }

        /// <summary>
        /// Sends the given solutions to every peer of the current peer list.
        /// A failure while sending to one peer is logged and does not stop the
        /// delivery to the remaining peers.
        /// </summary>
        /// <param name="data">Solutions to publish. A null array is logged and ignored.</param>
        public void PublishDataToNetwork(SolutionInfo[] data)
        {
            try
            {
                if (data == null)
                {
                    AddError("PeerNetworkMessageHandler.PublishDataToNetwork", new ArgumentNullException("data"));
                    return;
                }

                bool stamped = false;
                foreach (PeerInfo peer in peerListManager.GetPeerList())
                {
                    if (!stamped)
                    {
                        // HACK: manipulate for monitoring in test.
                        // The value is the same for every peer, so it is written only
                        // once, and only if there is at least one peer to send to.
                        foreach (SolutionInfo si in data)
                        {
                            si.IterationNumber = ownInstance.Port;
                        }
                        stamped = true;
                    }

                    // Something may go wrong during network communication; keep going
                    // with the next peer in that case.
                    try
                    {
                        sender.SendData(peer, data);
                    }
                    catch (Exception ex)
                    {
                        AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
                    }
                }
            }
            catch (Exception ex)
            {
                AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
            }
        }

        /// <summary>
        /// Returns all solutions received since the previous call.
        /// </summary>
        /// <returns>
        /// The buffered solutions; an empty array when nothing was received or when
        /// an error occurred (the error is logged).
        /// </returns>
        public SolutionInfo[] GetDataFromNetwork()
        {
            try
            {
                ConcurrentQueue<SolutionInfo> toDrain;
                lock (activeQueueLocker)
                {
                    // Swap the queues: new arrivals go to the (empty) former read queue
                    // while the former write queue is drained below.
                    toDrain = writeQueue;
                    writeQueue = readQueue;
                    readQueue = toDrain;
                }

                List<SolutionInfo> res = new List<SolutionInfo>();
                SolutionInfo item;
                while (toDrain.TryDequeue(out item))
                {
                    res.Add(item);
                }
                return res.ToArray();
            }
            catch (Exception ex)
            {
                AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
                return new SolutionInfo[0];
            }
        }

        /// <summary>
        /// Returns the information describing this peer instance
        /// (null until <see cref="Init"/> has created it).
        /// </summary>
        public PeerInfo GetPeerInfo()
        {
            return ownInstance;
        }

        /// <summary>
        /// Returns the peer list currently provided by the peer list manager.
        /// </summary>
        public List<PeerInfo> GetCurrentNetwork()
        {
            return peerListManager.GetPeerList();
        }

        /// <summary>
        /// Looks up the first address of the local host whose textual form starts
        /// with the "LanIpPrefix" application setting.
        /// </summary>
        /// <returns>
        /// The matching address, or null when the setting is missing, no address
        /// matches or the lookup fails.
        /// </returns>
        private string GetExternalIpAddress()
        {
            try
            {
                string prefix = ConfigurationManager.AppSettings["LanIpPrefix"];
                if (prefix == null)
                {
                    return null;
                }

                // Resolve the address list of the local host via its host name.
                IPHostEntry ipEntry = Dns.GetHostEntry(Dns.GetHostName());
                return ipEntry.AddressList
                    .Select(ip => ip.ToString())
                    .FirstOrDefault(str => str.StartsWith(prefix, StringComparison.Ordinal));
            }
            catch (Exception ex)
            {
                AddError("PeerNetworkMessageHandler.GetExternalIpAddress", ex);
                return null;
            }
        }

        /// <summary>
        /// Event handler of the receiving service: appends the received solutions
        /// to the queue that is currently used for writing.
        /// </summary>
        private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e)
        {
            if (e == null || e.data == null)
            {
                return;
            }

            // The lock keeps the whole batch in one queue even if GetDataFromNetwork
            // swaps the queues concurrently.
            lock (activeQueueLocker)
            {
                foreach (SolutionInfo si in e.data)
                {
                    writeQueue.Enqueue(si);
                }
            }
        }

        /// <summary>
        /// Sends an error description to the server through the peer list manager.
        /// Does nothing when the peer list manager is not available.
        /// </summary>
        /// <param name="source">Name of the location where the error occurred.</param>
        /// <param name="ex">The exception to report; only its message is sent.</param>
        private void AddError(string source, Exception ex)
        {
            if (peerListManager != null)
            {
                try
                {
                    peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
                }
                catch
                {
                    // Best effort: a failure while reporting an error cannot itself be reported.
                }
            }
        }
    }
}