using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Net; using DistributedGA.Core.Domain; using DistributedGA.Core.Interface; using DistributedGA.Core.Util; namespace DistributedGA.Core.Implementation { public class PeerNetworkMessageHandler : IMessageHandler { //own peer-instance Information private PeerInfo ownInstance = null; //uses peer-list from IPeerListManager to decide which peers to contact private IPeerListManager peerListManager; //uses IMessageSender to send populations to peers private IMessageSender sender = null; //provides current population for the higher layer IMigrationOperator //two queues are used to gather and and provide population more efficiently private object activeQueueLocker = new Object(); private SizedConcurrentQueue> writeQueue; private SizedConcurrentQueue> readQueue; //uses IMessageService for recieving population from one peer at once private IMessageService host = null; public event EventHandler ExceptionOccurend; public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) { try { ownInstance = new PeerInfo() { IpAddress = GetInternalIpAddress(lanIpPrefix), Port = 0, ProblemInstance = problemInstance }; // TODO: get own peerinfo writeQueue = new SizedConcurrentQueue>(); readQueue = new SizedConcurrentQueue>(); writeQueue.Limit = messageCacheCapacity; readQueue.Limit = writeQueue.Limit; host = new WcfMessageService(); ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet host.OnDataRecieved += new EventHandler(OnDataRecieved); peerListManager = new WcfPeerListManager(); peerListManager.Init(ownInstance, contactServerUrl, communicationRate); sender = new WcfMessageSender(); sender.Init(ownInstance, messageCacheCapacity); } catch (Exception ex) { AddError("PeerNetworkMessageHandler.Init", ex); } } public void Dispose() { try { host.Dispose(); sender.Dispose(); peerListManager.Dispose(); } catch (Exception ex) { AddError("PeerNetworkMessageHandler.Dispose", ex); } } public void PublishDataToNetwork(byte[] data) { try { foreach (PeerInfo peer in peerListManager.GetPeerList()) { //maybe something will go wrong during network communication... try { sender.SendData(peer, data); } catch (Exception ex) { AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex); } } } catch (Exception ex) { AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex); } } public List> GetDataFromNetwork() { try { List> res = new List>(); KeyValuePair item; lock (activeQueueLocker) { //changing the current queue for storing items to send //then read from the now unselect queue var tmp = readQueue; readQueue = writeQueue; writeQueue = tmp; } //creating resultset while (!readQueue.IsEmpty) { if (readQueue.TryDequeue(out item)) { res.Add(item); } } return res;//.ToArray(); } catch (Exception ex) { AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex); return null; } } public PeerInfo GetPeerInfo() { return ownInstance; } public List GetCurrentNetwork() { return peerListManager.GetPeerList(); } private void PropagateException(Exception ex) { //if (CountdownCompleted != null) // CountdownCompleted(this, e); if (ExceptionOccurend != null) { ExceptionOccurend(this, ex); } } private string GetInternalIpAddress(string ipPrefix) { try { var strHostName = Dns.GetHostName(); // Then using host name, get the IP address list.. IPHostEntry ipEntry = Dns.GetHostEntry(strHostName); IPAddress[] addr = ipEntry.AddressList; return addr .Select(ip => ip.ToString()) .First(str => str.StartsWith(ipPrefix)); } catch { return null; } } private void OnDataRecieved(object sender, MessageRecieveEventArgs e) { if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { lock (activeQueueLocker) { writeQueue.Enqueue(new KeyValuePair(e.Sender, e.data)); } } } private void AddError(string source, Exception ex) { PropagateException(ex); if (peerListManager != null) { try { peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message)); } catch { } } } } }