source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs @ 13937

Last change on this file since 13937 was 13937, checked in by thasling, 4 years ago

#2615:
implemented ConcurrentQueue
made minor changes in Dispose-methods

File size: 2.7 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.ServiceModel;
5using System.Threading.Tasks;
6using System.Timers;
7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
9using DistributedGA.Core.Util;
10
11namespace DistributedGA.Core.Implementation {
12  public class WcfMessageSender : IMessageSender {
13
14    private PeerInfo myself;
15
16    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages;
17
18    private Timer timer; //sends cached messages to network in background
19
20    public void Init(PeerInfo source) {
21      myself = source;
22      bufferedMessages = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
23      bufferedMessages.Limit = 1000;
24      timer = new Timer(1000 * 60); //each 5 minutes
25      timer.Elapsed += GenerateSendingTasks;
26      timer.Start();
27    }
28
29    public void SendData(PeerInfo destination, byte[][] data) {
30      bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
31    }
32
33    public void Dispose() {
34      timer.Stop();
35      timer.Dispose();
36      timer = null;
37    }
38
39    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
40      while (!bufferedMessages.IsEmpty) {
41        KeyValuePair<PeerInfo, byte[][]> message;
42        if (bufferedMessages.TryDequeue(out message)) {
43          Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
44        }
45      }
46    }
47
48    /// <summary>
49    /// sends data to another peer, is used by a single thread to run in background
50    /// </summary>
51    /// <param name="destination">destination peer</param>
52    /// <param name="data">data to send</param>
53    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
54      try {
55        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
56        var serviceUrl = "DistributedGA.svc";
57        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
58        var serviceUri = new Uri(baseUri, serviceUrl);
59
60        var binding = new NetTcpBinding();
61        var endpoint = new EndpointAddress(serviceUri);
62        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
63          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
64            ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
65          }
66        }
67      }
68      catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
69    }
70  }
71}
Note: See TracBrowser for help on using the repository browser.