Free cookie consent management tool by TermsFeed Policy Generator

source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs @ 15802

Last change on this file since 15802 was 14261, checked in by thasling, 8 years ago

#2615:
re-enabled log in P2PMigrationAnalyzer
made code formatting changes

File size: 5.6 KB
Line 
1using System;
2using System.CodeDom;
3using System.Collections.Concurrent;
4using System.Collections.Generic;
5using System.Linq;
6using System.ServiceModel;
7using System.Threading.Tasks;
8using System.Timers;
9using DistributedGA.Core.Domain;
10using DistributedGA.Core.Interface;
11using DistributedGA.Core.Util;
12
13namespace DistributedGA.Core.Implementation {
14  public class WcfMessageSender : IMessageSender, IDisposable {
15
16    private PeerInfo myself;
17
18    //providing two queues for faster access
19    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesWrite;
20    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesRead;
21
22    private Timer timer; //sends cached messages to network in background
23
24    private Object timerLock = new Object();
25    private bool isActive = false;
26
27    public void Init(PeerInfo source, int messageCacheCapacity) {
28      myself = source;
29      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
30      bufferedMessagesRead.Limit = messageCacheCapacity;
31      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
32      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
33      timer = new Timer(1000 * 10); //each 10 seconds
34      timer.Elapsed += GenerateSendingTasks;
35      timer.Start();
36    }
37
38    public void SendData(PeerInfo destination, ByteArrayWrapper data) {
39      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(destination, data));
40    }
41
42    public void Dispose() {
43      timer.Stop();
44      timer.Dispose();
45      timer = null;
46    }
47
48    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
49      if (isActive) return;
50      lock (timerLock) {
51        try {
52          isActive = true;
53          //changing the queues...
54          var tmp = bufferedMessagesRead;
55          bufferedMessagesRead = bufferedMessagesWrite;
56          bufferedMessagesWrite = tmp;
57          List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
58          while (!bufferedMessagesRead.IsEmpty) {
59            KeyValuePair<PeerInfo, ByteArrayWrapper> message;
60            if (bufferedMessagesRead.TryDequeue(out message)) {
61              messages.Add(message);
62              //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
63            }
64          }
65          //now: merge them and start sending tasks
66
67          List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> mergedMessages = MergeMessages(messages);
68
69          var runningTasks = new List<Task>();
70          foreach (var item in mergedMessages) {
71            var itemToSend = item;
72            var t = Task.Factory.StartNew(() => SendDataFromQueue(itemToSend.Key, itemToSend.Value),
73              TaskCreationOptions.LongRunning);
74            runningTasks.Add(t);
75          }
76
77          Task.WaitAll(runningTasks.ToArray());
78        }
79        finally {
80          isActive = false;
81        }
82      }
83    }
84
85    /// <summary>
86    /// Merges and resorts all Messages by their destination
87    /// </summary>
88    /// <param name="messages">a list with multiple messages to the same  destination</param>
89    /// <returns></returns>
90    private List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> MergeMessages(List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages) {
91      Dictionary<PeerInfo, List<ByteArrayWrapper>> cache = new Dictionary<PeerInfo, List<ByteArrayWrapper>>();
92      foreach (var messagePackage in messages) {
93        if (!cache.ContainsKey(messagePackage.Key)) {
94          cache.Add(messagePackage.Key, new List<ByteArrayWrapper>());
95        }
96        //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
97        cache[messagePackage.Key].Add(messagePackage.Value);
98        //}
99      }
100
101      return cache.Keys.Select(dest => new KeyValuePair<PeerInfo, ByteArrayWrapper[]>(dest, cache[dest].ToArray())).ToList();
102    }
103
104    /// <summary>
105    /// sends data to another peer, is used by a single thread to run in background
106    /// </summary>
107    /// <param name="destination">destination peer</param>
108    /// <param name="data">data to send</param>
109    private void SendDataFromQueue(PeerInfo destination, ByteArrayWrapper[] data) {
110      try {
111        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
112        var serviceUrl = "DistributedGA.svc";
113        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
114        var serviceUri = new Uri(baseUri, serviceUrl);
115
116        var binding = new NetTcpBinding();
117        binding.MaxReceivedMessageSize = 20000000;
118        binding.MaxBufferSize = 20000000;
119        binding.MaxBufferPoolSize = 20000000;
120        binding.ReaderQuotas.MaxArrayLength = 20000000;
121        binding.ReaderQuotas.MaxDepth = 32;
122        binding.ReaderQuotas.MaxStringContentLength = 20000000;
123
124        var endpoint = new EndpointAddress(serviceUri);
125        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
126          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
127            for (int i = 0; i < data.GetLength(0); i++) {
128              ((IMessageContract)client).SendData(myself, data[i].Array); //maybe exception...
129            }
130          }
131        }
132      }
133      catch (Exception ex) {
134        //ignore
135        Console.WriteLine(ex.Message);
136      }
137    }
138
139  }
140}
Note: See TracBrowser for help on using the repository browser.