1 | using System;
|
---|
2 | using System.CodeDom;
|
---|
3 | using System.Collections.Concurrent;
|
---|
4 | using System.Collections.Generic;
|
---|
5 | using System.Linq;
|
---|
6 | using System.ServiceModel;
|
---|
7 | using System.Threading.Tasks;
|
---|
8 | using System.Timers;
|
---|
9 | using DistributedGA.Core.Domain;
|
---|
10 | using DistributedGA.Core.Interface;
|
---|
11 | using DistributedGA.Core.Util;
|
---|
12 |
|
---|
13 | namespace DistributedGA.Core.Implementation {
|
---|
14 | public class WcfMessageSender : IMessageSender, IDisposable {
|
---|
15 |
|
---|
16 | private PeerInfo myself;
|
---|
17 |
|
---|
18 | //providing two queues for faster access
|
---|
19 | private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesWrite;
|
---|
20 | private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesRead;
|
---|
21 |
|
---|
22 | private Timer timer; //sends cached messages to network in background
|
---|
23 |
|
---|
24 | private Object timerLock = new Object();
|
---|
25 | private bool isActive = false;
|
---|
26 |
|
---|
27 | public void Init(PeerInfo source, int messageCacheCapacity) {
|
---|
28 | myself = source;
|
---|
29 | bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
|
---|
30 | bufferedMessagesRead.Limit = messageCacheCapacity;
|
---|
31 | bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
|
---|
32 | bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
|
---|
33 | timer = new Timer(1000 * 10); //each 10 seconds
|
---|
34 | timer.Elapsed += GenerateSendingTasks;
|
---|
35 | timer.Start();
|
---|
36 | }
|
---|
37 |
|
---|
38 | public void SendData(PeerInfo destination, ByteArrayWrapper data) {
|
---|
39 | bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(destination, data));
|
---|
40 | }
|
---|
41 |
|
---|
42 | public void Dispose() {
|
---|
43 | timer.Stop();
|
---|
44 | timer.Dispose();
|
---|
45 | timer = null;
|
---|
46 | }
|
---|
47 |
|
---|
48 | private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
|
---|
49 | if (isActive) return;
|
---|
50 | lock (timerLock) {
|
---|
51 | try {
|
---|
52 | isActive = true;
|
---|
53 | //changing the queues...
|
---|
54 | var tmp = bufferedMessagesRead;
|
---|
55 | bufferedMessagesRead = bufferedMessagesWrite;
|
---|
56 | bufferedMessagesWrite = tmp;
|
---|
57 | List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
|
---|
58 | while (!bufferedMessagesRead.IsEmpty) {
|
---|
59 | KeyValuePair<PeerInfo, ByteArrayWrapper> message;
|
---|
60 | if (bufferedMessagesRead.TryDequeue(out message)) {
|
---|
61 | messages.Add(message);
|
---|
62 | //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
|
---|
63 | }
|
---|
64 | }
|
---|
65 | //now: merge them and start sending tasks
|
---|
66 |
|
---|
67 | List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> mergedMessages = MergeMessages(messages);
|
---|
68 |
|
---|
69 | var runningTasks = new List<Task>();
|
---|
70 | foreach (var item in mergedMessages) {
|
---|
71 | var itemToSend = item;
|
---|
72 | var t = Task.Factory.StartNew(() => SendDataFromQueue(itemToSend.Key, itemToSend.Value),
|
---|
73 | TaskCreationOptions.LongRunning);
|
---|
74 | runningTasks.Add(t);
|
---|
75 | }
|
---|
76 |
|
---|
77 | Task.WaitAll(runningTasks.ToArray());
|
---|
78 | }
|
---|
79 | finally {
|
---|
80 | isActive = false;
|
---|
81 | }
|
---|
82 | }
|
---|
83 | }
|
---|
84 |
|
---|
85 | /// <summary>
|
---|
86 | /// Merges and resorts all Messages by their destination
|
---|
87 | /// </summary>
|
---|
88 | /// <param name="messages">a list with multiple messages to the same destination</param>
|
---|
89 | /// <returns></returns>
|
---|
90 | private List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> MergeMessages(List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages) {
|
---|
91 | Dictionary<PeerInfo, List<ByteArrayWrapper>> cache = new Dictionary<PeerInfo, List<ByteArrayWrapper>>();
|
---|
92 | foreach (var messagePackage in messages) {
|
---|
93 | if (!cache.ContainsKey(messagePackage.Key)) {
|
---|
94 | cache.Add(messagePackage.Key, new List<ByteArrayWrapper>());
|
---|
95 | }
|
---|
96 | //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
|
---|
97 | cache[messagePackage.Key].Add(messagePackage.Value);
|
---|
98 | //}
|
---|
99 | }
|
---|
100 |
|
---|
101 | return cache.Keys.Select(dest => new KeyValuePair<PeerInfo, ByteArrayWrapper[]>(dest, cache[dest].ToArray())).ToList();
|
---|
102 | }
|
---|
103 |
|
---|
104 | /// <summary>
|
---|
105 | /// sends data to another peer, is used by a single thread to run in background
|
---|
106 | /// </summary>
|
---|
107 | /// <param name="destination">destination peer</param>
|
---|
108 | /// <param name="data">data to send</param>
|
---|
109 | private void SendDataFromQueue(PeerInfo destination, ByteArrayWrapper[] data) {
|
---|
110 | try {
|
---|
111 | Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
|
---|
112 | var serviceUrl = "DistributedGA.svc";
|
---|
113 | var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
|
---|
114 | var serviceUri = new Uri(baseUri, serviceUrl);
|
---|
115 |
|
---|
116 | var binding = new NetTcpBinding();
|
---|
117 | binding.MaxReceivedMessageSize = 20000000;
|
---|
118 | binding.MaxBufferSize = 20000000;
|
---|
119 | binding.MaxBufferPoolSize = 20000000;
|
---|
120 | binding.ReaderQuotas.MaxArrayLength = 20000000;
|
---|
121 | binding.ReaderQuotas.MaxDepth = 32;
|
---|
122 | binding.ReaderQuotas.MaxStringContentLength = 20000000;
|
---|
123 |
|
---|
124 | var endpoint = new EndpointAddress(serviceUri);
|
---|
125 | using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
|
---|
126 | using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
|
---|
127 | for (int i = 0; i < data.GetLength(0); i++) {
|
---|
128 | ((IMessageContract)client).SendData(myself, data[i].Array); //maybe exception...
|
---|
129 | }
|
---|
130 | }
|
---|
131 | }
|
---|
132 | }
|
---|
133 | catch (Exception ex) {
|
---|
134 | //ignore
|
---|
135 | Console.WriteLine(ex.Message);
|
---|
136 | }
|
---|
137 | }
|
---|
138 |
|
---|
139 | }
|
---|
140 | }
|
---|