Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13982

Last change on this file since 13982 was 13982, checked in by thasling, 8 years ago

#2615:
made minor changes

File size: 6.2 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Linq;
5using System.Net;
6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
8using DistributedGA.Core.Util;
9
10namespace DistributedGA.Core.Implementation {
11  public class PeerNetworkMessageHandler : IMessageHandler {
12    //own peer-instance Information
13    private PeerInfo ownInstance = null;
14
15    //uses peer-list from IPeerListManager to decide which peers to contact
16    private IPeerListManager peerListManager;
17
18    //uses IMessageSender to send populations to peers
19    private IMessageSender sender = null;
20
21    //provides current population for the higher layer IMigrationOperator
22    //two queues are used to gather and and provide population more efficiently
23    private object activeQueueLocker = new Object();
24    private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue;
25    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;
26
27    //uses IMessageService for recieving population from one peer at once
28    private IMessageService host = null;
29
30    private double communicationRate;
31    private Random rand;
32
33    public event EventHandler<Exception> ExceptionOccurend;
34
35    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) {
36      try {
37        ownInstance = new PeerInfo() {
38          IpAddress = GetInternalIpAddress(lanIpPrefix),
39          Port = 0,
40          ProblemInstance = problemInstance
41        }; // TODO: get own peerinfo
42
43        this.communicationRate = communicationRate;
44        rand = new Random();
45
46        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
47        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>();
48        writeQueue.Limit = messageCacheCapacity;
49        readQueue.Limit = writeQueue.Limit;
50
51        host = new WcfMessageService();
52        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet
53        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);
54
55        peerListManager = new WcfPeerListManager();
56        peerListManager.Init(ownInstance, contactServerUrl);
57
58        sender = new WcfMessageSender();
59        sender.Init(ownInstance, messageCacheCapacity);
60
61      }
62      catch (Exception ex) {
63        AddError("PeerNetworkMessageHandler.Init", ex);
64      }
65    }
66
67    public void Dispose() {
68      try {
69        host.Dispose();
70        sender.Dispose();
71        peerListManager.Dispose();
72      }
73      catch (Exception ex) {
74        AddError("PeerNetworkMessageHandler.Dispose", ex);
75      }
76    }
77
78    public void PublishDataToNetwork(byte[] data) {
79      try {
80        var peers = peerListManager.GetPeerList();
81        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
82          var peersForMessaging = ChoosePeersForMessaging(ref peers);
83
84          //maybe something will go wrong during network communication...
85          try {
86            sender.SendData(peer, data);
87          }
88          catch (Exception ex) {
89            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
90          }
91        }
92      }
93      catch (Exception ex) {
94        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
95      }
96    }
97
98    public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {
99      try {
100        List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();
101        KeyValuePair<PeerInfo,byte[]> item;
102        lock (activeQueueLocker) {
103          //changing the current queue for storing items to send
104          //then read from the now unselect queue
105          var tmp = readQueue;
106          readQueue = writeQueue;
107          writeQueue = tmp;
108        }
109
110        //creating resultset
111        while (!readQueue.IsEmpty) {
112          if (readQueue.TryDequeue(out item)) {
113            res.Add(item);
114          }
115        }
116        return res;//.ToArray();
117      }
118      catch (Exception ex) {
119        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
120        return null;
121      }
122    }
123
124    public PeerInfo GetPeerInfo() {
125      return ownInstance;
126    }
127
128    public List<PeerInfo> GetCurrentNetwork() {
129      return peerListManager.GetPeerList();
130    }
131
132    private void PropagateException(Exception ex) {
133      //if (CountdownCompleted != null)
134      //  CountdownCompleted(this, e);
135      if (ExceptionOccurend != null) {
136        ExceptionOccurend(this, ex);
137      }
138    }
139
140    private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {
141      Shuffle<PeerInfo>(allPeers);
142      int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;
143      if (allPeers.Count > 0 && toTake == 0) {
144        toTake = 1;
145      }
146      return allPeers.Take(toTake).ToList(); ;
147    }
148
149    private string GetInternalIpAddress(string ipPrefix) {
150      try {
151        var strHostName = Dns.GetHostName();
152        // Then using host name, get the IP address list..
153        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
154        IPAddress[] addr = ipEntry.AddressList;
155
156        return addr
157          .Select(ip => ip.ToString())
158          .First(str => str.StartsWith(ipPrefix));
159      }
160      catch { return null; }
161    }
162
163    private void Shuffle<T>(IList<T> list) {
164      int n = list.Count;
165      while (n > 1) {
166        n--;
167        int k = rand.Next(n + 1);
168        T value = list[k];
169        list[k] = list[n];
170        list[n] = value;
171      }
172    }
173
174    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
175      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
176        lock (activeQueueLocker) {
177            writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
178        }
179
180      }
181    }
182
183    private void AddError(string source, Exception ex) {
184      PropagateException(ex);
185      if (peerListManager != null) {
186        try {
187          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
188        }
189        catch { }
190      }
191    }
192
193  }
194}
Note: See TracBrowser for help on using the repository browser.