Changeset 13923


Ignore:
Timestamp:
06/19/16 21:37:24 (3 years ago)
Author:
thasling
Message:

#2615:
re-implemented WcfPeerListManager
also sending messages is now done in the background async

Location:
branches/thasling/DistributedGA
Files:
3 edited
1 copied

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r13918 r13923  
    11using System;
     2using System.Collections.Concurrent;
     3using System.Collections.Generic;
    24using System.ServiceModel;
     5using System.Threading.Tasks;
     6using System.Timers;
    37using DistributedGA.Core.Domain;
    48using DistributedGA.Core.Interface;
     
    610namespace DistributedGA.Core.Implementation {
    711  public class WcfMessageSender : IMessageSender {
     12
    813    private PeerInfo myself;
     14
     15    private ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages;
     16
     17    private Timer timer; //sends cached messages to network in background
    918
    1019    public void Init(PeerInfo source) {
    1120      myself = source;
     21      bufferedMessages = new ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     22      timer = new Timer(1000 * 60); //each 5 minutes
     23      timer.Elapsed += GenerateSendingTasks;
     24      timer.Start();
    1225    }
    1326
    1427    public void SendData(PeerInfo destination, byte[][] data) {
     28      bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
     29    }
    1530
    16       var serviceUrl = "DistributedGA.svc";
    17       var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
    18       var serviceUri = new Uri(baseUri, serviceUrl);
     31    public void Dispose() {
     32      timer.Stop();
     33      timer.Dispose();
     34      timer = null;
     35    }
    1936
    20       var binding = new NetTcpBinding();
    21       var endpoint = new EndpointAddress(serviceUri);
    22       using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
    23         using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
    24           ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
     37    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
     38      while (!bufferedMessages.IsEmpty) {
     39        KeyValuePair<PeerInfo, byte[][]> message;
     40        if (bufferedMessages.TryDequeue(out message)) {
     41          Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
    2542        }
    2643      }
    2744    }
    2845
    29     public void Dispose() {
     46    /// <summary>
     47    /// sends data to another peer, is used by a single thread to run in background
     48    /// </summary>
     49    /// <param name="destination">destination peer</param>
     50    /// <param name="data">data to send</param>
     51    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
     52      try {
     53        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
     54        var serviceUrl = "DistributedGA.svc";
     55        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
     56        var serviceUri = new Uri(baseUri, serviceUrl);
    3057
     58        var binding = new NetTcpBinding();
     59        var endpoint = new EndpointAddress(serviceUri);
     60        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
     61          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
     62            ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
     63          }
     64        }
     65      }
     66      catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
    3167    }
    3268  }
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageService.cs

    r13887 r13923  
    3131                var serviceUri = new Uri(baseUri, serviceUrl);
    3232                NetTcpBinding binding = new NetTcpBinding();
    33                 //using (var host = new ServiceHost(typeof(MessageContractImpl), serviceUri))
    3433
    3534                using (var host = new ServiceHost(messageContract, serviceUri))
     
    4241                }
    4342            }).Start();
    44             //close service again:
    45             //  _ResetEvent.Set();
    4643            return port;
    4744        }
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs

    r13918 r13923  
    77using System.Timers;
    88
    9 namespace DistributedGA.Core.Implementation
    10 {
    11     public class WcfPeerListManager : IPeerListManager
    12     {
    13         private string serverString = null;
     9namespace DistributedGA.Core.Implementation {
     10  public class WcfPeerListManager : IPeerListManager {
    1411
    15         private IContactService client = null;
     12    private string serverString = null;
    1613
    17         private IContactService heartbeatClient = null;
     14    private PeerInfo myself = null;
    1815
    19         private PeerInfo myself = null;
     16    private Timer timer = null; //sends heartbeat to contact-server
    2017
    21         private Timer timer = null; //sends heartbeat to contact-server
     18    private ChannelFactory<IContactService> myChannelFactory;
    2219
    23         public void Init(PeerInfo source, string contactServerUrl)
    24         {
    25             serverString = contactServerUrl;
    26             client = CreateClient();
    27             heartbeatClient = CreateClient();
    28             myself = source;
    29             client.RegisterPeer(source);
    30             timer = new Timer(1000 ); //each 5 minutes
    31             timer.Elapsed += SendHeartbeatToServer;
    32             timer.Start();
    33         }
     20    private IContactService client;
     21
     22    private IContactService heartbeatClient;
     23
     24    public void Init(PeerInfo source, string contactServerUrl) {
     25      serverString = contactServerUrl;
     26      myself = source;
     27      //Init ChannelFactory and Clients
     28      var binding = new NetTcpBinding();
     29      var endpoint = new EndpointAddress(serverString);
     30      myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint);
     31      client = myChannelFactory.CreateChannel();
     32      heartbeatClient = myChannelFactory.CreateChannel();
     33      //Register Peer
     34      client.RegisterPeer(source);
     35      //Start heartbeat timer
     36      timer = new Timer(1000); //each 5 minutes
     37      timer.Elapsed += SendHeartbeatToServer;
     38      timer.Start();
     39    }
     40
     41    public List<PeerInfo> GetPeerList() {
     42      try {
     43        var allPeers = client.GetPeerList(myself); //maybe timout exception...
     44        var peersForMessaging = ChoosePeersForMessaging(allPeers);
     45        //return peersForMessaging;
     46        return allPeers; //TODO: Enable 10% list communication
     47      }
     48      catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
     49      return new List<PeerInfo>();
     50    }
     51
     52    public void SendLogToServer(string msg) {
     53      client.MakeLog(myself, msg);
     54    }
     55
     56    public void Dispose() {
     57      timer.Stop();
     58      timer.Dispose();
     59      ((IClientChannel)client).Close();
     60      ((IClientChannel)heartbeatClient).Close();
     61      myChannelFactory.Close();
     62    }
    3463
    3564
     65    private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) {
     66      //communicate with 10% of the network
     67      int noOfPeers = allPeers.Count / 10;
     68      List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1);
     69      List<PeerInfo> res = new List<PeerInfo>();
     70      foreach (int index in indexList) {
     71        res.Add(allPeers.ElementAt(index));
     72      }
     73      return res;
     74    }
    3675
    37         public List<PeerInfo> GetPeerList()
    38         {
    39             var allPeers = client.GetPeerList(myself);
    40             var peersForMessaging = ChoosePeersForMessaging(allPeers);
    41             //return peersForMessaging;
    42             return allPeers; //TODO: Enable 10% list communication
     76    private List<int> GetRandomItemIndexes(int noOfItems, int minValue, int maxValue) {
     77      List<int> res = new List<int>();
     78      Random rnd = new Random();
     79      int tmp = -1;
     80      while (res.Count < noOfItems) {
     81        tmp = rnd.Next(minValue, maxValue);
     82        if (!res.Contains(tmp)) {
     83          res.Add(tmp);
    4384        }
     85      }
     86      return res;
     87    }
    4488
    45         public void SendLogToServer(string msg)
    46         {
    47             client.MakeLog(myself, msg);
    48         }
     89    private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) {
     90      try {
     91        heartbeatClient.UpdateHeartbeat(myself);
     92      }
     93      catch { } //nothing to do
     94    }
    4995
    50         private IContactService CreateClient()
    51         {
    52             var binding = new NetTcpBinding();
    53             var endpoint = new EndpointAddress(serverString);
    54             var myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint);
    55 
    56             IContactService client = null;
    57             client = myChannelFactory.CreateChannel();
    58             return client;
    59         }
    60 
    61         private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers)
    62         {
    63             //communicate with 10% of the network
    64             int noOfPeers = allPeers.Count / 10;
    65             List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1);
    66             List<PeerInfo> res = new List<PeerInfo>();
    67             foreach (int index in indexList)
    68             {
    69                 res.Add(allPeers.ElementAt(index));
    70             }
    71             return res;
    72         }
    73 
    74         private List<int> GetRandomItemIndexes(int noOfItems, int minValue, int maxValue)
    75         {
    76             List<int> res = new List<int>();
    77             Random rnd = new Random();
    78             int tmp = -1;
    79             while (res.Count < noOfItems)
    80             {
    81                 tmp = rnd.Next(minValue, maxValue);
    82                 if (!res.Contains(tmp))
    83                 {
    84                     res.Add(tmp);
    85                 }
    86             }
    87             return res;
    88         }
    89 
    90         private void SendHeartbeatToServer(object sender, ElapsedEventArgs e)
    91         {
    92             try
    93             {
    94                 heartbeatClient.UpdateHeartbeat(myself);
    95             }
    96             catch { } //nothing to do, exception is raised when getting peer list
    97         }
    98 
    99 
    100 
    101         public void Dispose()
    102         {
    103             timer.Stop();
    104             timer.Dispose();
    105             timer = null;
    106         }
    107     }
     96  }
    10897}
Note: See TracChangeset for help on using the changeset viewer.