source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13945

Last change on this file since 13945 was 13945, checked in by thasling, 6 years ago

#2615:
fixed major bug in PeerNetworkMessageHandler, because one queue was missing for the sending queues

File size: 5.1 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Linq;
5using System.Net;
6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
8using DistributedGA.Core.Util;
9
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// Message handler that connects the local instance to the peer network.
  /// It publishes data to every peer reported by the peer-list manager and
  /// buffers data received from peers until it is fetched with
  /// <see cref="GetDataFromNetwork"/>.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler {
    //maximum number of items each buffer queue is configured to hold
    private const int QueueLimit = 1000;

    //own peer-instance information
    private PeerInfo ownInstance = null;

    //uses peer-list from IPeerListManager to decide which peers to contact
    private IPeerListManager peerListManager;

    //uses IMessageSender to send populations to peers
    private IMessageSender sender = null;

    //provides current population for the higher layer IMigrationOperator
    //two queues are used to gather and provide population more efficiently:
    //incoming items are always enqueued into writeQueueOriginal, while
    //GetDataFromNetwork swaps the two queues and drains the one that was filled
    private readonly object activeQueueLocker = new object();
    private SizedConcurrentQueue<byte[]> writeQueueOriginal;
    private SizedConcurrentQueue<byte[]> writeQueueAlternative;

    //queue for received items (created in Init, not used by any method of this class)
    private SizedConcurrentQueue<byte[]> readQueue;

    //uses IMessageService for receiving population from one peer at once
    private IMessageService host = null;

    /// <summary>
    /// Creates the buffer queues, starts the receiving service, and initializes
    /// the peer-list manager and the message sender. Any exception is reported
    /// through <c>AddError</c> and not rethrown, so the instance may be only
    /// partially initialized afterwards.
    /// </summary>
    /// <param name="lanIpPrefix">Prefix used to select the own IP address among the addresses of the local host.</param>
    /// <param name="contactServerUrl">URL handed to the peer-list manager.</param>
    public void Init(string lanIpPrefix, string contactServerUrl) {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetInternalIpAddress(lanIpPrefix),
          Port = 0,
          ProblemInstance = "TestProblem"
        }; // TODO: get own peerinfo

        writeQueueOriginal = new SizedConcurrentQueue<byte[]>();
        writeQueueAlternative = new SizedConcurrentQueue<byte[]>();
        readQueue = new SizedConcurrentQueue<byte[]>();
        writeQueueOriginal.Limit = QueueLimit;
        writeQueueAlternative.Limit = QueueLimit;
        readQueue.Limit = writeQueueOriginal.Limit;

        host = new WcfMessageService();
        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port on which the service is hosted
        host.OnDataRecieved += OnDataRecieved;

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance, contactServerUrl);

        sender = new WcfMessageSender();
        sender.Init(ownInstance);
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Releases the receiving service, the sender and the peer-list manager.
    /// Each component is released independently, so a component that is missing
    /// (Init did not complete) or that fails to dispose does not prevent the
    /// remaining ones from being released. Never throws.
    /// </summary>
    public void Dispose() {
      if (host != null) {
        try {
          //detach first so that no further items are enqueued by this handler
          host.OnDataRecieved -= OnDataRecieved;
          host.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }

      if (sender != null) {
        try {
          sender.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }

      //disposed last, because AddError reports through the peer-list manager
      if (peerListManager != null) {
        try {
          peerListManager.Dispose();
        }
        catch (Exception ex) {
          AddError("PeerNetworkMessageHandler.Dispose", ex);
        }
      }
    }

    /// <summary>
    /// Sends the given data to every peer of the current peer list. A failure
    /// while sending to one peer is reported and does not stop the sending to
    /// the remaining peers. Never throws.
    /// </summary>
    /// <param name="data">Items to send; passed unchanged to the message sender.</param>
    public void PublishDataToNetwork(byte[][] data) {
      try {
        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
          //maybe something will go wrong during network communication...
          try {
            sender.SendData(peer, data);
          }
          catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Returns all items received since the previous call and removes them
    /// from the buffer.
    /// </summary>
    /// <returns>
    /// The buffered items; an empty array if nothing was received. If an error
    /// occurs, it is reported and the items collected so far are returned
    /// (never <c>null</c>).
    /// </returns>
    public byte[][] GetDataFromNetwork() {
      List<byte[]> res = new List<byte[]>();
      try {
        SizedConcurrentQueue<byte[]> filledQueue;
        lock (activeQueueLocker) {
          //changing the current queue for storing received items;
          //the queue that was filled so far is kept in a local, so draining it
          //below does not depend on the shared fields after the lock is released
          filledQueue = writeQueueOriginal;
          writeQueueOriginal = writeQueueAlternative;
          writeQueueAlternative = filledQueue;
        }

        //creating resultset
        byte[] item = null;
        while (!filledQueue.IsEmpty) {
          if (filledQueue.TryDequeue(out item)) {
            res.Add(item);
          }
        }
      }
      catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
      }
      return res.ToArray();
    }

    /// <summary>
    /// Returns the description of the own peer instance created in
    /// <see cref="Init"/>; <c>null</c> if Init has not been called.
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the peer list currently provided by the peer-list manager, or an
    /// empty list if the peer-list manager has not been created.
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      if (peerListManager == null) {
        return new List<PeerInfo>();
      }
      return peerListManager.GetPeerList();
    }

    /// <summary>
    /// Looks up the addresses of the local host and returns the first one whose
    /// string representation starts with the given prefix.
    /// </summary>
    /// <param name="ipPrefix">Prefix the address has to start with.</param>
    /// <returns>
    /// The matching address, or <c>null</c> if the prefix is <c>null</c>, no
    /// address matches, or the lookup fails.
    /// </returns>
    private string GetInternalIpAddress(string ipPrefix) {
      if (ipPrefix == null) {
        return null;
      }
      try {
        var strHostName = Dns.GetHostName();
        // Then using host name, get the IP address list..
        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);

        //FirstOrDefault yields null when nothing matches, so the regular
        //"no match" case does not go through exception handling
        return ipEntry.AddressList
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(ipPrefix, StringComparison.Ordinal));
      }
      catch (Exception) {
        //host name resolution failed; the caller treats null as "no address"
        return null;
      }
    }

    /// <summary>
    /// Event handler of the receiving service: stores all received items in
    /// the queue that is currently used for writing. Event arguments without
    /// data are ignored.
    /// </summary>
    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
      if (e == null || e.data == null) {
        return;
      }
      //same lock as in GetDataFromNetwork, so the write queue cannot be
      //swapped while a batch is being enqueued
      lock (activeQueueLocker) {
        foreach (byte[] item in e.data) {
          writeQueueOriginal.Enqueue(item);
        }
      }
    }

    /// <summary>
    /// Reports an error through the peer-list manager. Does nothing if the
    /// peer-list manager has not been created yet.
    /// </summary>
    /// <param name="source">Name of the method in which the error occurred.</param>
    /// <param name="ex">The exception; only its message is reported.</param>
    private void AddError(string source, Exception ex) {
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        }
        catch {
          //deliberately ignored: error reporting is best effort and must not
          //raise new errors in the methods that call it
        }
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.