Changeset 13972 for branches/thasling/DistributedGA/DistributedGA.Core
- Timestamp:
- 07/01/16 20:24:52 (8 years ago)
- Location:
- branches/thasling/DistributedGA/DistributedGA.Core
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r13971 r13972 22 22 //two queues are used to gather and and provide population more efficiently 23 23 private object activeQueueLocker = new Object(); 24 private SizedConcurrentQueue< byte[]> writeQueue;25 private SizedConcurrentQueue< byte[]> readQueue;24 private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue; 25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue; 26 26 27 27 //uses IMessageService for recieving population from one peer at once … … 38 38 }; // TODO: get own peerinfo 39 39 40 writeQueue = new SizedConcurrentQueue< byte[]>();41 readQueue = new SizedConcurrentQueue< byte[]>();40 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 41 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>(); 42 42 writeQueue.Limit = messageCacheCapacity; 43 43 readQueue.Limit = writeQueue.Limit; … … 70 70 } 71 71 72 public void PublishDataToNetwork(byte[] []data) {72 public void PublishDataToNetwork(byte[] data) { 73 73 try { 74 74 foreach (PeerInfo peer in peerListManager.GetPeerList()) { … … 87 87 } 88 88 89 public byte[][]GetDataFromNetwork() {89 public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() { 90 90 try { 91 List< byte[]> res = new List<byte[]>();92 byte[] item = null;91 List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>(); 92 KeyValuePair<PeerInfo,byte[]> item; 93 93 lock (activeQueueLocker) { 94 94 //changing the current queue for storing items to send … … 105 105 } 106 106 } 107 return res .ToArray();107 return res;//.ToArray(); 108 108 } 109 109 catch (Exception ex) { … … 146 146 if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { 147 147 lock (activeQueueLocker) { 148 writeQueue.Enqueue( e.data);148 writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data)); 149 149 } 150 150 -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13971 r13972 15 15 16 16 //providing two queues for faster access 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>> bufferedMessagesWrite;18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>> bufferedMessagesRead;17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite; 18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead; 19 19 20 20 … … 25 25 public void Init(PeerInfo source, int messageCacheCapacity) { 26 26 myself = source; 27 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>>();27 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 28 28 bufferedMessagesRead.Limit = messageCacheCapacity; 29 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>>();29 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 30 30 bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit; 31 31 timer = new Timer(1000 * 10); //each 10 seconds … … 34 34 } 35 35 36 public void SendData(PeerInfo destination, byte[] []data) {37 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[] []>(destination, data));36 public void SendData(PeerInfo destination, byte[] data) { 37 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data)); 38 38 } 39 39 … … 50 50 bufferedMessagesRead = bufferedMessagesWrite; 51 51 bufferedMessagesWrite = tmp; 52 List<KeyValuePair<PeerInfo, byte[] []>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>();52 List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>(); 53 53 while (!bufferedMessagesRead.IsEmpty) { 54 KeyValuePair<PeerInfo, byte[] []> message;54 KeyValuePair<PeerInfo, byte[]> message; 55 55 if (bufferedMessagesRead.TryDequeue(out message)) { 56 56 messages.Add(message); … … 72 72 /// <param name="messages">a list with multiple messages to the same destination</param> 73 73 /// <returns></returns> 74 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[] []>> messages) {74 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) { 75 75 List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>(); 76 76 Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>(); … … 79 79 cache.Add(messagePackage.Key, new List<byte[]>()); 80 80 } 81 for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {82 cache[messagePackage.Key].Add(messagePackage.Value [i]);83 }81 //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { 82 cache[messagePackage.Key].Add(messagePackage.Value); 83 //} 84 84 } 85 85 //now we have a dictionary with all messages per destionation -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageHandler.cs
r13969 r13972 9 9 void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacty, double communicationRate); 10 10 11 void PublishDataToNetwork(byte[] []data);11 void PublishDataToNetwork(byte[] data); 12 12 13 byte[][]GetDataFromNetwork();13 List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork(); 14 14 15 15 PeerInfo GetPeerInfo(); -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageSender.cs
r13956 r13972 7 7 void Init(PeerInfo source, int messageCacheCapacity); 8 8 9 void SendData(PeerInfo destination, byte[] []data);9 void SendData(PeerInfo destination, byte[] data); 10 10 11 11 void Dispose();
Note: See TracChangeset
for help on using the changeset viewer.