Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13937

Last change on this file since 13937 was 13937, checked in by thasling, 8 years ago

#2615:
implemented ConcurrentQueue
made minor changes in Dispose-methods

File size: 4.6 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Linq;
5using System.Net;
6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
8using DistributedGA.Core.Util;
9
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// IMessageHandler implementation that wires together a message service (receiving),
  /// a message sender (sending) and a peer list manager (peer discovery).
  /// Received items are buffered in two queues that are swapped on every read.
  /// Failures are reported through <see cref="AddError"/> instead of being thrown to the caller.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler {
    // value assigned to the Limit property of both buffer queues
    private const int QueueLimit = 1000;

    // own peer-instance information
    private PeerInfo ownInstance = null;

    // uses peer-list from IPeerListManager to decide which peers to contact
    private IPeerListManager peerListManager;

    // uses IMessageSender to send populations to peers
    private IMessageSender sender = null;

    // provides current population for the higher layer IMigrationOperator;
    // two queues are used to gather and provide the population more efficiently:
    // incoming data is appended to writeQueue while readQueue is drained by the consumer
    private readonly object activeQueueLocker = new object();
    private SizedConcurrentQueue<byte[]> writeQueue;
    private SizedConcurrentQueue<byte[]> readQueue;

    // uses IMessageService for receiving population from one peer at once
    private IMessageService host = null;

    /// <summary>
    /// Creates the buffer queues, starts the message service, and initializes
    /// the peer list manager and the message sender.
    /// Any exception is passed to <see cref="AddError"/> and not rethrown.
    /// </summary>
    /// <param name="lanIpPrefix">Prefix the local IP address has to start with.</param>
    /// <param name="contactServerUrl">URL handed to the peer list manager.</param>
    public void Init(string lanIpPrefix, string contactServerUrl) {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetInternalIpAddress(lanIpPrefix),
          Port = 0,
          ProblemInstance = "TestProblem"
        }; // TODO: get own peerinfo

        writeQueue = new SizedConcurrentQueue<byte[]>();
        readQueue = new SizedConcurrentQueue<byte[]>();
        writeQueue.Limit = QueueLimit;
        readQueue.Limit = QueueLimit;

        host = new WcfMessageService();
        ownInstance.Port = host.Init(ownInstance.IpAddress); // getting port on which the service is hosted
        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance, contactServerUrl);

        sender = new WcfMessageSender();
        sender.Init(ownInstance);
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Disposes the message service, the sender and the peer list manager.
    /// Each component is released independently, so a missing or failing
    /// component does not prevent the remaining ones from being disposed.
    /// </summary>
    public void Dispose() {
      if (host != null) {
        try {
          // detach first so no further data is queued through this handler
          host.OnDataRecieved -= new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);
          host.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }

      if (sender != null) {
        try {
          sender.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }

      // disposed last because AddError reports through the peer list manager
      if (peerListManager != null) {
        try {
          peerListManager.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }
    }

    /// <summary>
    /// Sends <paramref name="data"/> to every peer returned by the peer list manager.
    /// A failure while sending to one peer is reported and does not stop the remaining sends.
    /// </summary>
    /// <param name="data">Items to send to each peer.</param>
    public void PublishDataToNetwork(byte[][] data) {
      try {
        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
          // maybe something will go wrong during network communication...
          try {
            sender.SendData(peer, data);
          }
          catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Swaps the two buffer queues and returns everything that was collected
    /// in the former write queue.
    /// </summary>
    /// <returns>
    /// The buffered items; an empty array if nothing was buffered or an error occurred.
    /// </returns>
    public byte[][] GetDataFromNetwork() {
      try {
        List<byte[]> res = new List<byte[]>();
        byte[] item = null;
        SizedConcurrentQueue<byte[]> drained;

        lock (activeQueueLocker) {
          var tmp = readQueue;
          readQueue = writeQueue;
          writeQueue = tmp;
          // keep a local reference: the field may be swapped again by a concurrent call
          drained = readQueue;
        }

        // creating resultset
        while (!drained.IsEmpty) {
          if (drained.TryDequeue(out item)) {
            res.Add(item);
          }
        }
        return res.ToArray();
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
        // an empty result lets callers iterate without a null check
        return new byte[0][];
      }
    }

    /// <summary>
    /// Returns the peer information of this instance (null before Init has run).
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the peer list of the peer list manager, or an empty list
    /// if no peer list manager has been created.
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      if (peerListManager == null) {
        return new List<PeerInfo>();
      }
      return peerListManager.GetPeerList();
    }

    /// <summary>
    /// Looks up the addresses of the local host and returns the first one
    /// whose string form starts with <paramref name="ipPrefix"/>.
    /// </summary>
    /// <param name="ipPrefix">Required prefix of the address.</param>
    /// <returns>The matching address, or null if none matches or the lookup fails.</returns>
    private string GetInternalIpAddress(string ipPrefix) {
      if (ipPrefix == null) {
        return null;
      }
      try {
        var strHostName = Dns.GetHostName();
        // Then using host name, get the IP address list..
        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
        IPAddress[] addr = ipEntry.AddressList;

        // FirstOrDefault yields null when no address matches, no exception needed
        return addr
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(ipPrefix, StringComparison.Ordinal));
      }
      catch (Exception) {
        // a failed host lookup is treated the same as "no matching address"
        return null;
      }
    }

    /// <summary>
    /// Event handler of the message service: appends every received item to the write queue.
    /// Events without arguments or without data are ignored.
    /// </summary>
    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
      if (e == null || e.data == null) {
        return;
      }
      // the lock keeps a batch from being split across a queue swap
      lock (activeQueueLocker) {
        foreach (byte[] item in e.data) {
          writeQueue.Enqueue(item);
        }
      }
    }

    /// <summary>
    /// Reports an error through the peer list manager, if one exists.
    /// </summary>
    /// <param name="source">Name of the location where the error occurred.</param>
    /// <param name="ex">Exception whose message is reported.</param>
    private void AddError(string source, Exception ex) {
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        }
        catch {
          // deliberately best-effort: a failure while reporting an error must not raise another error
        }
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.