Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 14253

Last change on this file since 14253 was 14253, checked in by gkronber, 8 years ago

bugfixing (memory leaks)

File size: 5.4 KB
Line 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using DistributedGA.Core.Domain;
using DistributedGA.Core.Interface;
using DistributedGA.Core.Util;

namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// Message handler that ties together a receiving service (<see cref="IMessageService"/>),
  /// a sender (<see cref="IMessageSender"/>) and a peer list (<see cref="IPeerListManager"/>).
  /// Incoming data is buffered in two queues that are swapped on every call to
  /// <see cref="GetDataFromNetwork"/>; outgoing data is sent to every peer in the peer list.
  /// Failures are reported through <see cref="ExceptionOccurend"/> instead of being thrown.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler, IDisposable {
    // Information about this peer instance (address, port, problem instance).
    private PeerInfo ownInstance = null;

    // Supplies the peer list that decides which peers are contacted.
    private IPeerListManager peerListManager;

    // Sends data to individual peers.
    private IMessageSender sender = null;

    // Two queues are used so that receiving can continue while collected items are handed out:
    // incoming data always goes to writeQueue; GetDataFromNetwork swaps the two and drains
    // the former writeQueue. activeQueueLocker guards the swap and the enqueue.
    private readonly object activeQueueLocker = new Object();
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> writeQueue;
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> readQueue;

    // Service through which data from other peers is received.
    private IMessageService host = null;

    /// <summary>
    /// Raised for every exception that is caught inside this handler.
    /// </summary>
    public event EventHandler<Exception> ExceptionOccurend;

    /// <summary>
    /// Creates and wires up the receiving service, the peer list manager and the sender.
    /// Any exception is caught and reported via <see cref="ExceptionOccurend"/>; in that case
    /// components created after the failing step remain uninitialized.
    /// </summary>
    /// <param name="lanIpPrefix">Prefix used to pick the local IP address of this peer.</param>
    /// <param name="contactServerUrl">Passed on to the peer list manager.</param>
    /// <param name="problemInstance">Only data from peers with an equal problem instance is accepted.</param>
    /// <param name="messageCacheCapacity">Used as <c>Limit</c> of both receive queues and passed on to the sender.</param>
    /// <param name="communicationRate">Passed on to the peer list manager.</param>
    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetInternalIpAddress(lanIpPrefix),
          Port = 0,
          ProblemInstance = problemInstance
        }; // TODO: get own peerinfo

        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
        writeQueue.Limit = messageCacheCapacity;
        readQueue.Limit = writeQueue.Limit;

        host = new WcfMessageService();
        ownInstance.Port = host.Init(ownInstance.IpAddress); // Init returns the port on which the service is hosted
        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance, contactServerUrl, communicationRate);

        sender = new WcfMessageSender();
        sender.Init(ownInstance, messageCacheCapacity);

      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Releases the receiving service, the sender and the peer list manager.
    /// Each component is released independently, so a failing or missing component
    /// (e.g. after a partially failed <see cref="Init"/>) does not prevent the others
    /// from being disposed. Errors are reported via <see cref="ExceptionOccurend"/>.
    /// </summary>
    public void Dispose() {
      if (host != null) {
        RunDisposeStep(() => {
          // Unsubscribe first so the service no longer keeps this handler alive.
          host.OnDataRecieved -= OnDataRecieved;
          host.Dispose();
        });
      }
      if (sender != null) {
        RunDisposeStep(() => sender.Dispose());
      }
      if (peerListManager != null) {
        RunDisposeStep(() => peerListManager.Dispose());
      }
    }

    /// <summary>
    /// Sends <paramref name="data"/> to every peer returned by the peer list manager.
    /// A failure for one peer is reported and does not stop sending to the remaining peers.
    /// </summary>
    /// <param name="data">The data to send.</param>
    public void PublishDataToNetwork(ByteArrayWrapper data) {
      try {
        var allPeers = peerListManager.GetPeerList();
        foreach (PeerInfo peer in allPeers) {
          try {
            sender.SendData(peer, data);
          } catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }

      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Returns all data received since the previous call and removes it from the buffer.
    /// </summary>
    /// <returns>
    /// The received items together with their senders, or <c>null</c> if an exception
    /// occurred (the exception is reported via <see cref="ExceptionOccurend"/>).
    /// </returns>
    public List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork() {
      try {
        SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> drained;
        lock (activeQueueLocker) {
          // Swap the queues: new data is collected in the other queue from now on.
          var tmp = readQueue;
          readQueue = writeQueue;
          writeQueue = tmp;
          // Capture the queue while still holding the lock; reading the field later
          // could observe a swap performed by a concurrent call.
          drained = readQueue;
        }

        // Build the result from the queue that was being filled until the swap.
        var res = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
        KeyValuePair<PeerInfo, ByteArrayWrapper> item;
        while (!drained.IsEmpty) {
          if (drained.TryDequeue(out item)) {
            res.Add(item);
          }
        }
        return res;
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
        return null;
      }
    }

    /// <summary>
    /// Returns the information about this peer; <c>null</c> before <see cref="Init"/> was called.
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the current peer list of the peer list manager, or an empty list
    /// if the peer list manager has not been created (e.g. <see cref="Init"/> failed early).
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      if (peerListManager == null) {
        return new List<PeerInfo>();
      }
      return peerListManager.GetPeerList();
    }

    // Raises ExceptionOccurend for the given exception.
    private void PropagateException(Exception ex) {
      // Copy the delegate first: a subscriber may unsubscribe between the null check and the call.
      var handler = ExceptionOccurend;
      if (handler != null) {
        handler(this, ex);
      }
    }

    // Returns the first local IP address (as string) that starts with ipPrefix,
    // or null if there is none or the lookup fails.
    private string GetInternalIpAddress(string ipPrefix) {
      if (ipPrefix == null) {
        return null;
      }
      try {
        var strHostName = Dns.GetHostName();
        // Use the host name to get the list of IP addresses of this machine.
        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
        IPAddress[] addr = ipEntry.AddressList;

        return addr
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(ipPrefix, StringComparison.Ordinal));
      } catch (Exception) {
        // Lookup failures are treated like "no matching address".
        return null;
      }
    }

    // Handler for data received by the host service. Data is only buffered when the
    // sender's problem instance equals the one of this peer.
    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
      try {
        if (e == null || e.Sender == null || ownInstance == null) {
          return;
        }
        var problemInstance = e.Sender.ProblemInstance;
        if (problemInstance == null || !problemInstance.Equals(ownInstance.ProblemInstance)) {
          return;
        }

        // Build the entry outside the lock to keep the critical section short.
        var entry = new KeyValuePair<PeerInfo, ByteArrayWrapper>(e.Sender, ByteArrayWrapper.CreateByteArrayWrapper(e.data));
        lock (activeQueueLocker) {
          writeQueue.Enqueue(entry);
        }
      } catch (Exception ex) {
        // Report like all other entry points instead of throwing into the raising service.
        AddError("PeerNetworkMessageHandler.OnDataRecieved", ex);
      }
    }

    // Reports an exception: raises ExceptionOccurend and, if a peer list manager exists,
    // additionally sends a log line to the server.
    private void AddError(string source, Exception ex) {
      PropagateException(ex);
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        } catch {
          // Best effort only: a failure while logging must not raise another error.
        }
      }
    }

    // Executes one step of Dispose and reports (instead of throwing) any exception,
    // so that the remaining steps are still executed.
    private void RunDisposeStep(Action step) {
      try {
        step();
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Dispose", ex);
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.