Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13956

Last change on this file since 13956 was 13956, checked in by thasling, 8 years ago

#2615:
finally fixed the bug that caused messages to be sent to the wrong peers
also made communicationRate and messageCacheCapacity parameters
integration into P2PMigrationAnalyzer is still TBD

File size: 4.9 KB
RevLine 
[13538]1using System;
[13524]2using System.Collections.Concurrent;
3using System.Collections.Generic;
[13541]4using System.Linq;
[13524]5using System.Net;
[13538]6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
[13937]8using DistributedGA.Core.Util;
[13524]9
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// IMessageHandler implementation that wires together a message service (receiving),
  /// a message sender (sending) and a peer-list manager (peer discovery), and buffers
  /// received data in two swappable queues until it is fetched via GetDataFromNetwork.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler {
    //own peer-instance information
    private PeerInfo ownInstance = null;

    //uses peer-list from IPeerListManager to decide which peers to contact
    private IPeerListManager peerListManager;

    //uses IMessageSender to send populations to peers
    private IMessageSender sender = null;

    //provides current population for the higher layer IMigrationOperator
    //two queues are used to gather and provide population more efficiently:
    //incoming data is always enqueued into writeQueue; a reader swaps the two
    //queues and then drains the former writeQueue
    //activeQueueLocker guards the swap and the enqueueing
    private readonly object activeQueueLocker = new Object();
    private SizedConcurrentQueue<byte[]> writeQueue;
    private SizedConcurrentQueue<byte[]> readQueue;

    //uses IMessageService for receiving population from one peer at once
    private IMessageService host = null;

    /// <summary>
    /// Creates the own PeerInfo, the two receive queues, the message service,
    /// the peer-list manager and the message sender.
    /// Exceptions are not propagated; they are reported via AddError.
    /// </summary>
    /// <param name="lanIpPrefix">Prefix the local IP address (as string) has to start with.</param>
    /// <param name="contactServerUrl">Passed on to the peer-list manager.</param>
    /// <param name="problemInstance">Stored in the own PeerInfo.</param>
    /// <param name="messageCacheCapacity">Used as Limit of both receive queues and passed on to the sender.</param>
    /// <param name="communicationRate">Passed on to the peer-list manager.</param>
    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, int communicationRate) {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetInternalIpAddress(lanIpPrefix),
          Port = 0,
          ProblemInstance = problemInstance
        }; // TODO: get own peerinfo

        //queues have to exist before the service is started,
        //because OnDataRecieved enqueues into writeQueue
        writeQueue = new SizedConcurrentQueue<byte[]>();
        readQueue = new SizedConcurrentQueue<byte[]>();
        writeQueue.Limit = messageCacheCapacity;
        readQueue.Limit = writeQueue.Limit;

        host = new WcfMessageService();
        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hosted
        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance, contactServerUrl, communicationRate);

        sender = new WcfMessageSender();
        sender.Init(ownInstance, messageCacheCapacity);

      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Disposes the message service, the sender and the peer-list manager.
    /// Every component is disposed independently, so a missing component
    /// (e.g. after a failed Init) or a failing Dispose does not prevent
    /// the remaining components from being disposed.
    /// </summary>
    public void Dispose() {
      if (host != null) {
        try {
          //detach handler so that no more data is enqueued by the service
          host.OnDataRecieved -= new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);
          host.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }

      if (sender != null) {
        try {
          sender.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }

      //disposed last, because AddError reports via the peerListManager
      if (peerListManager != null) {
        try {
          peerListManager.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }
    }

    /// <summary>
    /// Sends the given data to every peer returned by the peer-list manager.
    /// A failure while sending to one peer is reported and does not stop
    /// sending to the remaining peers.
    /// </summary>
    /// <param name="data">Data passed unchanged to the sender for each peer.</param>
    public void PublishDataToNetwork(byte[][] data) {
      try {
        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
          //maybe something will go wrong during network communication...
          try {
            sender.SendData(peer, data);
          }
          catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Swaps the two receive queues and returns all items of the queue
    /// that was used for writing until now.
    /// </summary>
    /// <returns>The dequeued items (possibly empty), or null if an exception occurred.</returns>
    public byte[][] GetDataFromNetwork() {
      try {
        SizedConcurrentQueue<byte[]> drainQueue;
        lock (activeQueueLocker) {
          //changing the current queue for storing items to send
          //then read from the now unselected queue
          var tmp = readQueue;
          readQueue = writeQueue;
          writeQueue = tmp;
          //keep a local reference: the field may be swapped again by
          //another caller while this call is still draining
          drainQueue = readQueue;
        }

        //creating resultset
        List<byte[]> res = new List<byte[]>();
        byte[] item;
        while (drainQueue.TryDequeue(out item)) {
          res.Add(item);
        }
        return res.ToArray();
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
        return null;
      }
    }

    /// <summary>
    /// Returns the PeerInfo created in Init (null before Init was called).
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the peer list of the peer-list manager.
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      return peerListManager.GetPeerList();
    }

    /// <summary>
    /// Looks up the addresses of the local host and returns the first one
    /// whose string representation starts with the given prefix.
    /// </summary>
    /// <param name="ipPrefix">Prefix the address has to start with.</param>
    /// <returns>The matching address as string, or null if none matches or the lookup failed.</returns>
    private string GetInternalIpAddress(string ipPrefix) {
      try {
        var strHostName = Dns.GetHostName();
        // Then using host name, get the IP address list..
        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
        IPAddress[] addr = ipEntry.AddressList;

        //FirstOrDefault: "no matching address" is an expected result, not an exception
        //ordinal comparison: addresses are no linguistic text
        return addr
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(ipPrefix, StringComparison.Ordinal));
      }
      catch (Exception) {
        //best effort: lookup failures (or a null prefix) result in "no address"
        return null;
      }
    }

    /// <summary>
    /// Handler for the OnDataRecieved event of the message service;
    /// enqueues every received item into the current writeQueue.
    /// </summary>
    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
      //nothing to store if there are no event args or no payload
      if (e == null || e.data == null) {
        return;
      }

      lock (activeQueueLocker) {
        foreach (byte[] item in e.data) {
          writeQueue.Enqueue(item);
        }
      }
    }

    /// <summary>
    /// Reports an error via the peer-list manager (best effort).
    /// Nothing is reported if no peer-list manager exists yet; failures
    /// while reporting are ignored on purpose, so that error reporting
    /// never throws into the caller.
    /// </summary>
    /// <param name="source">Name of the location where the error occurred.</param>
    /// <param name="ex">The exception whose message is reported.</param>
    private void AddError(string source, Exception ex) {
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        }
        catch { }
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.