Changeset 13959
- Timestamp:
- 06/29/16 22:23:58 (8 years ago)
- Location:
- branches/thasling/DistributedGA
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13956 r13959 14 14 private PeerInfo myself; 15 15 16 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages; 16 //providing two queues for faster access 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesWrite; 18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesRead; 19 17 20 18 21 private Timer timer; //sends cached messages to network in background … … 22 25 public void Init(PeerInfo source, int messageCacheCapacity) { 23 26 myself = source; 24 bufferedMessages = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>(); 25 bufferedMessages.Limit = messageCacheCapacity; 27 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>(); 28 bufferedMessagesRead.Limit = messageCacheCapacity; 29 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>(); 30 bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit; 26 31 timer = new Timer(1000 * 60); //each 5 minutes 27 32 timer.Elapsed += GenerateSendingTasks; … … 30 35 31 36 public void SendData(PeerInfo destination, byte[][] data) { 32 bufferedMessages .Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));37 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data)); 33 38 } 34 39 … … 40 45 41 46 private void GenerateSendingTasks(object sender, ElapsedEventArgs e) { 42 while (!bufferedMessages.IsEmpty) { 43 KeyValuePair<PeerInfo, byte[][]> message; 44 if (bufferedMessages.TryDequeue(out message)) { 45 Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 47 lock (timerLock) { 48 //changing the queues... 49 var tmp = bufferedMessagesRead; 50 bufferedMessagesRead = bufferedMessagesWrite; 51 bufferedMessagesWrite = tmp; 52 List<KeyValuePair<PeerInfo, byte[][]>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>(); 53 while (!bufferedMessagesRead.IsEmpty) { 54 KeyValuePair<PeerInfo, byte[][]> message; 55 if (bufferedMessagesRead.TryDequeue(out message)) { 56 messages.Add(message); 57 //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 58 } 59 } 60 //now: merge them and start sending tasks 61 62 List<KeyValuePair<PeerInfo, byte[][]>> mergedMessages = MergeMessages(messages); 63 foreach (var item in mergedMessages) { 64 Task.Factory.StartNew(() => SendDataFromQueue(item.Key, item.Value), TaskCreationOptions.LongRunning); 46 65 } 47 66 } 67 } 68 69 /// <summary> 70 /// Merges and resorts all Messages by their destination 71 /// </summary> 72 /// <param name="messages">a list with multiple messages to the same destination</param> 73 /// <returns></returns> 74 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[][]>> messages) { 75 List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>(); 76 Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>(); 77 foreach (var messagePackage in messages) { 78 if (!cache.ContainsKey(messagePackage.Key)) { 79 cache.Add(messagePackage.Key, new List<byte[]>()); 80 } 81 for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { 82 cache[messagePackage.Key].Add(messagePackage.Value[i]); 83 } 84 } 85 //now we have a dictionary with all messages per destionation 86 //so: create a byte[][] again and return 87 foreach (var dest in cache.Keys) { 88 byte[][] messagesPerDest = new byte[cache[dest].Count][]; 89 for (int i = 0; i <= messagesPerDest.GetUpperBound(0); i++) { 90 messagesPerDest[i] = cache[dest][i]; 91 } 92 res.Add(new KeyValuePair<PeerInfo, byte[][]>(dest, messagesPerDest)); 93 } 94 return res; 48 95 } 49 96 … … 54 101 /// <param name="data">data to send</param> 55 102 private void SendDataFromQueue(PeerInfo destination, byte[][] data) { 56 lock (timerLock) { 57 try { 58 Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); 59 var serviceUrl = "DistributedGA.svc"; 60 var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); 61 var serviceUri = new Uri(baseUri, serviceUrl); 103 try { 104 int arrayLength = 3; 105 if (data.GetUpperBound(0) > arrayLength) { 106 //HACK: SEND MAX 10 items 107 byte[][] fake = new byte[arrayLength][]; 108 for (int i = 0; i < arrayLength; i++) { 109 fake[i] = data[i]; 110 } 111 data = fake; 112 } 62 113 63 var binding = new NetTcpBinding(); 64 var endpoint = new EndpointAddress(serviceUri); 65 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) { 66 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { 67 ((IMessageContract)client).SendData(myself, data); //maybe timout exception... 68 } 114 115 Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); 116 var serviceUrl = "DistributedGA.svc"; 117 var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA")); 118 var serviceUri = new Uri(baseUri, serviceUrl); 119 120 var binding = new NetTcpBinding(); 121 binding.ReaderQuotas.MaxArrayLength = 2147483647; 122 binding.ReaderQuotas.MaxNameTableCharCount = 2147483647; 123 binding.ReaderQuotas.MaxBytesPerRead = 2147483647; 124 binding.ReaderQuotas.MaxStringContentLength = 2147483647; 125 126 var endpoint = new EndpointAddress(serviceUri); 127 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) { 128 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { 129 ((IMessageContract)client).SendData(myself, data); //maybe timout exception... 69 130 } 70 131 } 71 catch { } //if maybe sending failed (because of connection lost, etc.): just ignore 132 } 133 catch (Exception ex) { 134 Console.WriteLine(ex.Message); 72 135 } 73 136 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r13956 r13959 72 72 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) { 73 73 //communicate with 10% of the network 74 int noOfPeers = allPeers.Count / (100 / communicationRate);74 int noOfPeers = allPeers.Count / (100 / communicationRate); 75 75 List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1); 76 76 List<PeerInfo> res = new List<PeerInfo>(); … … 86 86 int tmp = -1; 87 87 while (res.Count < noOfItems) { 88 tmp = rnd.Next(minValue, maxValue );88 tmp = rnd.Next(minValue, maxValue + 1); 89 89 if (!res.Contains(tmp)) { 90 90 res.Add(tmp); -
branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs
r13957 r13959 53 53 get { return (ILookupParameter<IntValue>)Parameters["CommunicationRate"]; } 54 54 } 55 public ILookupParameter<IntValue> MessageCacheCapacityParameter { 55 public ILookupParameter<IntValue> MessageCacheCapacityParameter { 56 56 get { return (ILookupParameter<IntValue>)Parameters["MessageCacheCapacity"]; } 57 57 }
Note: See TracChangeset
for help on using the changeset viewer.