1 | using System;
2 | using System.Collections.Concurrent;
3 | using System.Collections.Generic;
4 | using System.ServiceModel;
5 | using System.Threading.Tasks;
6 | using System.Timers;
7 | using DistributedGA.Core.Domain;
8 | using DistributedGA.Core.Interface;
9 | using DistributedGA.Core.Util;
10 |
11 | namespace DistributedGA.Core.Implementation {
/// <summary>
/// Buffers outgoing messages and periodically sends them to other peers via WCF (net.tcp).
/// Messages are collected in a write queue; a timer swaps the write queue with the read
/// queue, drains the read queue, groups the messages by destination and sends every group
/// on its own background task.
/// </summary>
public class WcfMessageSender : IMessageSender {

  // interval between two flushes of the buffered messages
  private const double SendIntervalMilliseconds = 1000 * 10;

  // upper bound applied to all size-related quotas of the net.tcp binding
  private const int MaxMessageBytes = 20000000;

  private PeerInfo myself;

  // two queues: callers enqueue into the write queue while the timer drains the read queue
  private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;
  private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;

  private Timer timer; //sends cached messages to network in background

  // serializes overlapping timer callbacks while the queues are swapped and drained
  private readonly Object timerLock = new Object();

  /// <summary>
  /// Prepares the message buffers and starts the timer that flushes them.
  /// A timer left over from an earlier call is stopped and disposed first.
  /// </summary>
  /// <param name="source">the peer that is passed as sender with every message</param>
  /// <param name="messageCacheCapacity">limit applied to each of the two buffer queues</param>
  public void Init(PeerInfo source, int messageCacheCapacity) {
    // without this a second Init would leave the previous timer running forever
    StopTimer();

    myself = source;
    bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    bufferedMessagesRead.Limit = messageCacheCapacity;
    bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    bufferedMessagesWrite.Limit = messageCacheCapacity;

    timer = new Timer(SendIntervalMilliseconds);
    timer.Elapsed += GenerateSendingTasks;
    timer.Start();
  }

  /// <summary>
  /// Buffers a message; it is sent to the network by one of the next timer ticks.
  /// </summary>
  /// <param name="destination">destination peer</param>
  /// <param name="data">data to send</param>
  public void SendData(PeerInfo destination, byte[] data) {
    bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));
  }

  /// <summary>
  /// Stops and disposes the timer. Safe to call repeatedly and before <see cref="Init"/>.
  /// Messages that are still buffered are not sent.
  /// </summary>
  public void Dispose() {
    StopTimer();
  }

  /// <summary>
  /// Stops, unsubscribes and disposes the current timer, if there is one.
  /// </summary>
  private void StopTimer() {
    var current = timer;
    timer = null;
    if (current == null) {
      return;
    }
    current.Stop();
    current.Elapsed -= GenerateSendingTasks;
    current.Dispose();
  }

  /// <summary>
  /// Timer callback: swaps the two queues, drains the read queue and starts one
  /// background task per destination that sends the merged messages.
  /// </summary>
  private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
    List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>();

    // only the swap and the drain touch shared state, so only they need the lock
    lock (timerLock) {
      var tmp = bufferedMessagesRead;
      bufferedMessagesRead = bufferedMessagesWrite;
      bufferedMessagesWrite = tmp;

      KeyValuePair<PeerInfo, byte[]> message;
      while (bufferedMessagesRead.TryDequeue(out message)) {
        messages.Add(message);
      }
    }

    if (messages.Count == 0) {
      return;
    }

    foreach (var item in MergeMessages(messages)) {
      // copy to locals so the lambda never captures the loop variable itself
      // (compilers before C# 5 share one loop variable between all iterations)
      PeerInfo destination = item.Key;
      byte[][] payload = item.Value;
      Task.Factory.StartNew(() => SendDataFromQueue(destination, payload), TaskCreationOptions.LongRunning);
    }
  }

  /// <summary>
  /// Groups messages by their destination. The order of the messages per destination is kept.
  /// Messages without a destination are skipped.
  /// </summary>
  /// <param name="messages">a list that may contain multiple messages to the same destination</param>
  /// <returns>one entry per destination holding all messages for that destination</returns>
  private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) {
    Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>();
    foreach (var messagePackage in messages) {
      // a null key makes the dictionary throw, which would discard the whole batch
      if (ReferenceEquals(messagePackage.Key, null)) {
        Console.WriteLine("Skipping a buffered message without destination.");
        continue;
      }
      List<byte[]> perDestination;
      if (!cache.TryGetValue(messagePackage.Key, out perDestination)) {
        perDestination = new List<byte[]>();
        cache.Add(messagePackage.Key, perDestination);
      }
      perDestination.Add(messagePackage.Value);
    }

    List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>(cache.Count);
    foreach (var entry in cache) {
      res.Add(new KeyValuePair<PeerInfo, byte[][]>(entry.Key, entry.Value.ToArray()));
    }
    return res;
  }

  /// <summary>
  /// Sends data to another peer; runs on a background task. Failures are written to the
  /// console and otherwise ignored, the affected messages are not retried.
  /// </summary>
  /// <param name="destination">destination peer</param>
  /// <param name="data">messages to send, one service call per array element</param>
  private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
    try {
      Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
      var serviceUrl = "DistributedGA.svc";
      // NOTE(review): the base URI has no trailing slash, so the relative part presumably
      // replaces the last path segment - confirm that the result matches the hosted endpoint
      var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
      var serviceUri = new Uri(baseUri, serviceUrl);

      var binding = new NetTcpBinding();
      binding.MaxReceivedMessageSize = MaxMessageBytes;
      binding.MaxBufferSize = MaxMessageBytes;
      binding.MaxBufferPoolSize = MaxMessageBytes;
      binding.ReaderQuotas.MaxArrayLength = MaxMessageBytes;
      binding.ReaderQuotas.MaxDepth = 32;
      binding.ReaderQuotas.MaxStringContentLength = MaxMessageBytes;

      var endpoint = new EndpointAddress(serviceUri);

      // No using blocks here: Dispose calls Close, and Close throws on a faulted channel,
      // which would hide the exception that describes the actual send failure.
      var channelFactory = new ChannelFactory<IMessageContract>(binding, endpoint);
      IClientChannel client = null;
      try {
        client = (IClientChannel)channelFactory.CreateChannel();
        var contract = (IMessageContract)client;
        for (int i = 0; i < data.Length; i++) {
          contract.SendData(myself, data[i]); //maybe exception...
        }
        client.Close();
        channelFactory.Close();
      }
      catch (Exception) {
        // Abort releases the resources without throwing, the original exception is kept
        if (client != null) {
          client.Abort();
        }
        channelFactory.Abort();
        throw;
      }
    }
    catch (Exception ex) {
      // best effort: a peer that cannot be reached must not stop the sender
      Console.WriteLine(ex.Message);
    }
  }

}
132 | }