- Timestamp:
- 08/12/16 13:30:41 (8 years ago)
- Location:
- branches/thasling/DistributedGA/DistributedGA.Core/Implementation
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r14252 r14253 22 22 //two queues are used to gather and and provide population more efficiently 23 23 private object activeQueueLocker = new Object(); 24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> writeQueue; 25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> readQueue; 26 26 27 27 //uses IMessageService for recieving population from one peer at once 28 28 private IMessageService host = null; 29 30 private double communicationRate;31 29 32 30 public event EventHandler<Exception> ExceptionOccurend; … … 40 38 }; // TODO: get own peerinfo 41 39 42 this.communicationRate = communicationRate; 43 44 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 45 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 40 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 41 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 46 42 writeQueue.Limit = messageCacheCapacity; 47 43 readQueue.Limit = writeQueue.Limit; … … 57 53 sender.Init(ownInstance, messageCacheCapacity); 58 54 59 } 60 catch (Exception ex) { 55 } catch (Exception ex) { 61 56 AddError("PeerNetworkMessageHandler.Init", ex); 62 57 } … … 68 63 sender.Dispose(); 69 64 peerListManager.Dispose(); 70 } 71 catch (Exception ex) { 65 } catch (Exception ex) { 72 66 AddError("PeerNetworkMessageHandler.Dispose", ex); 73 67 } 74 68 } 75 69 76 public void PublishDataToNetwork( byte[]data) {70 public void PublishDataToNetwork(ByteArrayWrapper data) { 77 71 try { 78 72 var allPeers = peerListManager.GetPeerList(); … … 80 74 try { 81 75 sender.SendData(peer, data); 82 } 83 catch (Exception ex) { 76 } catch (Exception ex) { 84 77 AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex); 85 78 } 86 79 } 87 80 88 } 89 catch (Exception ex) { 81 } catch (Exception ex) { 90 82 AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex); 91 83 } 92 84 } 93 85 94 public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {86 public List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork() { 95 87 try { 96 List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();97 KeyValuePair<PeerInfo, byte[]> item;88 List<KeyValuePair<PeerInfo, ByteArrayWrapper>> res = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 89 KeyValuePair<PeerInfo, ByteArrayWrapper> item; 98 90 lock (activeQueueLocker) { 99 91 //changing the current queue for storing items to send … … 111 103 } 112 104 return res;//.ToArray(); 113 } 114 catch (Exception ex) { 105 } catch (Exception ex) { 115 106 AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex); 116 107 return null; … … 142 133 .Select(ip => ip.ToString()) 143 134 .First(str => str.StartsWith(ipPrefix)); 144 } 145 catch { return null; } 135 } catch { return null; } 146 136 } 147 137 … … 149 139 if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { 150 140 lock (activeQueueLocker) { 151 writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));141 writeQueue.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(e.Sender, ByteArrayWrapper.CreateByteArrayWrapper(e.data))); 152 142 } 153 143 … … 160 150 try { 161 151 peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message)); 162 } 163 catch { } 152 } catch { } 164 153 } 165 154 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r14252 r14253 1 1 using System; 2 using System.CodeDom; 2 3 using System.Collections.Concurrent; 3 4 using System.Collections.Generic; 5 using System.Linq; 4 6 using System.ServiceModel; 5 7 using System.Threading.Tasks; … … 15 17 16 18 //providing two queues for faster access 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;19 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesWrite; 20 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesRead; 19 21 20 22 private Timer timer; //sends cached messages to network in background 21 23 22 24 private Object timerLock = new Object(); 25 private bool isActive = false; 23 26 24 27 public void Init(PeerInfo source, int messageCacheCapacity) { 25 28 myself = source; 26 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();29 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 27 30 bufferedMessagesRead.Limit = messageCacheCapacity; 28 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();31 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 29 32 bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit; 30 33 timer = new Timer(1000 * 10); //each 10 seconds … … 33 36 } 34 37 35 public void SendData(PeerInfo destination, byte[]data) {36 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));38 public void SendData(PeerInfo destination, ByteArrayWrapper data) { 39 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(destination, data)); 37 40 } 38 41 … … 44 47 45 48 private void GenerateSendingTasks(object sender, ElapsedEventArgs e) { 49 if (isActive) return; 46 50 lock (timerLock) { 47 //changing the queues... 48 var tmp = bufferedMessagesRead; 49 bufferedMessagesRead = bufferedMessagesWrite; 50 bufferedMessagesWrite = tmp; 51 List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>(); 52 while (!bufferedMessagesRead.IsEmpty) { 53 KeyValuePair<PeerInfo, byte[]> message; 54 if (bufferedMessagesRead.TryDequeue(out message)) { 55 messages.Add(message); 56 //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 51 try { 52 isActive = true; 53 //changing the queues... 54 var tmp = bufferedMessagesRead; 55 bufferedMessagesRead = bufferedMessagesWrite; 56 bufferedMessagesWrite = tmp; 57 List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 58 while (!bufferedMessagesRead.IsEmpty) { 59 KeyValuePair<PeerInfo, ByteArrayWrapper> message; 60 if (bufferedMessagesRead.TryDequeue(out message)) { 61 messages.Add(message); 62 //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 63 } 57 64 } 58 } 59 //now: merge them and start sending tasks 65 //now: merge them and start sending tasks 60 66 61 List<KeyValuePair<PeerInfo, byte[][]>> mergedMessages = MergeMessages(messages); 62 foreach (var item in mergedMessages) { 63 Task.Factory.StartNew(() => SendDataFromQueue(item.Key, item.Value), TaskCreationOptions.LongRunning); 67 List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> mergedMessages = MergeMessages(messages); 68 69 var runningTasks = new List<Task>(); 70 foreach (var item in mergedMessages) { 71 var itemToSend = item; 72 var t = Task.Factory.StartNew(() => SendDataFromQueue(itemToSend.Key, itemToSend.Value), 73 TaskCreationOptions.LongRunning); 74 runningTasks.Add(t); 75 } 76 77 Task.WaitAll(runningTasks.ToArray()); 78 } finally { 79 isActive = false; 64 80 } 65 81 } … … 71 87 /// <param name="messages">a list with multiple messages to the same destination</param> 72 88 /// <returns></returns> 73 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) { 74 List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>(); 75 Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>(); 89 private List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> MergeMessages(List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages) { 90 Dictionary<PeerInfo, List<ByteArrayWrapper>> cache = new Dictionary<PeerInfo, List<ByteArrayWrapper>>(); 76 91 foreach (var messagePackage in messages) { 77 92 if (!cache.ContainsKey(messagePackage.Key)) { 78 cache.Add(messagePackage.Key, new List< byte[]>());93 cache.Add(messagePackage.Key, new List<ByteArrayWrapper>()); 79 94 } 80 95 //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { … … 82 97 //} 83 98 } 84 //now we have a dictionary with all messages per destionation 85 //so: create a byte[][] again and return 86 foreach (var dest in cache.Keys) { 87 byte[][] messagesPerDest = new byte[cache[dest].Count][]; 88 for (int i = 0; i <= messagesPerDest.GetUpperBound(0); i++) { 89 messagesPerDest[i] = cache[dest][i]; 90 } 91 res.Add(new KeyValuePair<PeerInfo, byte[][]>(dest, messagesPerDest)); 92 } 93 return res; 99 100 return cache.Keys.Select(dest => new KeyValuePair<PeerInfo, ByteArrayWrapper[]>(dest, cache[dest].ToArray())).ToList(); 94 101 } 95 102 … … 99 106 /// <param name="destination">destination peer</param> 100 107 /// <param name="data">data to send</param> 101 private void SendDataFromQueue(PeerInfo destination, byte[][] data) {108 private void SendDataFromQueue(PeerInfo destination, ByteArrayWrapper[] data) { 102 109 try { 103 110 Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); … … 117 124 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) { 118 125 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { 119 for (int i = 0; i < data.Get UpperBound(0) + 1; i++) {120 ((IMessageContract)client).SendData(myself, data[i] ); //maybe exception...126 for (int i = 0; i < data.GetLength(0); i++) { 127 ((IMessageContract)client).SendData(myself, data[i].Array); //maybe exception... 121 128 } 122 129 } 123 130 } 124 } 125 catch (Exception ex) { 131 } catch (Exception ex) { 126 132 //ignore 127 133 Console.WriteLine(ex.Message); -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r14252 r14253 58 58 } 59 59 60 private List<PeerInfo> ChoosePeersForMessaging( refList<PeerInfo> allPeers) {60 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) { 61 61 Shuffle<PeerInfo>(allPeers); 62 62 int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1; … … 64 64 toTake = 1; 65 65 } 66 return allPeers.Take(toTake).ToList(); ;66 return allPeers.Take(toTake).ToList(); 67 67 } 68 68 … … 89 89 } 90 90 } 91 cachedPeerList = ChoosePeersForMessaging( refallPeers);91 cachedPeerList = ChoosePeersForMessaging(allPeers); 92 92 } 93 93 catch { } //nothing to do
Note: See TracChangeset
for help on using the changeset viewer.