source: branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs @ 13970

Last change on this file since 13970 was 13970, checked in by thasling, 5 years ago

#2615:
made minor changes

File size: 5.4 KB
Line 
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.ServiceModel;
5using System.Threading.Tasks;
6using System.Timers;
7using DistributedGA.Core.Domain;
8using DistributedGA.Core.Interface;
9using DistributedGA.Core.Util;
10
11namespace DistributedGA.Core.Implementation {
12  public class WcfMessageSender : IMessageSender {
13
14    private PeerInfo myself;
15
16    //providing two queues for faster access
17    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesWrite;
18    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesRead;
19
20
21    private Timer timer; //sends cached messages to network in background
22
23    private Object timerLock = new Object();
24
25    public void Init(PeerInfo source, int messageCacheCapacity) {
26      myself = source;
27      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
28      bufferedMessagesRead.Limit = messageCacheCapacity;
29      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
30      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
31      timer = new Timer(1000 * 10); //each 10 seconds
32      timer.Elapsed += GenerateSendingTasks;
33      timer.Start();
34    }
35
36    public void SendData(PeerInfo destination, byte[][] data) {
37      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
38    }
39
40    public void Dispose() {
41      timer.Stop();
42      timer.Dispose();
43      timer = null;
44    }
45
46    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
47      lock (timerLock) {
48        //changing the queues...
49        var tmp = bufferedMessagesRead;
50        bufferedMessagesRead = bufferedMessagesWrite;
51        bufferedMessagesWrite = tmp;
52        List<KeyValuePair<PeerInfo, byte[][]>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>();
53        while (!bufferedMessagesRead.IsEmpty) {
54          KeyValuePair<PeerInfo, byte[][]> message;
55          if (bufferedMessagesRead.TryDequeue(out message)) {
56            messages.Add(message);
57            //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
58          }
59        }
60        //now: merge them and start sending tasks
61
62        List<KeyValuePair<PeerInfo, byte[][]>> mergedMessages = MergeMessages(messages);
63        foreach (var item in mergedMessages) {
64          Task.Factory.StartNew(() => SendDataFromQueue(item.Key, item.Value), TaskCreationOptions.LongRunning);
65        }
66      }
67    }
68
69    /// <summary>
70    /// Merges and resorts all Messages by their destination
71    /// </summary>
72    /// <param name="messages">a list with multiple messages to the same  destination</param>
73    /// <returns></returns>
74    private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[][]>> messages) {
75      List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>();
76      Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>();
77      foreach (var messagePackage in messages) {
78        if (!cache.ContainsKey(messagePackage.Key)) {
79          cache.Add(messagePackage.Key, new List<byte[]>());
80        }
81        for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
82          cache[messagePackage.Key].Add(messagePackage.Value[i]);
83        }
84      }
85      //now we have a dictionary with all messages per destionation
86      //so: create a byte[][] again and return
87      foreach (var dest in cache.Keys) {
88        byte[][] messagesPerDest = new byte[cache[dest].Count][];
89        for (int i = 0; i <= messagesPerDest.GetUpperBound(0); i++) {
90          messagesPerDest[i] = cache[dest][i];
91        }
92        res.Add(new KeyValuePair<PeerInfo, byte[][]>(dest, messagesPerDest));
93      }
94      return res;
95    }
96
97    /// <summary>
98    /// sends data to another peer, is used by a single thread to run in background
99    /// </summary>
100    /// <param name="destination">destination peer</param>
101    /// <param name="data">data to send</param>
102    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
103      try {
104        int arrayLength = 3;
105        if (data.GetUpperBound(0) > arrayLength) {
106          //HACK: SEND MAX 10 items
107          byte[][] fake = new byte[arrayLength][];
108          for (int i = 0; i < arrayLength; i++) {
109            fake[i] = data[i];
110          }
111          data = fake;
112        }
113
114
115        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
116        var serviceUrl = "DistributedGA.svc";
117        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
118        var serviceUri = new Uri(baseUri, serviceUrl);
119
120        var binding = new NetTcpBinding();
121        var endpoint = new EndpointAddress(serviceUri);
122        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
123          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
124            ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
125          }
126        }
127      }
128      catch (Exception ex) {
129        //ignore
130        Console.WriteLine(ex.Message);
131      }
132    }
133
134  }
135}
Note: See TracBrowser for help on using the repository browser.