Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs @ 14061

Last change on this file since 14061 was 14009, checked in by thasling, 8 years ago

#2615:
Made minor changes.
The wrong commit happened last time, so the following is still to be done:
- sort the list
- Dispose of the analyzer is never called

File size: 5.4 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.ServiceModel;
5using System.Threading.Tasks;
6using System.Timers;
7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
9using DistributedGA.Core.Util;
10
namespace DistributedGA.Core.Implementation {
  /// <summary>
  /// Buffers outgoing messages and sends them periodically, grouped by destination peer,
  /// over a WCF net.tcp channel from background tasks.
  /// </summary>
  public class WcfMessageSender : IMessageSender {

    // interval between two flushes of the buffered messages (10 seconds)
    private const double SendIntervalMilliseconds = 1000 * 10;

    // upper bound applied to all size-related quotas of the net.tcp binding
    private const int MaxTransferSize = 20000000;

    private PeerInfo myself;

    //providing two queues for faster access:
    //SendData enqueues into the write queue, the timer callback swaps both queues and drains the read queue
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;
    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;

    private Timer timer; //sends cached messages to network in background

    //serializes timer callbacks so that only one of them swaps and drains the queues at a time
    private readonly Object timerLock = new Object();

    /// <summary>
    /// Sets the sender identity, creates both message queues and starts the background timer.
    /// </summary>
    /// <param name="source">peer that is passed as sender with every message</param>
    /// <param name="messageCacheCapacity">value assigned to the Limit of both message queues</param>
    public void Init(PeerInfo source, int messageCacheCapacity) {
      //a repeated Init must not leave the previous timer running
      StopTimer();

      myself = source;
      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
      bufferedMessagesRead.Limit = messageCacheCapacity;
      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;

      timer = new Timer(SendIntervalMilliseconds);
      timer.Elapsed += GenerateSendingTasks;
      timer.Start();
    }

    /// <summary>
    /// Buffers a message; it is sent the next time the background timer elapses.
    /// </summary>
    /// <param name="destination">destination peer</param>
    /// <param name="data">data to send</param>
    public void SendData(PeerInfo destination, byte[] data) {
      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));
    }

    /// <summary>
    /// Stops and releases the background timer. Safe to call repeatedly or before Init.
    /// Messages that are still buffered are not sent.
    /// </summary>
    public void Dispose() {
      StopTimer();
    }

    /// <summary>
    /// Stops, unsubscribes and disposes the current timer, if there is one.
    /// </summary>
    private void StopTimer() {
      var current = timer;
      timer = null;
      if (current == null) {
        return;
      }
      current.Stop();
      current.Elapsed -= GenerateSendingTasks;
      current.Dispose();
    }

    /// <summary>
    /// Timer callback: swaps the queues, drains the buffered messages and starts
    /// one background sending task per destination.
    /// </summary>
    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
      lock (timerLock) {
        //changing the queues...
        var tmp = bufferedMessagesRead;
        bufferedMessagesRead = bufferedMessagesWrite;
        bufferedMessagesWrite = tmp;

        var messages = new List<KeyValuePair<PeerInfo, byte[]>>();
        KeyValuePair<PeerInfo, byte[]> message;
        while (bufferedMessagesRead.TryDequeue(out message)) {
          messages.Add(message);
        }

        //now: merge them and start sending tasks
        foreach (var item in MergeMessages(messages)) {
          //copy into locals so that each task captures its own destination and payload
          var destination = item.Key;
          var payload = item.Value;
          Task.Factory.StartNew(() => SendDataFromQueue(destination, payload), TaskCreationOptions.LongRunning);
        }
      }
    }

    /// <summary>
    /// Groups all messages by their destination; the order of the messages per destination is kept.
    /// </summary>
    /// <param name="messages">a list with multiple messages to the same destination</param>
    /// <returns>one entry per destination containing all messages for that destination</returns>
    private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) {
      //NOTE(review): grouping relies on the equality semantics of PeerInfo as dictionary key - confirm Equals/GetHashCode
      var cache = new Dictionary<PeerInfo, List<byte[]>>();
      foreach (var messagePackage in messages) {
        List<byte[]> messagesPerDest;
        if (!cache.TryGetValue(messagePackage.Key, out messagesPerDest)) {
          messagesPerDest = new List<byte[]>();
          cache.Add(messagePackage.Key, messagesPerDest);
        }
        messagesPerDest.Add(messagePackage.Value);
      }

      //now we have a dictionary with all messages per destination
      //so: create a byte[][] again and return
      var res = new List<KeyValuePair<PeerInfo, byte[][]>>(cache.Count);
      foreach (var entry in cache) {
        res.Add(new KeyValuePair<PeerInfo, byte[][]>(entry.Key, entry.Value.ToArray()));
      }
      return res;
    }

    /// <summary>
    /// sends data to another peer, is used by a single thread to run in background;
    /// failures are written to the console and otherwise ignored
    /// </summary>
    /// <param name="destination">destination peer</param>
    /// <param name="data">data to send</param>
    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
      try {
        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
        var serviceUrl = "DistributedGA.svc";
        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
        var serviceUri = new Uri(baseUri, serviceUrl);

        var binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = MaxTransferSize;
        binding.MaxBufferSize = MaxTransferSize;
        binding.MaxBufferPoolSize = MaxTransferSize;
        binding.ReaderQuotas.MaxArrayLength = MaxTransferSize;
        binding.ReaderQuotas.MaxDepth = 32;
        binding.ReaderQuotas.MaxStringContentLength = MaxTransferSize;

        var endpoint = new EndpointAddress(serviceUri);

        //no using blocks here: Dispose calls Close, which throws on a faulted channel
        //and would replace the exception that describes the actual failure
        var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint);
        IClientChannel client = null;
        try {
          client = (IClientChannel)myChannelFactory.CreateChannel();
          var contract = (IMessageContract)client;
          for (int i = 0; i < data.Length; i++) {
            contract.SendData(myself, data[i]); //maybe exception...
          }
          client.Close();
          myChannelFactory.Close();
        }
        catch {
          //release the resources immediately and let the original exception reach the outer handler
          if (client != null) {
            client.Abort();
          }
          myChannelFactory.Abort();
          throw;
        }
      }
      catch (Exception ex) {
        //sending is best effort: report the problem and continue
        Console.WriteLine(ex.Message);
      }
    }

  }
}
Note: See TracBrowser for help on using the repository browser.