using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.ServiceModel; using System.Threading.Tasks; using System.Timers; using DistributedGA.Core.Domain; using DistributedGA.Core.Interface; using DistributedGA.Core.Util; namespace DistributedGA.Core.Implementation { public class WcfMessageSender : IMessageSender { private PeerInfo myself; private SizedConcurrentQueue> bufferedMessages; private Timer timer; //sends cached messages to network in background public void Init(PeerInfo source) { myself = source; bufferedMessages = new SizedConcurrentQueue>(); bufferedMessages.Limit = 1000; timer = new Timer(1000 * 60); //each 5 minutes timer.Elapsed += GenerateSendingTasks; timer.Start(); } public void SendData(PeerInfo destination, byte[][] data) { bufferedMessages.Enqueue(new KeyValuePair(destination, data)); } public void Dispose() { timer.Stop(); timer.Dispose(); timer = null; } private void GenerateSendingTasks(object sender, ElapsedEventArgs e) { while (!bufferedMessages.IsEmpty) { KeyValuePair message; if (bufferedMessages.TryDequeue(out message)) { Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); } } } /// /// sends data to another peer, is used by a single thread to run in background /// /// destination peer /// data to send private void SendDataFromQueue(PeerInfo destination, byte[][] data) { try { Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); var serviceUrl = "DistributedGA.svc"; var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); var serviceUri = new Uri(baseUri, serviceUrl); var binding = new NetTcpBinding(); var endpoint = new EndpointAddress(serviceUri); using (var myChannelFactory = new ChannelFactory(binding, endpoint)) { using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { ((IMessageContract)client).SendData(myself, data); //maybe timout exception... } } } catch { } //if maybe sending failed (because of connection lost, etc.): just ignore } } }