Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs @ 13956

Last change on this file since 13956 was 13956, checked in by thasling, 8 years ago

#2615:
finally fixed the bug that caused messages to be sent to the wrong peers
also made communicationRate and messageCacheCapacity parameters
integration into P2PMigrationAnalyzer is still TBD

File size: 2.8 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.ServiceModel;
5using System.Threading.Tasks;
6using System.Timers;
7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
9using DistributedGA.Core.Util;
10
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// Message sender that buffers outgoing messages in a size-limited queue and
  /// delivers them to their destination peers over WCF (net.tcp) each time an
  /// internal timer elapses.
  /// </summary>
  public class WcfMessageSender : IMessageSender {

    // Interval between two flushes of the message buffer, in milliseconds (one minute).
    private const double SendIntervalMilliseconds = 1000 * 60;

    // Identity of the local peer; passed as the sender of every outgoing message.
    private PeerInfo myself;

    // Messages waiting for the next timer tick, paired with their destination peer.
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages;

    private Timer timer; //sends cached messages to network in background

    // Serializes the actual network sends so that only one is in flight at a time.
    private readonly Object timerLock = new Object();

    /// <summary>
    /// Prepares the sender: stores the local peer, creates the message buffer and
    /// starts the timer that periodically flushes the buffer.
    /// </summary>
    /// <param name="source">the local peer, reported as the sender of each message</param>
    /// <param name="messageCacheCapacity">value assigned to the buffer's Limit</param>
    public void Init(PeerInfo source, int messageCacheCapacity) {
      // A repeated Init must not leave the previous timer running.
      StopTimer();

      myself = source;
      bufferedMessages = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
      bufferedMessages.Limit = messageCacheCapacity;
      timer = new Timer(SendIntervalMilliseconds);
      timer.Elapsed += GenerateSendingTasks;
      timer.Start();
    }

    /// <summary>
    /// Queues data for delivery; the data is sent when the timer elapses the next time.
    /// </summary>
    /// <param name="destination">destination peer</param>
    /// <param name="data">data to send</param>
    public void SendData(PeerInfo destination, byte[][] data) {
      bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
    }

    /// <summary>
    /// Stops and releases the timer. Safe to call repeatedly and before Init.
    /// </summary>
    public void Dispose() {
      StopTimer();
    }

    /// <summary>
    /// Stops, unsubscribes and disposes the current timer, if there is one.
    /// </summary>
    private void StopTimer() {
      var current = timer;
      if (current == null) {
        return;
      }
      timer = null;
      current.Stop();
      current.Elapsed -= GenerateSendingTasks;
      current.Dispose();
    }

    /// <summary>
    /// Timer callback: empties the buffer and hands the collected messages to a
    /// single background task that sends them one after another.
    /// </summary>
    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
      // Drain right away so the buffer's capacity is free again for new messages.
      var pending = new List<KeyValuePair<PeerInfo, byte[][]>>();
      while (!bufferedMessages.IsEmpty) {
        KeyValuePair<PeerInfo, byte[][]> message;
        if (bufferedMessages.TryDequeue(out message)) {
          pending.Add(message);
        }
      }
      if (pending.Count == 0) {
        return;
      }

      // One task for the whole batch: the sends are serialized by timerLock anyway,
      // so one task per message would only create threads that wait on the lock.
      Task.Factory.StartNew(() => {
        foreach (var message in pending) {
          SendDataFromQueue(message.Key, message.Value);
        }
      }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// sends data to another peer, is used by a single thread to run in background;
    /// failures are logged to the console and otherwise ignored (best effort)
    /// </summary>
    /// <param name="destination">destination peer</param>
    /// <param name="data">data to send</param>
    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
      lock (timerLock) {
        try {
          Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
          var serviceUrl = "DistributedGA.svc";
          var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
          // NOTE(review): the base URI has no trailing slash, so the relative part presumably
          // replaces its last segment (.../DistributedGA.svc) - confirm this matches the service host.
          var serviceUri = new Uri(baseUri, serviceUrl);

          var binding = new NetTcpBinding();
          var endpoint = new EndpointAddress(serviceUri);
          var channelFactory = new ChannelFactory<IMessageContract>(binding, endpoint);
          try {
            var client = (IClientChannel)channelFactory.CreateChannel();
            try {
              ((IMessageContract)client).SendData(myself, data); //may throw, e.g. on timeout
            }
            finally {
              CloseOrAbort(client);
            }
          }
          finally {
            CloseOrAbort(channelFactory);
          }
        }
        catch (Exception ex) {
          //sending failed (connection lost, etc.): report it and carry on with the next message
          Console.WriteLine(string.Format("Sending data in the background failed: {0}", ex.Message));
        }
      }
    }

    /// <summary>
    /// Releases a WCF communication object: closes it gracefully when possible and
    /// aborts it when it is faulted or when closing fails.
    /// </summary>
    /// <param name="communicationObject">channel or channel factory to release</param>
    private static void CloseOrAbort(ICommunicationObject communicationObject) {
      try {
        if (communicationObject.State == CommunicationState.Faulted) {
          communicationObject.Abort();
        } else {
          communicationObject.Close();
        }
      }
      catch (CommunicationException) {
        communicationObject.Abort();
      }
      catch (TimeoutException) {
        communicationObject.Abort();
      }
      catch (Exception) {
        communicationObject.Abort();
        throw;
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.