Changeset 13923 for branches/thasling/DistributedGA
- Timestamp:
- 06/19/16 21:37:24 (9 years ago)
- Location:
- branches/thasling/DistributedGA
- Files:
-
- 3 edited
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13918 r13923 1 1 using System; 2 using System.Collections.Concurrent; 3 using System.Collections.Generic; 2 4 using System.ServiceModel; 5 using System.Threading.Tasks; 6 using System.Timers; 3 7 using DistributedGA.Core.Domain; 4 8 using DistributedGA.Core.Interface; … … 6 10 namespace DistributedGA.Core.Implementation { 7 11 public class WcfMessageSender : IMessageSender { 12 8 13 private PeerInfo myself; 14 15 private ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages; 16 17 private Timer timer; //sends cached messages to network in background 9 18 10 19 public void Init(PeerInfo source) { 11 20 myself = source; 21 bufferedMessages = new ConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>(); 22 timer = new Timer(1000 * 60); //each 5 minutes 23 timer.Elapsed += GenerateSendingTasks; 24 timer.Start(); 12 25 } 13 26 14 27 public void SendData(PeerInfo destination, byte[][] data) { 28 bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data)); 29 } 15 30 16 var serviceUrl = "DistributedGA.svc"; 17 var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); 18 var serviceUri = new Uri(baseUri, serviceUrl); 31 public void Dispose() { 32 timer.Stop(); 33 timer.Dispose(); 34 timer = null; 35 } 19 36 20 var binding = new NetTcpBinding();21 var endpoint = new EndpointAddress(serviceUri);22 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {23 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {24 ((IMessageContract)client).SendData(myself, data); //maybe timout exception...37 private void GenerateSendingTasks(object sender, ElapsedEventArgs e) { 38 while (!bufferedMessages.IsEmpty) { 39 KeyValuePair<PeerInfo, byte[][]> message; 40 if (bufferedMessages.TryDequeue(out message)) { 41 Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 25 42 } 26 43 } 27 44 } 28 45 29 public void Dispose() { 46 /// <summary> 47 /// sends data to another peer, is used by a single thread to run in background 48 /// </summary> 49 /// <param name="destination">destination peer</param> 50 /// <param name="data">data to send</param> 51 private void SendDataFromQueue(PeerInfo destination, byte[][] data) { 52 try { 53 Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); 54 var serviceUrl = "DistributedGA.svc"; 55 var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); 56 var serviceUri = new Uri(baseUri, serviceUrl); 30 57 58 var binding = new NetTcpBinding(); 59 var endpoint = new EndpointAddress(serviceUri); 60 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) { 61 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { 62 ((IMessageContract)client).SendData(myself, data); //maybe timout exception... 63 } 64 } 65 } 66 catch { } //if maybe sending failed (because of connection lost, etc.): just ignore 31 67 } 32 68 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageService.cs
r13887 r13923 31 31 var serviceUri = new Uri(baseUri, serviceUrl); 32 32 NetTcpBinding binding = new NetTcpBinding(); 33 //using (var host = new ServiceHost(typeof(MessageContractImpl), serviceUri))34 33 35 34 using (var host = new ServiceHost(messageContract, serviceUri)) … … 42 41 } 43 42 }).Start(); 44 //close service again:45 // _ResetEvent.Set();46 43 return port; 47 44 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r13918 r13923 7 7 using System.Timers; 8 8 9 namespace DistributedGA.Core.Implementation 10 { 11 public class WcfPeerListManager : IPeerListManager 12 { 13 private string serverString = null; 9 namespace DistributedGA.Core.Implementation { 10 public class WcfPeerListManager : IPeerListManager { 14 11 15 private IContactService client= null;12 private string serverString = null; 16 13 17 private IContactService heartbeatClient= null;14 private PeerInfo myself = null; 18 15 19 private PeerInfo myself = null;16 private Timer timer = null; //sends heartbeat to contact-server 20 17 21 private Timer timer = null; //sends heartbeat to contact-server18 private ChannelFactory<IContactService> myChannelFactory; 22 19 23 public void Init(PeerInfo source, string contactServerUrl) 24 { 25 serverString = contactServerUrl; 26 client = CreateClient(); 27 heartbeatClient = CreateClient(); 28 myself = source; 29 client.RegisterPeer(source); 30 timer = new Timer(1000 ); //each 5 minutes 31 timer.Elapsed += SendHeartbeatToServer; 32 timer.Start(); 33 } 20 private IContactService client; 21 22 private IContactService heartbeatClient; 23 24 public void Init(PeerInfo source, string contactServerUrl) { 25 serverString = contactServerUrl; 26 myself = source; 27 //Init ChannelFactory and Clients 28 var binding = new NetTcpBinding(); 29 var endpoint = new EndpointAddress(serverString); 30 myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint); 31 client = myChannelFactory.CreateChannel(); 32 heartbeatClient = myChannelFactory.CreateChannel(); 33 //Register Peer 34 client.RegisterPeer(source); 35 //Start heartbeat timer 36 timer = new Timer(1000); //each 5 minutes 37 timer.Elapsed += SendHeartbeatToServer; 38 timer.Start(); 39 } 40 41 public List<PeerInfo> GetPeerList() { 42 try { 43 var allPeers = client.GetPeerList(myself); //maybe timout exception... 44 var peersForMessaging = ChoosePeersForMessaging(allPeers); 45 //return peersForMessaging; 46 return allPeers; //TODO: Enable 10% list communication 47 } 48 catch { } //if maybe sending failed (because of connection lost, etc.): just ignore 49 return new List<PeerInfo>(); 50 } 51 52 public void SendLogToServer(string msg) { 53 client.MakeLog(myself, msg); 54 } 55 56 public void Dispose() { 57 timer.Stop(); 58 timer.Dispose(); 59 ((IClientChannel)client).Close(); 60 ((IClientChannel)heartbeatClient).Close(); 61 myChannelFactory.Close(); 62 } 34 63 35 64 65 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) { 66 //communicate with 10% of the network 67 int noOfPeers = allPeers.Count / 10; 68 List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1); 69 List<PeerInfo> res = new List<PeerInfo>(); 70 foreach (int index in indexList) { 71 res.Add(allPeers.ElementAt(index)); 72 } 73 return res; 74 } 36 75 37 public List<PeerInfo> GetPeerList() 38 { 39 var allPeers = client.GetPeerList(myself); 40 var peersForMessaging = ChoosePeersForMessaging(allPeers); 41 //return peersForMessaging; 42 return allPeers; //TODO: Enable 10% list communication 76 private List<int> GetRandomItemIndexes(int noOfItems, int minValue, int maxValue) { 77 List<int> res = new List<int>(); 78 Random rnd = new Random(); 79 int tmp = -1; 80 while (res.Count < noOfItems) { 81 tmp = rnd.Next(minValue, maxValue); 82 if (!res.Contains(tmp)) { 83 res.Add(tmp); 43 84 } 85 } 86 return res; 87 } 44 88 45 public void SendLogToServer(string msg) 46 { 47 client.MakeLog(myself, msg); 48 } 89 private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) { 90 try { 91 heartbeatClient.UpdateHeartbeat(myself); 92 } 93 catch { } //nothing to do 94 } 49 95 50 private IContactService CreateClient() 51 { 52 var binding = new NetTcpBinding(); 53 var endpoint = new EndpointAddress(serverString); 54 var myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint); 55 56 IContactService client = null; 57 client = myChannelFactory.CreateChannel(); 58 return client; 59 } 60 61 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) 62 { 63 //communicate with 10% of the network 64 int noOfPeers = allPeers.Count / 10; 65 List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1); 66 List<PeerInfo> res = new List<PeerInfo>(); 67 foreach (int index in indexList) 68 { 69 res.Add(allPeers.ElementAt(index)); 70 } 71 return res; 72 } 73 74 private List<int> GetRandomItemIndexes(int noOfItems, int minValue, int maxValue) 75 { 76 List<int> res = new List<int>(); 77 Random rnd = new Random(); 78 int tmp = -1; 79 while (res.Count < noOfItems) 80 { 81 tmp = rnd.Next(minValue, maxValue); 82 if (!res.Contains(tmp)) 83 { 84 res.Add(tmp); 85 } 86 } 87 return res; 88 } 89 90 private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) 91 { 92 try 93 { 94 heartbeatClient.UpdateHeartbeat(myself); 95 } 96 catch { } //nothing to do, exception is raised when getting peer list 97 } 98 99 100 101 public void Dispose() 102 { 103 timer.Stop(); 104 timer.Dispose(); 105 timer = null; 106 } 107 } 96 } 108 97 }
Note: See TracChangeset
for help on using the changeset viewer.