Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13982

Last change on this file since 13982 was 13982, checked in by thasling, 8 years ago

#2615:
made minor changes

File size: 6.2 KB
RevLine 
[13538]1using System;
[13524]2using System.Collections.Concurrent;
3using System.Collections.Generic;
[13541]4using System.Linq;
[13524]5using System.Net;
[13538]6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
[13937]8using DistributedGA.Core.Util;
[13524]9
[13924]10namespace DistributedGA.Core.Implementation {
11  public class PeerNetworkMessageHandler : IMessageHandler {
12    //own peer-instance Information
13    private PeerInfo ownInstance = null;
[13524]14
[13924]15    //uses peer-list from IPeerListManager to decide which peers to contact
16    private IPeerListManager peerListManager;
[13524]17
[13924]18    //uses IMessageSender to send populations to peers
19    private IMessageSender sender = null;
[13524]20
[13924]21    //provides current population for the higher layer IMigrationOperator
[13945]22    //two queues are used to gather and and provide population more efficiently
[13947]23    private object activeQueueLocker = new Object();
[13972]24    private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue;
25    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;
[13524]26
[13924]27    //uses IMessageService for recieving population from one peer at once
28    private IMessageService host = null;
[13524]29
[13982]30    private double communicationRate;
31    private Random rand;
32
[13970]33    public event EventHandler<Exception> ExceptionOccurend;
[13541]34
[13965]35    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) {
[13924]36      try {
37        ownInstance = new PeerInfo() {
38          IpAddress = GetInternalIpAddress(lanIpPrefix),
39          Port = 0,
[13956]40          ProblemInstance = problemInstance
[13924]41        }; // TODO: get own peerinfo
[13524]42
[13982]43        this.communicationRate = communicationRate;
44        rand = new Random();
45
[13972]46        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
47        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>();
[13956]48        writeQueue.Limit = messageCacheCapacity;
[13946]49        readQueue.Limit = writeQueue.Limit;
[13524]50
[13924]51        host = new WcfMessageService();
52        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet
[13937]53        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);
[13524]54
[13924]55        peerListManager = new WcfPeerListManager();
[13982]56        peerListManager.Init(ownInstance, contactServerUrl);
[13524]57
[13924]58        sender = new WcfMessageSender();
[13956]59        sender.Init(ownInstance, messageCacheCapacity);
[13524]60
[13924]61      }
62      catch (Exception ex) {
63        AddError("PeerNetworkMessageHandler.Init", ex);
64      }
65    }
[13524]66
[13924]67    public void Dispose() {
68      try {
69        host.Dispose();
70        sender.Dispose();
71        peerListManager.Dispose();
72      }
73      catch (Exception ex) {
74        AddError("PeerNetworkMessageHandler.Dispose", ex);
75      }
76    }
[13547]77
[13972]78    public void PublishDataToNetwork(byte[] data) {
[13924]79      try {
[13982]80        var peers = peerListManager.GetPeerList();
[13924]81        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
[13982]82          var peersForMessaging = ChoosePeersForMessaging(ref peers);
83
[13924]84          //maybe something will go wrong during network communication...
85          try {
86            sender.SendData(peer, data);
87          }
88          catch (Exception ex) {
89            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
90          }
[13524]91        }
[13924]92      }
93      catch (Exception ex) {
94        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
95      }
96    }
[13524]97
[13972]98    public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {
[13924]99      try {
[13972]100        List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();
101        KeyValuePair<PeerInfo,byte[]> item;
[13924]102        lock (activeQueueLocker) {
[13945]103          //changing the current queue for storing items to send
104          //then read from the now unselect queue
[13946]105          var tmp = readQueue;
106          readQueue = writeQueue;
107          writeQueue = tmp;
[13524]108        }
[13541]109
[13924]110        //creating resultset
[13946]111        while (!readQueue.IsEmpty) {
112          if (readQueue.TryDequeue(out item)) {
[13924]113            res.Add(item);
114          }
[13524]115        }
[13972]116        return res;//.ToArray();
[13924]117      }
118      catch (Exception ex) {
119        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
120        return null;
121      }
122    }
[13524]123
[13924]124    public PeerInfo GetPeerInfo() {
125      return ownInstance;
126    }
[13524]127
[13924]128    public List<PeerInfo> GetCurrentNetwork() {
129      return peerListManager.GetPeerList();
130    }
[13524]131
[13969]132    private void PropagateException(Exception ex) {
133      //if (CountdownCompleted != null)
134      //  CountdownCompleted(this, e);
135      if (ExceptionOccurend != null) {
136        ExceptionOccurend(this, ex);
137      }
138    }
139
[13982]140    private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {
141      Shuffle<PeerInfo>(allPeers);
142      int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;
143      if (allPeers.Count > 0 && toTake == 0) {
144        toTake = 1;
145      }
146      return allPeers.Take(toTake).ToList(); ;
147    }
148
[13924]149    private string GetInternalIpAddress(string ipPrefix) {
150      try {
151        var strHostName = Dns.GetHostName();
152        // Then using host name, get the IP address list..
153        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
154        IPAddress[] addr = ipEntry.AddressList;
[13541]155
[13924]156        return addr
157          .Select(ip => ip.ToString())
158          .First(str => str.StartsWith(ipPrefix));
159      }
160      catch { return null; }
161    }
162
[13982]163    private void Shuffle<T>(IList<T> list) {
164      int n = list.Count;
165      while (n > 1) {
166        n--;
167        int k = rand.Next(n + 1);
168        T value = list[k];
169        list[k] = list[n];
170        list[n] = value;
171      }
172    }
173
[13937]174    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
[13957]175      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
[13924]176        lock (activeQueueLocker) {
[13972]177            writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
[13887]178        }
[13970]179
[13924]180      }
181    }
[13538]182
[13924]183    private void AddError(string source, Exception ex) {
[13969]184      PropagateException(ex);
[13924]185      if (peerListManager != null) {
186        try {
187          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
[13524]188        }
[13924]189        catch { }
190      }
191    }
[13524]192
[13924]193  }
[13524]194}
Note: See TracBrowser for help on using the repository browser.