Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs @ 13923

Last change on this file since 13923 was 13923, checked in by thasling, 8 years ago

#2615:
re-implemented WcfPeerListManager
also sending messages is now done in the background async

File size: 2.6 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.ServiceModel;
5using System.Threading.Tasks;
6using System.Timers;
7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
9
10namespace DistributedGA.Core.Implementation {
11  public class WcfMessageSender : IMessageSender {
12
13    private PeerInfo myself;
14
15    private ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages;
16
17    private Timer timer; //sends cached messages to network in background
18
19    public void Init(PeerInfo source) {
20      myself = source;
21      bufferedMessages = new ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
22      timer = new Timer(1000 * 60); //each 5 minutes
23      timer.Elapsed += GenerateSendingTasks;
24      timer.Start();
25    }
26
27    public void SendData(PeerInfo destination, byte[][] data) {
28      bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
29    }
30
31    public void Dispose() {
32      timer.Stop();
33      timer.Dispose();
34      timer = null;
35    }
36
37    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
38      while (!bufferedMessages.IsEmpty) {
39        KeyValuePair<PeerInfo, byte[][]> message;
40        if (bufferedMessages.TryDequeue(out message)) {
41          Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
42        }
43      }
44    }
45
46    /// <summary>
47    /// sends data to another peer, is used by a single thread to run in background
48    /// </summary>
49    /// <param name="destination">destination peer</param>
50    /// <param name="data">data to send</param>
51    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
52      try {
53        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
54        var serviceUrl = "DistributedGA.svc";
55        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
56        var serviceUri = new Uri(baseUri, serviceUrl);
57
58        var binding = new NetTcpBinding();
59        var endpoint = new EndpointAddress(serviceUri);
60        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
61          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
62            ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
63          }
64        }
65      }
66      catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
67    }
68  }
69}
Note: See TracBrowser for help on using the repository browser.