using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using DistributedGA.Core.Domain;
using DistributedGA.Core.Interface;
using DistributedGA.Core.Util;

namespace DistributedGA.Core.Implementation {

  /// <summary>
  /// Message handler that publishes data to every peer reported by the peer-list
  /// manager and buffers data received from peers until it is fetched via
  /// <see cref="GetDataFromNetwork"/>.
  /// </summary>
  /// <remarks>
  /// Failures inside the public methods are not thrown to the caller; they are
  /// reported through the <see cref="ExceptionOccurend"/> event instead.
  /// </remarks>
  public class PeerNetworkMessageHandler : IMessageHandler, IDisposable {
    // Information about this peer instance.
    private PeerInfo ownInstance = null;

    // Supplies the list of peers that PublishDataToNetwork sends to.
    private IPeerListManager peerListManager;

    // Sends data to a single peer.
    private IMessageSender sender = null;

    // Guards the writeQueue/readQueue references while they are swapped or written to.
    private readonly object activeQueueLocker = new object();

    // Two queues are used so that receiving can continue while a batch is handed out:
    // incoming messages go to writeQueue, GetDataFromNetwork swaps the two and drains
    // the former write queue.
    // NOTE(review): the element type KeyValuePair<PeerInfo, ByteArrayWrapper> is
    // reconstructed from the Enqueue call in OnDataRecieved - confirm against the
    // IMessageHandler interface.
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> writeQueue;
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> readQueue;

    // Hosts the service on which data from other peers is received.
    private IMessageService host = null;

    /// <summary>Raised for every exception caught inside this handler.</summary>
    public event EventHandler<Exception> ExceptionOccurend;

    /// <summary>
    /// Creates and initializes the message service, the peer-list manager and the
    /// message sender, and allocates the two receive queues.
    /// </summary>
    /// <param name="lanIpPrefix">Prefix used to select the local IP address (see <see cref="GetInternalIpAddress"/>).</param>
    /// <param name="contactServerUrl">Passed through to the peer-list manager's Init.</param>
    /// <param name="problemInstance">Problem instance identifier; received messages from senders with a different value are dropped.</param>
    /// <param name="messageCacheCapacity">Limit applied to both receive queues; also passed to the sender's Init.</param>
    /// <param name="communicationRate">Passed through to the peer-list manager's Init.</param>
    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetInternalIpAddress(lanIpPrefix),
          Port = 0,
          ProblemInstance = problemInstance
        }; // TODO: get own peerinfo

        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
        writeQueue.Limit = messageCacheCapacity;
        readQueue.Limit = writeQueue.Limit;

        host = new WcfMessageService();
        // Init returns the port on which the service is hosted.
        ownInstance.Port = host.Init(ownInstance.IpAddress);
        host.OnDataRecieved += OnDataRecieved;

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance, contactServerUrl, communicationRate);

        sender = new WcfMessageSender();
        sender.Init(ownInstance, messageCacheCapacity);
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Disposes the message service, the sender and the peer-list manager.
    /// </summary>
    /// <remarks>
    /// Each component is released independently, so a failure (or a component that
    /// was never created because <see cref="Init"/> failed) does not prevent the
    /// remaining ones from being disposed. Failures are reported through
    /// <see cref="ExceptionOccurend"/>.
    /// </remarks>
    public void Dispose() {
      if (host != null) {
        TryDispose(() => {
          // Detach first so no message is queued while the service shuts down.
          host.OnDataRecieved -= OnDataRecieved;
          host.Dispose();
        });
      }
      if (sender != null) {
        TryDispose(() => sender.Dispose());
      }
      if (peerListManager != null) {
        TryDispose(() => peerListManager.Dispose());
      }
    }

    /// <summary>
    /// Sends <paramref name="data"/> to every peer currently returned by the
    /// peer-list manager. A failure for one peer is reported and does not stop
    /// sending to the remaining peers.
    /// </summary>
    /// <param name="data">The payload to send.</param>
    public void PublishDataToNetwork(ByteArrayWrapper data) {
      try {
        var allPeers = peerListManager.GetPeerList();
        foreach (PeerInfo peer in allPeers) {
          try {
            sender.SendData(peer, data);
          } catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Returns all messages received since the previous call, paired with their sender.
    /// </summary>
    /// <returns>
    /// The buffered messages (possibly an empty list), or <c>null</c> if an
    /// exception occurred; the exception is reported through <see cref="ExceptionOccurend"/>.
    /// </returns>
    public List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork() {
      try {
        SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> drainQueue;
        lock (activeQueueLocker) {
          // Swap the queues: new messages now go to the previously drained queue,
          // and the queue that was collecting messages is read out below.
          var tmp = readQueue;
          readQueue = writeQueue;
          writeQueue = tmp;
          // Capture the reference while holding the lock so that a concurrent swap
          // cannot change which queue this call drains.
          drainQueue = readQueue;
        }

        var res = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
        KeyValuePair<PeerInfo, ByteArrayWrapper> item;
        while (drainQueue.TryDequeue(out item)) {
          res.Add(item);
        }
        return res;
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
        return null;
      }
    }

    /// <summary>Returns the information describing this peer instance.</summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>Returns the peer list currently provided by the peer-list manager.</summary>
    public List<PeerInfo> GetCurrentNetwork() {
      return peerListManager.GetPeerList();
    }

    // Raises ExceptionOccurend for the given exception, if anyone is subscribed.
    private void PropagateException(Exception ex) {
      // Copy the delegate so a concurrent unsubscribe between the null check and
      // the invocation cannot cause a NullReferenceException.
      var handler = ExceptionOccurend;
      if (handler != null) {
        handler(this, ex);
      }
    }

    // Returns the first address of the local host whose string form starts with
    // ipPrefix, or null if there is no such address or the lookup fails.
    private string GetInternalIpAddress(string ipPrefix) {
      try {
        var hostName = Dns.GetHostName();
        // Using the host name, get the list of IP addresses.
        IPHostEntry ipEntry = Dns.GetHostEntry(hostName);
        return ipEntry.AddressList
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(ipPrefix, StringComparison.Ordinal));
      } catch (Exception) {
        // Deliberately best-effort: a failed lookup yields "no address".
        return null;
      }
    }

    // Handler for the message service's OnDataRecieved event: queues the received
    // data if the sender works on the same problem instance as this peer.
    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
      // Ignore messages that carry no usable sender information.
      if (e == null || e.Sender == null || e.Sender.ProblemInstance == null) {
        return;
      }
      if (!e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
        return;
      }

      // Build the entry before taking the lock to keep the critical section short.
      var entry = new KeyValuePair<PeerInfo, ByteArrayWrapper>(e.Sender, ByteArrayWrapper.CreateByteArrayWrapper(e.data));
      lock (activeQueueLocker) {
        writeQueue.Enqueue(entry);
      }
    }

    // Reports an error via ExceptionOccurend. The source string is currently not
    // forwarded; it only identifies the call site in code.
    private void AddError(string source, Exception ex) {
      PropagateException(ex);
    }

    // Runs one dispose action and reports (instead of throwing) any failure.
    private void TryDispose(Action dispose) {
      try {
        dispose();
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Dispose", ex);
      }
    }
  }
}