Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/19/16 21:37:24 (8 years ago)
Author:
thasling
Message:

#2615:
re-implemented WcfPeerListManager
also sending messages is now done in the background async

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r13918 r13923  
    11using System;
     2using System.Collections.Concurrent;
     3using System.Collections.Generic;
    24using System.ServiceModel;
     5using System.Threading.Tasks;
     6using System.Timers;
    37using DistributedGA.Core.Domain;
    48using DistributedGA.Core.Interface;
     
    610namespace DistributedGA.Core.Implementation {
    711  public class WcfMessageSender : IMessageSender {
     12
    813    private PeerInfo myself;
     14
     15    private ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages;
     16
     17    private Timer timer; //sends cached messages to network in background
    918
    1019    public void Init(PeerInfo source) {
    1120      myself = source;
     21      bufferedMessages = new ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     22      timer = new Timer(1000 * 60); //each 5 minutes
     23      timer.Elapsed += GenerateSendingTasks;
     24      timer.Start();
    1225    }
    1326
    1427    public void SendData(PeerInfo destination, byte[][] data) {
     28      bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
     29    }
    1530
    16       var serviceUrl = "DistributedGA.svc";
    17       var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
    18       var serviceUri = new Uri(baseUri, serviceUrl);
     31    public void Dispose() {
     32      timer.Stop();
     33      timer.Dispose();
     34      timer = null;
     35    }
    1936
    20       var binding = new NetTcpBinding();
    21       var endpoint = new EndpointAddress(serviceUri);
    22       using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
    23         using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
    24           ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
     37    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
     38      while (!bufferedMessages.IsEmpty) {
     39        KeyValuePair<PeerInfo, byte[][]> message;
     40        if (bufferedMessages.TryDequeue(out message)) {
     41          Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
    2542        }
    2643      }
    2744    }
    2845
    29     public void Dispose() {
     46    /// <summary>
     47    /// sends data to another peer, is used by a single thread to run in background
     48    /// </summary>
     49    /// <param name="destination">destination peer</param>
     50    /// <param name="data">data to send</param>
     51    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
     52      try {
     53        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
     54        var serviceUrl = "DistributedGA.svc";
     55        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
     56        var serviceUri = new Uri(baseUri, serviceUrl);
    3057
     58        var binding = new NetTcpBinding();
     59        var endpoint = new EndpointAddress(serviceUri);
     60        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
     61          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
     62            ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
     63          }
     64        }
     65      }
     66      catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
    3167    }
    3268  }
Note: See TracChangeset for help on using the changeset viewer.