Changeset 13923 for branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
- Timestamp:
- 06/19/16 21:37:24 (8 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13918 r13923 1 1 using System; 2 using System.Collections.Concurrent; 3 using System.Collections.Generic; 2 4 using System.ServiceModel; 5 using System.Threading.Tasks; 6 using System.Timers; 3 7 using DistributedGA.Core.Domain; 4 8 using DistributedGA.Core.Interface; … … 6 10 namespace DistributedGA.Core.Implementation { 7 11 public class WcfMessageSender : IMessageSender { 12 8 13 private PeerInfo myself; 14 15 private ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages; 16 17 private Timer timer; //sends cached messages to network in background 9 18 10 19 public void Init(PeerInfo source) { 11 20 myself = source; 21 bufferedMessages = new ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>(); 22 timer = new Timer(1000 * 60); //each 5 minutes 23 timer.Elapsed += GenerateSendingTasks; 24 timer.Start(); 12 25 } 13 26 14 27 public void SendData(PeerInfo destination, byte[][] data) { 28 bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data)); 29 } 15 30 16 var serviceUrl = "DistributedGA.svc"; 17 var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); 18 var serviceUri = new Uri(baseUri, serviceUrl); 31 public void Dispose() { 32 timer.Stop(); 33 timer.Dispose(); 34 timer = null; 35 } 19 36 20 var binding = new NetTcpBinding();21 var endpoint = new EndpointAddress(serviceUri);22 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {23 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {24 ((IMessageContract)client).SendData(myself, data); //maybe timout exception...37 private void GenerateSendingTasks(object sender, ElapsedEventArgs e) { 38 while (!bufferedMessages.IsEmpty) { 39 KeyValuePair<PeerInfo, byte[][]> message; 40 if (bufferedMessages.TryDequeue(out message)) { 41 Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 25 42 } 26 43 } 27 44 } 28 45 29 public void Dispose() { 46 /// <summary> 47 /// sends data to another peer, is used by a single thread to run in background 48 /// </summary> 49 /// <param name="destination">destination peer</param> 50 /// <param name="data">data to send</param> 51 private void SendDataFromQueue(PeerInfo destination, byte[][] data) { 52 try { 53 Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); 54 var serviceUrl = "DistributedGA.svc"; 55 var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); 56 var serviceUri = new Uri(baseUri, serviceUrl); 30 57 58 var binding = new NetTcpBinding(); 59 var endpoint = new EndpointAddress(serviceUri); 60 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) { 61 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { 62 ((IMessageContract)client).SendData(myself, data); //maybe timout exception... 63 } 64 } 65 } 66 catch { } //if maybe sending failed (because of connection lost, etc.): just ignore 31 67 } 32 68 }
Note: See TracChangeset
for help on using the changeset viewer.