source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13554

Last change on this file since 13554 was 13554, checked in by thasling, 5 years ago

PeerList now is fetched from the server each time the peerlist is needed

File size: 4.6 KB
RevLine 
[13538]1using System;
[13524]2using System.Collections.Concurrent;
3using System.Collections.Generic;
[13541]4using System.Configuration;
5using System.Linq;
[13524]6using System.Net;
[13538]7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
[13524]9
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// IMessageHandler implementation that wires together a WcfMessageService (receiving),
  /// a WcfMessageSender (sending) and a WcfPeerListManager (peer discovery and logging).
  /// Received solutions are buffered in two queues that are swapped whenever the
  /// buffered data is fetched via <see cref="GetDataFromNetwork"/>.
  /// </summary>
  public class PeerNetworkMessageHandler : IMessageHandler {
    //own peer-instance information
    private PeerInfo ownInstance = null;

    //uses peer-list from IPeerListManager to decide which peers to contact
    private IPeerListManager peerListManager;

    //uses IMessageSender to send populations to peers
    private IMessageSender sender = null;

    //provides the received population for the higher layer;
    //two queues are used so that receiving and fetching do not work on the same queue:
    //incoming solutions go to writeQueue, GetDataFromNetwork swaps the queues and drains the old writeQueue
    private readonly object activeQueueLocker = new object();
    private ConcurrentQueue<SolutionInfo> writeQueue;
    private ConcurrentQueue<SolutionInfo> readQueue;

    //uses IMessageService for receiving populations from peers
    private IMessageService host = null;

    /// <summary>
    /// Creates the own PeerInfo, the receive queues, the message service, the peer-list
    /// manager and the message sender (in that order).
    /// Exceptions are not propagated; they are passed to AddError.
    /// </summary>
    public void Init() {
      try {
        ownInstance = new PeerInfo() {
          IpAddress = GetExternalIpAddress(),
          Port = 0,
          ProblemInstance = "TestProblem"
        }; // TODO: get own peerinfo

        writeQueue = new ConcurrentQueue<SolutionInfo>();
        readQueue = new ConcurrentQueue<SolutionInfo>();

        host = new WcfMessageService();
        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting the port on which the service is hosted
        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnPopulationRecieved);

        peerListManager = new WcfPeerListManager();
        peerListManager.Init(ownInstance);

        sender = new WcfMessageSender();
        sender.Init(ownInstance);

      } catch (Exception ex) {
        //NOTE: AddError needs peerListManager; failures before it is created are not reported anywhere
        AddError("PeerNetworkMessageHandler.Init", ex);
      }
    }

    /// <summary>
    /// Detaches the receive handler from the message service and disposes the service.
    /// Does nothing if Init never created the service. Exceptions are passed to AddError.
    /// </summary>
    public void Dispose() {
      try {
        if (host != null) {
          try {
            //stop feeding the queues before the service goes away
            host.OnDataRecieved -= new EventHandler<MessageRecieveEventArgs>(OnPopulationRecieved);
          } finally {
            host.Dispose();
          }
        }
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.Dispose", ex);
      }
    }

    /// <summary>
    /// Sends the given solutions to every peer of the current peer list.
    /// A failure while sending to one peer is logged and does not stop sending to the others.
    /// </summary>
    /// <param name="data">Solutions to publish; their IterationNumber is overwritten (see HACK below).</param>
    public void PublishDataToNetwork(SolutionInfo[] data) {
      try {
        if (data == null) {
          AddError("PeerNetworkMessageHandler.PublishDataToNetwork", new ArgumentNullException("data"));
          return;
        }

        var peers = peerListManager.GetPeerList();
        if (peers == null || peers.Count == 0) {
          return; //nobody to send to; data stays untouched
        }

        //HACK: manipulate for monitoring in test
        //done once for all peers, the value does not depend on the receiver
        foreach (SolutionInfo si in data) {
          si.IterationNumber = ownInstance.Port;
        }

        foreach (PeerInfo peer in peers) {
          //maybe something will go wrong during network communication...
          try {
            sender.SendData(peer, data);
          } catch (Exception ex) {
            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
          }
        }
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
      }
    }

    /// <summary>
    /// Returns all solutions received since the previous call and removes them from the buffer.
    /// </summary>
    /// <returns>
    /// The buffered solutions; an empty array if nothing was received or if an error occurred
    /// (the error is passed to AddError).
    /// </returns>
    public SolutionInfo[] GetDataFromNetwork() {
      try {
        ConcurrentQueue<SolutionInfo> drained;
        lock (activeQueueLocker) {
          //swap the queues; keep the queue to drain in a local so that a concurrent
          //swap cannot redirect this call to the queue that is currently written to
          drained = writeQueue;
          writeQueue = readQueue;
          readQueue = drained;
        }

        //creating resultset
        List<SolutionInfo> res = new List<SolutionInfo>();
        SolutionInfo item;
        while (drained.TryDequeue(out item)) {
          res.Add(item);
        }
        return res.ToArray();
      } catch (Exception ex) {
        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
        return new SolutionInfo[0];
      }
    }

    /// <summary>
    /// Returns the PeerInfo created by Init (null if Init has not been called).
    /// </summary>
    public PeerInfo GetPeerInfo() {
      return ownInstance;
    }

    /// <summary>
    /// Returns the peer list as provided by the peer-list manager,
    /// or an empty list if the peer-list manager has not been created.
    /// </summary>
    public List<PeerInfo> GetCurrentNetwork() {
      if (peerListManager == null) {
        return new List<PeerInfo>();
      }
      return peerListManager.GetPeerList();
    }

    /// <summary>
    /// Looks up the addresses of the local host and returns the first one whose textual
    /// form starts with the "LanIpPrefix" application setting.
    /// </summary>
    /// <returns>The matching address as string; null if there is no match, no setting or the lookup failed.</returns>
    private string GetExternalIpAddress() {
      try {
        string prefix = ConfigurationManager.AppSettings["LanIpPrefix"];
        if (prefix == null) {
          return null; //without a configured prefix no address can match
        }

        var strHostName = Dns.GetHostName();
        // Then using host name, get the IP address list..
        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);

        return ipEntry.AddressList
          .Select(ip => ip.ToString())
          .FirstOrDefault(str => str.StartsWith(prefix, StringComparison.Ordinal));
      } catch {
        //best effort: any lookup or configuration failure means "address unknown"
        return null;
      }
    }

    /// <summary>
    /// Event handler for the message service: appends all received solutions to the write queue.
    /// Events without arguments or without data are ignored.
    /// </summary>
    private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) {
      if (e == null || e.data == null) {
        return;
      }
      //the lock keeps the enqueue on the queue that is the write queue at this moment
      lock (activeQueueLocker) {
        foreach (SolutionInfo si in e.data) {
          writeQueue.Enqueue(si);
        }
      }
    }

    /// <summary>
    /// Sends an error description to the server via the peer-list manager.
    /// Does nothing if the peer-list manager does not exist; never throws.
    /// </summary>
    /// <param name="source">Name of the location where the error occurred.</param>
    /// <param name="ex">The exception; only its Message is transmitted.</param>
    private void AddError(string source, Exception ex) {
      if (peerListManager != null) {
        try {
          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
        } catch { } //logging is best effort and must not fail the caller
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.