Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13541

Last change on this file since 13541 was 13541, checked in by thasling, 9 years ago

added new empty project for test on hive
changed scheme for ip determination
changed handling of queues in messagehandler

File size: 4.8 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Configuration;
5using System.Linq;
6using System.Net;
7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
9
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// <see cref="IMessageHandler"/> implementation for a peer network.
  /// Outgoing populations are sent to every peer reported by the
  /// <see cref="IPeerListManager"/>; incoming populations are collected by an
  /// <see cref="IMessageService"/> callback and handed out on demand via
  /// <see cref="GetMigrationInfo"/>.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler {
    //own peer-instance information
    private PeerInfo ownInstance = null;

    //uses peer-list from IPeerListManager to decide which peers to contact
    private IPeerListManager peerListManager;

    //uses IMessageSender to send populations to peers
    private IMessageSender sender = null;

    //provides current population for the higher layer IMigrationOperator
    //two queues are used to gather and provide populations:
    //received solutions are appended to writeQueue; GetMigrationInfo swaps
    //both queues and then empties the queue that was written to so far
    private readonly object activeQueueLocker = new object();
    private ConcurrentQueue<SolutionInfo> writeQueue;
    private ConcurrentQueue<SolutionInfo> readQueue;

    //uses IMessageService for receiving population from one peer at once
    private IMessageService host = null;

    /// <summary>
    /// Creates the own peer info, both queues, the message service, the peer
    /// list manager and the message sender.
    /// Exceptions are not propagated; they are passed to <see cref="AddError"/>.
    /// </summary>
    public void Init() {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetExternalIpAddress(),
          Port = 0,
          ProblemInstance = "TestProblem"
        }; // TODO: get own peerinfo

        writeQueue = new ConcurrentQueue<SolutionInfo>();
        readQueue = new ConcurrentQueue<SolutionInfo>();

        host = new WcfMessageService();
        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port on which the service is hosted
        host.OnPopulationRecieved += new EventHandler<MessageRecieveEventArgs>(OnPopulationRecieved);

        peerListManager = new WcfPeerListManager();
        //peerListManager = new TestPeerListManager();
        peerListManager.Init(ownInstance);

        sender = new WcfMessageSender();
        sender.Init(ownInstance);

      } catch (Exception ex) {
        //NOTE: AddError only reports when peerListManager has already been created,
        //so a failure before that point is not reported anywhere
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Sends the given population to every peer of the current peer list.
    /// A failure while sending to one peer is reported via <see cref="AddError"/>
    /// and does not prevent sending to the remaining peers.
    /// </summary>
    /// <param name="population">Solutions to publish; the IterationNumber of each
    /// entry is overwritten with the own port (test monitoring hack).</param>
    public void PublishMigrationInfo(SolutionInfo[] population) {
      try {
        List<PeerInfo> peers = peerListManager.GetPeerList();
        if (peers.Count == 0) {
          //nothing to send; population is left untouched in this case
          return;
        }

        //HACK: manipulate for monitoring in test
        //(the value does not depend on the peer, so it is set once for all peers)
        foreach (SolutionInfo si in population) {
          si.IterationNumber = ownInstance.Port;
        }

        foreach (PeerInfo peer in peers) {
          //maybe something will go wrong during network communication...
          try {
            //TODO: NEEDED WHEN PEER IS IN LAN, TOO????
            //Note: If IP is same as own: it is a "localhost"
            //NOTE(review): this overwrites the address on the object returned by the
            //peer list manager - confirm that the manager does not rely on it afterwards
            if (peer.IpAddress.Equals(ownInstance.IpAddress)) {
              peer.IpAddress = "localhost";
            }
            sender.SendPopulation(peer, population);

          } catch (Exception ex) {
            //report the failure for this peer and continue with the next one
            AddError("PeerNetworkMessageHandler.PublishMigrationInfo(during sending to one peer)", ex);
          }
        }
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishMigrationInfo", ex);
      }
    }

    /// <summary>
    /// Returns all solutions received since the previous call and removes them
    /// from the internal queue.
    /// </summary>
    /// <returns>The collected solutions (possibly an empty array), or null if an
    /// exception occurred; the exception is passed to <see cref="AddError"/>.</returns>
    public SolutionInfo[] GetMigrationInfo() {
      try {
        ConcurrentQueue<SolutionInfo> collected;
        lock (activeQueueLocker) {
          //swap the queues; keep a local reference so that the drain below works
          //on this queue even if the fields are swapped again by another call
          collected = writeQueue;
          writeQueue = readQueue;
          readQueue = collected;
        }

        //creating resultset
        List<SolutionInfo> res = new List<SolutionInfo>();
        SolutionInfo item;
        while (collected.TryDequeue(out item)) {
          res.Add(item);
        }
        return res.ToArray();
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetMigrationInfo", ex);
        return null;
      }
    }

    /// <summary>
    /// Returns the peer info of this instance as created in <see cref="Init"/>.
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the peer list as provided by the peer list manager.
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      return peerListManager.GetPeerList();
    }

    /// <summary>
    /// Determines the own IP address: the first address of the local host whose
    /// string representation starts with the app setting "LanIpPrefix".
    /// </summary>
    /// <returns>The matching address, or null if the setting is missing, no
    /// address matches or the lookup fails.</returns>
    private string GetExternalIpAddress() {
      try {
        string prefix = ConfigurationManager.AppSettings["LanIpPrefix"];
        if (prefix == null) {
          return null;
        }

        // Using the host name, get the IP address list
        IPHostEntry ipEntry = Dns.GetHostEntry(Dns.GetHostName());

        return ipEntry.AddressList
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(prefix, StringComparison.Ordinal));
      } catch {
        //best effort: configuration or name resolution problems yield "unknown"
        return null;
      }
    }

    /// <summary>
    /// Callback of the message service: appends all received solutions to the
    /// queue that is currently written to.
    /// </summary>
    private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) {
      if (e == null || e.Population == null) {
        return;
      }
      lock (activeQueueLocker) {
        foreach (SolutionInfo si in e.Population) {
          writeQueue.Enqueue(si);
        }
      }
    }

    /// <summary>
    /// Sends source and exception message to the server via the peer list
    /// manager. Does nothing if no peer list manager exists yet.
    /// </summary>
    private void AddError(string source, Exception ex) {
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        } catch {
          //deliberately ignored: error reporting must never throw into the caller
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.