Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 14074

Last change on this file since 14074 was 14009, checked in by thasling, 8 years ago

#2615:
made minor changes,
wrong commit happened last time, so still TBD:
- sort List
- Dispose of analyzer is never called

File size: 5.4 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Linq;
5using System.Net;
6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
8using DistributedGA.Core.Util;
9
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// <see cref="IMessageHandler"/> implementation that exchanges byte-array
  /// messages with other peers. It obtains the peer list from an
  /// <see cref="IPeerListManager"/>, sends through an <see cref="IMessageSender"/>
  /// and receives through an <see cref="IMessageService"/>. Received messages are
  /// buffered in two queues that are swapped whenever the buffer is read.
  /// Errors are not thrown from the try/catch-protected public methods; they are
  /// reported through <see cref="ExceptionOccurend"/> and, when possible, sent to
  /// the server via the peer list manager.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler {
    //own peer-instance information
    private PeerInfo ownInstance = null;

    //uses peer-list from IPeerListManager to decide which peers to contact
    private IPeerListManager peerListManager;

    //uses IMessageSender to send populations to peers
    private IMessageSender sender = null;

    //provides current population for the higher layer IMigrationOperator
    //two queues are used to gather and provide population more efficiently:
    //incoming messages go to writeQueue, readers swap the queues and drain the other one
    private readonly object activeQueueLocker = new object();
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;

    //uses IMessageService for receiving population from one peer at once
    private IMessageService host = null;

    private double communicationRate;

    /// <summary>
    /// Raised for every exception caught inside this handler.
    /// </summary>
    public event EventHandler<Exception> ExceptionOccurend;

    /// <summary>
    /// Creates the own peer description, the message buffers, the receiving
    /// service, the peer list manager and the sender. Any exception is caught and
    /// reported through <see cref="AddError"/>; components created before the
    /// failure stay assigned.
    /// </summary>
    /// <param name="lanIpPrefix">Prefix used to pick the local IP address of this peer.</param>
    /// <param name="contactServerUrl">Passed on to the peer list manager.</param>
    /// <param name="problemInstance">Stored in the own peer info; only messages from peers with an equal value are buffered.</param>
    /// <param name="messageCacheCapacity">Limit applied to both receive queues; also passed on to the sender.</param>
    /// <param name="communicationRate">Stored and passed on to the peer list manager.</param>
    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetInternalIpAddress(lanIpPrefix),
          Port = 0,
          ProblemInstance = problemInstance
        }; // TODO: get own peerinfo

        this.communicationRate = communicationRate;

        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
        writeQueue.Limit = messageCacheCapacity;
        readQueue.Limit = writeQueue.Limit;

        host = new WcfMessageService();
        //Init returns the port on which the service is hosted
        ownInstance.Port = host.Init(ownInstance.IpAddress);
        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance, contactServerUrl, communicationRate);

        sender = new WcfMessageSender();
        sender.Init(ownInstance, messageCacheCapacity);
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Disposes the receiving service, the sender and the peer list manager.
    /// Each component is handled on its own, so a missing component (e.g. after a
    /// failed <see cref="Init"/>) or a failing one does not prevent the remaining
    /// ones from being disposed. Failures are reported through <see cref="AddError"/>.
    /// </summary>
    public void Dispose() {
      try {
        if (host != null) {
          //stop buffering incoming messages before the service goes away
          host.OnDataRecieved -= OnDataRecieved;
          host.Dispose();
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Dispose", ex);
      }

      try {
        if (sender != null) {
          sender.Dispose();
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Dispose", ex);
      }

      //disposed last because AddError uses it to send logs to the server
      try {
        if (peerListManager != null) {
          peerListManager.Dispose();
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Dispose", ex);
      }
    }

    /// <summary>
    /// Sends <paramref name="data"/> to every peer currently returned by the peer
    /// list manager. A failure for one peer is reported and does not stop the
    /// sending to the remaining peers.
    /// </summary>
    /// <param name="data">Payload handed unchanged to the sender.</param>
    public void PublishDataToNetwork(byte[] data) {
      try {
        var allPeers = peerListManager.GetPeerList();
        foreach (PeerInfo peer in allPeers) {
          try {
            sender.SendData(peer, data);
          }
          catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Returns the messages buffered since the previous call and removes them
    /// from the buffer.
    /// </summary>
    /// <returns>
    /// The buffered (sender, payload) pairs. Never null: if an error occurs it is
    /// reported through <see cref="AddError"/> and the items collected up to that
    /// point (possibly none) are returned.
    /// </returns>
    public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {
      var res = new List<KeyValuePair<PeerInfo, byte[]>>();
      try {
        SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> drainQueue;
        lock (activeQueueLocker) {
          //swap the queues: new messages go to the (previously drained) other queue,
          //the queue filled so far is read below
          var tmp = readQueue;
          readQueue = writeQueue;
          writeQueue = tmp;
          //keep a local reference so a concurrent swap cannot change
          //which queue this call is draining
          drainQueue = readQueue;
        }

        //creating resultset
        KeyValuePair<PeerInfo, byte[]> item;
        while (drainQueue.TryDequeue(out item)) {
          res.Add(item);
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
      }
      return res;
    }

    /// <summary>
    /// Returns the description of this peer as created in <see cref="Init"/>
    /// (null if <see cref="Init"/> has not assigned it).
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the peer list as currently provided by the peer list manager.
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      return peerListManager.GetPeerList();
    }

    /// <summary>
    /// Raises <see cref="ExceptionOccurend"/> with <paramref name="ex"/> if there
    /// is at least one subscriber.
    /// </summary>
    private void PropagateException(Exception ex) {
      //copy to a local so an unsubscribe between check and call cannot cause a NullReferenceException
      var handler = ExceptionOccurend;
      if (handler != null) {
        handler(this, ex);
      }
    }

    /// <summary>
    /// Looks up the addresses of the local host and returns the first one whose
    /// string form starts with <paramref name="ipPrefix"/>.
    /// </summary>
    /// <returns>The matching address, or null if there is none or the lookup fails.</returns>
    private string GetInternalIpAddress(string ipPrefix) {
      if (ipPrefix == null) {
        return null;
      }
      try {
        var strHostName = Dns.GetHostName();
        // Then using host name, get the IP address list..
        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);

        return ipEntry.AddressList
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(ipPrefix, StringComparison.Ordinal));
      }
      catch (Exception) {
        //name resolution failed; callers treat null as "no address found"
        return null;
      }
    }

    /// <summary>
    /// Handler for the receiving service: buffers the message if its sender has
    /// the same problem instance as this peer; everything else is ignored.
    /// </summary>
    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
      if (e == null || e.Sender == null || ownInstance == null) {
        return;
      }
      var remoteProblem = e.Sender.ProblemInstance;
      if (remoteProblem == null || !remoteProblem.Equals(ownInstance.ProblemInstance)) {
        return;
      }
      lock (activeQueueLocker) {
        //the lock keeps writeQueue stable while GetDataFromNetwork swaps the queues
        writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
      }
    }

    /// <summary>
    /// Reports an error: raises <see cref="ExceptionOccurend"/> and, if a peer
    /// list manager exists, tries to send a log line to the server.
    /// </summary>
    /// <param name="source">Name of the method in which the exception was caught.</param>
    /// <param name="ex">The caught exception.</param>
    private void AddError(string source, Exception ex) {
      PropagateException(ex);
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        }
        catch {
          //best effort only: a failure while logging the error cannot be logged anywhere else
        }
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.