using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ServiceModel;
using System.Threading.Tasks;
using System.Timers;
using DistributedGA.Core.Domain;
using DistributedGA.Core.Interface;
using DistributedGA.Core.Util;

namespace DistributedGA.Core.Implementation {

    /// <summary>
    /// Buffers outgoing messages and periodically flushes them to their
    /// destination peers over a WCF net.tcp channel from background tasks.
    /// </summary>
    public class WcfMessageSender : IMessageSender {

        /// <summary>Interval between two flushes of the message buffer (10 seconds).</summary>
        private const double FlushIntervalMilliseconds = 1000 * 10;

        /// <summary>Value applied to every size-related quota of the net.tcp binding.</summary>
        private const int MaxMessageBytes = 20000000;

        /// <summary>Value applied to the XML reader's maximum nesting depth quota.</summary>
        private const int MaxReaderDepth = 32;

        private PeerInfo myself;

        // Two queues that are swapped on every timer tick: SendData appends to the
        // write queue while the timer callback drains the read queue.
        // NOTE(review): element type reconstructed from usage (destination + byte[][]) - confirm.
        private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesWrite;
        private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesRead;

        private Timer timer; // sends cached messages to the network in the background
        private readonly object timerLock = new object();

        /// <summary>
        /// Prepares the sender: remembers the local peer, creates both message
        /// buffers and starts the periodic flush timer.
        /// </summary>
        /// <param name="source">the local peer, passed as sender with every message</param>
        /// <param name="messageCacheCapacity">value assigned to the Limit of both buffers</param>
        public void Init(PeerInfo source, int messageCacheCapacity) {
            // A repeated Init must not leave the previous timer running.
            StopTimer();

            myself = source;

            bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
            bufferedMessagesRead.Limit = messageCacheCapacity;
            bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
            bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;

            timer = new Timer(FlushIntervalMilliseconds);
            timer.Elapsed += GenerateSendingTasks;
            timer.Start();
        }

        /// <summary>
        /// Queues data for a destination; the data is sent on a later timer tick.
        /// </summary>
        /// <param name="destination">peer that should receive the data</param>
        /// <param name="data">messages to send, one byte[] per message</param>
        public void SendData(PeerInfo destination, byte[][] data) {
            bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
        }

        /// <summary>
        /// Stops and releases the flush timer. Safe to call before Init and more than once.
        /// </summary>
        public void Dispose() {
            StopTimer();
        }

        /// <summary>
        /// Stops, unsubscribes and disposes the current timer, if there is one.
        /// </summary>
        private void StopTimer() {
            Timer current = timer;
            if (current == null) {
                return;
            }
            timer = null;
            current.Stop();
            current.Elapsed -= GenerateSendingTasks;
            current.Dispose();
        }

        /// <summary>
        /// Timer callback: swaps the two buffers, drains the one that was written
        /// to so far, merges the drained messages per destination and starts one
        /// background task per destination.
        /// </summary>
        private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
            // The lock serializes overlapping timer callbacks.
            lock (timerLock) {
                // Swap the queues so new messages go into the queue that is not being drained.
                var previousRead = bufferedMessagesRead;
                bufferedMessagesRead = bufferedMessagesWrite;
                bufferedMessagesWrite = previousRead;

                var messages = new List<KeyValuePair<PeerInfo, byte[][]>>();
                KeyValuePair<PeerInfo, byte[][]> message;
                while (bufferedMessagesRead.TryDequeue(out message)) {
                    messages.Add(message);
                }

                // Merge them and start the sending tasks.
                foreach (var item in MergeMessages(messages)) {
                    // Copy to a local so each task captures its own package even on
                    // compilers that share one foreach variable across iterations.
                    var package = item;
                    Task.Factory.StartNew(
                        () => SendDataFromQueue(package.Key, package.Value),
                        TaskCreationOptions.LongRunning);
                }
            }
        }

        /// <summary>
        /// Merges and regroups all messages by their destination.
        /// </summary>
        /// <param name="messages">a list that may hold multiple packages for the same destination</param>
        /// <returns>one package per destination, holding all of its messages in arrival order</returns>
        private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[][]>> messages) {
            var cache = new Dictionary<PeerInfo, List<byte[]>>();
            foreach (var messagePackage in messages) {
                if (messagePackage.Value == null) {
                    // A package without payload must not abort the whole batch.
                    continue;
                }

                List<byte[]> payloads;
                if (!cache.TryGetValue(messagePackage.Key, out payloads)) {
                    payloads = new List<byte[]>();
                    cache.Add(messagePackage.Key, payloads);
                }
                payloads.AddRange(messagePackage.Value);
            }

            // Now we have all messages per destination, so create a byte[][] per destination again.
            var res = new List<KeyValuePair<PeerInfo, byte[][]>>(cache.Count);
            foreach (var entry in cache) {
                res.Add(new KeyValuePair<PeerInfo, byte[][]>(entry.Key, entry.Value.ToArray()));
            }
            return res;
        }

        /// <summary>
        /// Sends data to another peer; runs on a background task. Every failure is
        /// written to the console and otherwise ignored.
        /// </summary>
        /// <param name="destination">destination peer</param>
        /// <param name="data">data to send, one service call per element</param>
        private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
            ChannelFactory<IMessageContract> channelFactory = null;
            IClientChannel client = null;
            try {
                Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));

                var serviceUrl = "DistributedGA.svc";
                var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
                var serviceUri = new Uri(baseUri, serviceUrl);

                var binding = new NetTcpBinding();
                binding.MaxReceivedMessageSize = MaxMessageBytes;
                binding.MaxBufferSize = MaxMessageBytes;
                binding.MaxBufferPoolSize = MaxMessageBytes;
                binding.ReaderQuotas.MaxArrayLength = MaxMessageBytes;
                binding.ReaderQuotas.MaxDepth = MaxReaderDepth;
                binding.ReaderQuotas.MaxStringContentLength = MaxMessageBytes;

                var endpoint = new EndpointAddress(serviceUri);

                channelFactory = new ChannelFactory<IMessageContract>(binding, endpoint);
                client = (IClientChannel)channelFactory.CreateChannel();

                foreach (byte[] payload in data) {
                    ((IMessageContract)client).SendData(myself, payload); // may throw, e.g. on timeout
                }

                // Close only on the success path; Close on a faulted channel throws.
                client.Close();
                channelFactory.Close();
            } catch (Exception ex) {
                // Abort instead of Dispose/Close so the original error is the one reported.
                if (client != null) {
                    client.Abort();
                }
                if (channelFactory != null) {
                    channelFactory.Abort();
                }
                // Sending is best effort: report and carry on.
                Console.WriteLine(ex.Message);
            }
        }
    }
}