1 | using System;
2 | using System.Collections.Concurrent;
3 | using System.Collections.Generic;
4 | using System.Configuration;
5 | using System.Linq;
6 | using System.Net;
7 | using DistributedGA.Core.Domain;
8 | using DistributedGA.Core.Interface;
9 |
10 | namespace DistributedGA.Core.Implementation {
/// <summary>
/// <see cref="IMessageHandler"/> implementation for a peer-to-peer network.
/// It publishes the local population to every peer returned by the
/// <see cref="IPeerListManager"/> and collects the populations received from
/// other peers until they are fetched with <see cref="GetMigrationInfo"/>.
/// </summary>
public class PeerNetworkMessageHandler : IMessageHandler {
  // Description of this peer (address, port, problem instance); created in Init().
  private PeerInfo ownInstance = null;

  // Supplies the list of peers to contact and is also used to forward error logs.
  private IPeerListManager peerListManager;

  // Sends populations to other peers.
  private IMessageSender sender = null;

  // Guards replacement of writeQueue so that a received batch is never split
  // across two results of GetMigrationInfo().
  private readonly object activeQueueLocker = new object();

  // Collects solutions received from other peers until the next GetMigrationInfo() call.
  private ConcurrentQueue<SolutionInfo> writeQueue = new ConcurrentQueue<SolutionInfo>();

  // Receives populations from other peers and raises OnPopulationRecieved.
  private IMessageService host = null;

  /// <summary>
  /// Creates the description of this peer, starts the receiving service and
  /// initializes the peer list manager and the message sender.
  /// Any exception is reported through <see cref="AddError"/> and not rethrown;
  /// note that nothing can be reported while the peer list manager does not exist yet.
  /// </summary>
  public void Init() {
    try {
      // TODO: get own peerinfo (the problem instance is still hard-coded)
      ownInstance = new PeerInfo() {
        IpAddress = GetExternalIpAddress(),
        Port = 0,
        ProblemInstance = "TestProblem"
      };

      // Start with an empty collection queue on every (re-)initialization.
      lock (activeQueueLocker) {
        writeQueue = new ConcurrentQueue<SolutionInfo>();
      }

      host = new WcfMessageService();
      // Init returns the port on which the service is hosted; it must be known
      // before ownInstance is handed to the peer list manager and the sender.
      ownInstance.Port = host.Init(ownInstance.IpAddress);
      host.OnPopulationRecieved += OnPopulationRecieved;

      peerListManager = new WcfPeerListManager();
      //peerListManager = new TestPeerListManager();
      peerListManager.Init(ownInstance);

      sender = new WcfMessageSender();
      sender.Init(ownInstance);
    } catch (Exception ex) {
      AddError("PeerNetworkMessageHandler.Init", ex);
    }
  }

  /// <summary>
  /// Sends <paramref name="population"/> to every peer of the current peer list.
  /// A failure while sending to one peer is reported and does not prevent
  /// sending to the remaining peers.
  /// </summary>
  /// <param name="population">The solutions to publish.</param>
  public void PublishMigrationInfo(SolutionInfo[] population) {
    try {
      List<PeerInfo> peers = peerListManager.GetPeerList();
      if (peers.Count == 0) {
        return;
      }

      //HACK: manipulate for monitoring in test
      // Stamped once instead of once per peer; the value does not change in between.
      foreach (SolutionInfo si in population) {
        si.IterationNumber = ownInstance.Port;
      }

      foreach (PeerInfo peer in peers) {
        // Network communication may fail for a single peer; keep going with the others.
        try {
          // A peer with the same IP as this instance is addressed as "localhost".
          // NOTE: this overwrites the address on the PeerInfo object returned by the manager.
          if (peer.IpAddress.Equals(ownInstance.IpAddress)) {
            peer.IpAddress = "localhost";
          }
          sender.SendPopulation(peer, population);
        } catch (Exception ex) {
          // Previously swallowed silently; report it so that unreachable peers become visible.
          AddError("PeerNetworkMessageHandler.PublishMigrationInfo(SendPopulation)", ex);
        }
      }
    } catch (Exception ex) {
      AddError("PeerNetworkMessageHandler.PublishMigrationInfo", ex);
    }
  }

  /// <summary>
  /// Returns all solutions received since the previous call, in the order they were enqueued.
  /// </summary>
  /// <returns>
  /// The received solutions (an empty array if nothing was received),
  /// or <c>null</c> if an exception occurred; the exception is reported through <see cref="AddError"/>.
  /// </returns>
  public SolutionInfo[] GetMigrationInfo() {
    try {
      ConcurrentQueue<SolutionInfo> filled;
      // Detach the filled queue and install a fresh one while holding the lock.
      // Receivers enqueue under the same lock, so afterwards nobody else references
      // the detached queue and it can be read without further synchronization,
      // even if several callers fetch migration info concurrently.
      lock (activeQueueLocker) {
        filled = writeQueue;
        writeQueue = new ConcurrentQueue<SolutionInfo>();
      }
      return filled.ToArray();
    } catch (Exception ex) {
      AddError("PeerNetworkMessageHandler.GetMigrationInfo", ex);
      return null;
    }
  }

  /// <summary>
  /// Returns the description of this peer (<c>null</c> before <see cref="Init"/> was called).
  /// </summary>
  public PeerInfo GetPeerInfo() {
    return ownInstance;
  }

  /// <summary>
  /// Returns the peer list currently provided by the peer list manager.
  /// </summary>
  public List<PeerInfo> GetCurrentNetwork() {
    return peerListManager.GetPeerList();
  }

  /// <summary>
  /// Looks up the addresses of the local host and returns the first one whose
  /// textual form starts with the prefix configured in the application setting "LanIpPrefix".
  /// </summary>
  /// <returns>
  /// The matching address as string, or <c>null</c> if the setting is missing,
  /// no address matches, or the lookup fails.
  /// </returns>
  private string GetExternalIpAddress() {
    try {
      // Read the setting once instead of once per candidate address.
      string lanPrefix = ConfigurationManager.AppSettings["LanIpPrefix"];
      if (lanPrefix == null) {
        return null;
      }

      // Using the host name, get the IP address list.
      IPHostEntry ipEntry = Dns.GetHostEntry(Dns.GetHostName());

      return ipEntry.AddressList
        .Select(ip => ip.ToString())
        .FirstOrDefault(str => str.StartsWith(lanPrefix, StringComparison.Ordinal));
    } catch {
      // Best effort: a failing lookup is treated like "no address found".
      return null;
    }
  }

  /// <summary>
  /// Event handler for populations received by the message service;
  /// appends the received solutions to the collection queue.
  /// </summary>
  private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) {
    if (e == null || e.Population == null) {
      return;
    }

    // Enqueue the whole batch under the lock so that GetMigrationInfo()
    // cannot detach the queue in the middle of a batch.
    lock (activeQueueLocker) {
      foreach (SolutionInfo si in e.Population) {
        writeQueue.Enqueue(si);
      }
    }
  }

  /// <summary>
  /// Forwards an error description to the server via the peer list manager.
  /// Does nothing if the peer list manager has not been created yet.
  /// </summary>
  /// <param name="source">Name of the method in which the error occurred.</param>
  /// <param name="ex">The exception to report; only its message is sent.</param>
  private void AddError(string source, Exception ex) {
    if (peerListManager == null) {
      return;
    }

    try {
      peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
    } catch {
      // Error reporting is best effort and must never throw into the caller.
    }
  }
}
147 | }