Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13556

Last change on this file since 13556 was 13556, checked in by thasling, 8 years ago

new hive project

File size: 4.6 KB
RevLine 
[13538]1using System;
[13524]2using System.Collections.Concurrent;
3using System.Collections.Generic;
[13541]4using System.Linq;
[13524]5using System.Net;
[13538]6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
[13524]8
[13538]9namespace DistributedGA.Core.Implementation {
10  public class PeerNetworkMessageHandler : IMessageHandler {
11    //own peer-instance Information
12    private PeerInfo ownInstance = null;
[13524]13
[13538]14    //uses peer-list from IPeerListManager to decide which peers to contact
15    private IPeerListManager peerListManager;
[13524]16
[13538]17    //uses IMessageSender to send populations to peers
18    private IMessageSender sender = null;
[13524]19
[13538]20    //provides current population for the higher layer IMigrationOperator
21    //to queues are used to gather and and provide population more efficiently
[13541]22    private object activeQueueLocker = new object();
23    private ConcurrentQueue<SolutionInfo> writeQueue;
24    private ConcurrentQueue<SolutionInfo> readQueue;
[13524]25
[13538]26    //uses IMessageService for recieving population from one peer at once
27    private IMessageService host = null;
[13524]28
[13541]29
30
[13556]31    public void Init(string lanIpPrefix, string contactServerUrl) {
[13538]32      try {
33        ownInstance = new PeerInfo() {
[13556]34          IpAddress = GetExternalIpAddress(lanIpPrefix),
[13538]35          Port = 0,
36          ProblemInstance = "TestProblem"
37        }; // TODO: get own peerinfo
[13524]38
[13541]39        writeQueue = new ConcurrentQueue<SolutionInfo>();
40        readQueue = new ConcurrentQueue<SolutionInfo>();
[13524]41
[13538]42        host = new WcfMessageService();
43        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet
[13553]44        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnPopulationRecieved);
[13524]45
[13538]46        peerListManager = new WcfPeerListManager();
[13556]47        peerListManager.Init(ownInstance, contactServerUrl);
[13524]48
[13538]49        sender = new WcfMessageSender();
50        sender.Init(ownInstance);
[13524]51
[13538]52      } catch (Exception ex) {
53        AddError("PeerNetworkMessageHandler.Init", ex);
54      }
55    }
[13524]56
[13547]57    public void Dispose() {
58      try {
59        host.Dispose();
60      } catch (Exception ex) {
61        AddError("PeerNetworkMessageHandler.Dispose", ex);
62      }
63    }
64
[13553]65    public void PublishDataToNetwork(SolutionInfo[] data) {
[13538]66      try {
67        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
68          //HACK: manipulate for monitoring in test
[13553]69          foreach (SolutionInfo si in data) {
[13538]70            si.IterationNumber = ownInstance.Port;
71          }
72          //maybe something will go wrong during network communication...
73          try {
[13553]74            sender.SendData(peer, data);
[13538]75          } catch (Exception ex) {
[13548]76            AddError("PeerNetworkMessageHandler.PublishMigrationInfo(during sending to one peer!)", ex);
[13538]77          }
[13524]78        }
[13538]79      } catch (Exception ex) {
80        AddError("PeerNetworkMessageHandler.PublishMigrationInfo", ex);
81      }
82    }
[13524]83
[13553]84    public SolutionInfo[] GetDataFromNetwork() {
[13538]85      try {
86        List<SolutionInfo> res = new List<SolutionInfo>();
87        SolutionInfo item = null;
[13541]88        lock (activeQueueLocker) {
89          var tmp = readQueue;
90          readQueue = writeQueue;
91          writeQueue = tmp;
[13524]92        }
[13541]93
[13538]94        //creating resultset
[13541]95        while (!readQueue.IsEmpty) {
96          if (readQueue.TryDequeue(out item)) {
[13538]97            res.Add(item);
98          }
[13524]99        }
[13538]100        return res.ToArray();
101      } catch (Exception ex) {
102        AddError("PeerNetworkMessageHandler.GetMigrationInfo", ex);
103        return null;
104      }
105    }
[13524]106
[13538]107    public PeerInfo GetPeerInfo() {
108      return ownInstance;
109    }
[13524]110
[13538]111    public List<PeerInfo> GetCurrentNetwork() {
112      return peerListManager.GetPeerList();
113    }
[13524]114
[13556]115    private string GetExternalIpAddress(string ipPrefix) {
[13538]116      try {
[13541]117        var strHostName = Dns.GetHostName();
118        // Then using host name, get the IP address list..
119        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
120        IPAddress[] addr = ipEntry.AddressList;
121
122        return addr
123          .Select(ip => ip.ToString())
[13556]124          .First(str => str.StartsWith(ipPrefix));
[13538]125      } catch { return null; }
126    }
127
128    private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) {
129      if (e != null) {
[13541]130        lock (activeQueueLocker) {
[13553]131          foreach (SolutionInfo si in e.data) {
[13541]132            writeQueue.Enqueue(si);
[13538]133          }
[13524]134        }
[13538]135      }
136    }
[13524]137
[13538]138    private void AddError(string source, Exception ex) {
139      if (peerListManager != null) {
140        try {
141          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
142        } catch { }
143      }
[13524]144    }
[13547]145
[13538]146  }
[13524]147}
Note: See TracBrowser for help on using the repository browser.