Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs @ 13969

Last change on this file since 13969 was 13969, checked in by thasling, 8 years ago

#2615:
implemented migration strategies
log in HL now also logs exceptions of peers

File size: 5.3 KB
RevLine 
[13538]1using System;
[13524]2using System.Collections.Concurrent;
3using System.Collections.Generic;
[13541]4using System.Linq;
[13524]5using System.Net;
[13538]6using DistributedGA.Core.Domain;
7using DistributedGA.Core.Interface;
[13937]8using DistributedGA.Core.Util;
[13524]9
[13924]10namespace DistributedGA.Core.Implementation {
11  public class PeerNetworkMessageHandler : IMessageHandler {
12    //own peer-instance Information
13    private PeerInfo ownInstance = null;
[13524]14
[13924]15    //uses peer-list from IPeerListManager to decide which peers to contact
16    private IPeerListManager peerListManager;
[13524]17
[13924]18    //uses IMessageSender to send populations to peers
19    private IMessageSender sender = null;
[13524]20
[13924]21    //provides current population for the higher layer IMigrationOperator
[13945]22    //two queues are used to gather and and provide population more efficiently
[13947]23    private object activeQueueLocker = new Object();
[13946]24    private SizedConcurrentQueue<byte[]> writeQueue;
[13937]25    private SizedConcurrentQueue<byte[]> readQueue;
[13524]26
[13924]27    //uses IMessageService for recieving population from one peer at once
28    private IMessageService host = null;
[13524]29
[13969]30   public event EventHandler<Exception> ExceptionOccurend;   
[13541]31
[13965]32    public void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacity, double communicationRate) {
[13924]33      try {
34        ownInstance = new PeerInfo() {
35          IpAddress = GetInternalIpAddress(lanIpPrefix),
36          Port = 0,
[13956]37          ProblemInstance = problemInstance
[13924]38        }; // TODO: get own peerinfo
[13524]39
[13946]40        writeQueue = new SizedConcurrentQueue<byte[]>();
[13937]41        readQueue = new SizedConcurrentQueue<byte[]>();
[13956]42        writeQueue.Limit = messageCacheCapacity;
[13946]43        readQueue.Limit = writeQueue.Limit;
[13524]44
[13924]45        host = new WcfMessageService();
46        ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet
[13937]47        host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnDataRecieved);
[13524]48
[13924]49        peerListManager = new WcfPeerListManager();
[13956]50        peerListManager.Init(ownInstance, contactServerUrl, communicationRate);
[13524]51
[13924]52        sender = new WcfMessageSender();
[13956]53        sender.Init(ownInstance, messageCacheCapacity);
[13524]54
[13924]55      }
56      catch (Exception ex) {
57        AddError("PeerNetworkMessageHandler.Init", ex);
58      }
59    }
[13524]60
[13924]61    public void Dispose() {
62      try {
63        host.Dispose();
64        sender.Dispose();
65        peerListManager.Dispose();
66      }
67      catch (Exception ex) {
68        AddError("PeerNetworkMessageHandler.Dispose", ex);
69      }
70    }
[13547]71
[13924]72    public void PublishDataToNetwork(byte[][] data) {
73      try {
74        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
75          //maybe something will go wrong during network communication...
76          try {
77            sender.SendData(peer, data);
78          }
79          catch (Exception ex) {
80            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
81          }
[13524]82        }
[13924]83      }
84      catch (Exception ex) {
85        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
86      }
87    }
[13524]88
[13924]89    public byte[][] GetDataFromNetwork() {
90      try {
91        List<byte[]> res = new List<byte[]>();
92        byte[] item = null;
93        lock (activeQueueLocker) {
[13945]94          //changing the current queue for storing items to send
95          //then read from the now unselect queue
[13946]96          var tmp = readQueue;
97          readQueue = writeQueue;
98          writeQueue = tmp;
[13524]99        }
[13541]100
[13924]101        //creating resultset
[13946]102        while (!readQueue.IsEmpty) {
103          if (readQueue.TryDequeue(out item)) {
[13924]104            res.Add(item);
105          }
[13524]106        }
[13924]107        return res.ToArray();
108      }
109      catch (Exception ex) {
110        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
111        return null;
112      }
113    }
[13524]114
[13924]115    public PeerInfo GetPeerInfo() {
116      return ownInstance;
117    }
[13524]118
[13924]119    public List<PeerInfo> GetCurrentNetwork() {
120      return peerListManager.GetPeerList();
121    }
[13524]122
[13969]123    private void PropagateException(Exception ex) {
124      //if (CountdownCompleted != null)
125      //  CountdownCompleted(this, e);
126      if (ExceptionOccurend != null) {
127        ExceptionOccurend(this, ex);
128      }
129    }
130
[13924]131    private string GetInternalIpAddress(string ipPrefix) {
132      try {
133        var strHostName = Dns.GetHostName();
134        // Then using host name, get the IP address list..
135        IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);
136        IPAddress[] addr = ipEntry.AddressList;
[13541]137
[13924]138        return addr
139          .Select(ip => ip.ToString())
140          .First(str => str.StartsWith(ipPrefix));
141      }
142      catch { return null; }
143    }
144
[13937]145    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
[13957]146      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
[13924]147        lock (activeQueueLocker) {
148          foreach (byte[] item in e.data) {
[13946]149            writeQueue.Enqueue(item);
[13924]150          }
[13887]151        }
[13956]152     
[13924]153      }
154    }
[13538]155
[13924]156    private void AddError(string source, Exception ex) {
[13969]157      PropagateException(ex);
[13924]158      if (peerListManager != null) {
159        try {
160          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
[13524]161        }
[13924]162        catch { }
163      }
164    }
[13524]165
[13924]166  }
[13524]167}
Note: See TracBrowser for help on using the repository browser.