Changeset 13972
- Timestamp:
- 07/01/16 20:24:52 (8 years ago)
- Location:
- branches/thasling/DistributedGA
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core.Host/Program.cs
r13956 r13972 31 31 var message = CreateMessage(pi, i); 32 32 Console.WriteLine("Publishing messages..."); 33 h.PublishDataToNetwork(message);33 //h.PublishDataToNetwork(message); 34 34 Console.WriteLine("Messages published."); 35 35 Console.WriteLine("Recieved messages:"); 36 36 foreach (var item in h.GetDataFromNetwork()) { 37 Console.WriteLine(string.Format("Message:{0}", GetString(item)));37 //Console.WriteLine(string.Format("Message:{0}", GetString(item))); 38 38 } 39 39 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r13971 r13972 22 22 //two queues are used to gather and and provide population more efficiently 23 23 private object activeQueueLocker = new Object(); 24 private SizedConcurrentQueue< byte[]> writeQueue;25 private SizedConcurrentQueue< byte[]> readQueue;24 private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue; 25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue; 26 26 27 27 //uses IMessageService for recieving population from one peer at once … … 38 38 }; // TODO: get own peerinfo 39 39 40 writeQueue = new SizedConcurrentQueue< byte[]>();41 readQueue = new SizedConcurrentQueue< byte[]>();40 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 41 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>(); 42 42 writeQueue.Limit = messageCacheCapacity; 43 43 readQueue.Limit = writeQueue.Limit; … … 70 70 } 71 71 72 public void PublishDataToNetwork(byte[] []data) {72 public void PublishDataToNetwork(byte[] data) { 73 73 try { 74 74 foreach (PeerInfo peer in peerListManager.GetPeerList()) { … … 87 87 } 88 88 89 public byte[][]GetDataFromNetwork() {89 public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() { 90 90 try { 91 List< byte[]> res = new List<byte[]>();92 byte[] item = null;91 List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>(); 92 KeyValuePair<PeerInfo,byte[]> item; 93 93 lock (activeQueueLocker) { 94 94 //changing the current queue for storing items to send … … 105 105 } 106 106 } 107 return res .ToArray();107 return res;//.ToArray(); 108 108 } 109 109 catch (Exception ex) { … … 146 146 if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { 147 147 lock (activeQueueLocker) { 148 writeQueue.Enqueue( e.data);148 writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data)); 149 149 } 150 150 -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13971 r13972 15 15 16 16 //providing two queues for faster access 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>> bufferedMessagesWrite;18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>> bufferedMessagesRead;17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite; 18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead; 19 19 20 20 … … 25 25 public void Init(PeerInfo source, int messageCacheCapacity) { 26 26 myself = source; 27 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>>();27 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 28 28 bufferedMessagesRead.Limit = messageCacheCapacity; 29 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[] []>>();29 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 30 30 bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit; 31 31 timer = new Timer(1000 * 10); //each 10 seconds … … 34 34 } 35 35 36 public void SendData(PeerInfo destination, byte[] []data) {37 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[] []>(destination, data));36 public void SendData(PeerInfo destination, byte[] data) { 37 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data)); 38 38 } 39 39 … … 50 50 bufferedMessagesRead = bufferedMessagesWrite; 51 51 bufferedMessagesWrite = tmp; 52 List<KeyValuePair<PeerInfo, byte[] []>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>();52 List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>(); 53 53 while (!bufferedMessagesRead.IsEmpty) { 54 KeyValuePair<PeerInfo, byte[] []> message;54 KeyValuePair<PeerInfo, byte[]> message; 55 55 if (bufferedMessagesRead.TryDequeue(out message)) { 56 56 messages.Add(message); … … 72 72 /// <param name="messages">a list with multiple messages to the same destination</param> 73 73 /// <returns></returns> 74 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[] []>> messages) {74 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) { 75 75 List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>(); 76 76 Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>(); … … 79 79 cache.Add(messagePackage.Key, new List<byte[]>()); 80 80 } 81 for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {82 cache[messagePackage.Key].Add(messagePackage.Value [i]);83 }81 //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { 82 cache[messagePackage.Key].Add(messagePackage.Value); 83 //} 84 84 } 85 85 //now we have a dictionary with all messages per destionation -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageHandler.cs
r13969 r13972 9 9 void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacty, double communicationRate); 10 10 11 void PublishDataToNetwork(byte[] []data);11 void PublishDataToNetwork(byte[] data); 12 12 13 byte[][]GetDataFromNetwork();13 List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork(); 14 14 15 15 PeerInfo GetPeerInfo(); -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageSender.cs
r13956 r13972 7 7 void Init(PeerInfo source, int messageCacheCapacity); 8 8 9 void SendData(PeerInfo destination, byte[] []data);9 void SendData(PeerInfo destination, byte[] data); 10 10 11 11 void Dispose(); -
branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs
r13971 r13972 208 208 { 209 209 // send 210 var message = new byte[emigrantsList.Count][];211 210 for (int ei = 0; ei < emigrantsList.Count; ei++) { 212 211 using (var stream = new MemoryStream()) { 212 byte[] message; 213 213 var emigrantScope = emigrantsList[ei]; 214 214 … … 219 219 } 220 220 HeuristicLab.Persistence.Default.Xml.XmlGenerator.Serialize(msgScope, stream); 221 message[ei] = stream.GetBuffer(); 221 message = stream.GetBuffer(); 222 h.PublishDataToNetwork(message); 223 222 224 } 223 225 } 224 h.PublishDataToNetwork(message);225 226 } 226 227 … … 229 230 // recieve 230 231 var message = h.GetDataFromNetwork(); 231 for (int ei = 0; ei < message.Length; ei++) { 232 using (var stream = new MemoryStream(message[ei])) { 232 //for (int ei = 0; ei < message.Length; ei++) { 233 foreach (var msg in message) { 234 using (var stream = new MemoryStream(msg.Value)) { 233 235 var immigrantScope = HeuristicLab.Persistence.Default.Xml.XmlParser.Deserialize<IScope>(stream); 234 236 … … 278 280 double quality = 0.0; 279 281 quality = qImmigrant; 280 log.LogMessage(string.Format("Recieved individual with quality {0}", quality)); 282 log.LogMessage(string.Format("Recieved individual with quality {0} from peer {1}:{2} ; Job: {3}", 283 quality, msg.Key.IpAddress, msg.Key.Port, msg.Key.ProblemInstance)); 281 284 } 282 285 } -
branches/thasling/DistributedGA/DistributedGA.Hive/P2PTask.cs
r13956 r13972 115 115 var message = CreateMessage(pi, i); 116 116 Console.WriteLine("Publishing messages..."); 117 h.PublishDataToNetwork(message);117 //h.PublishDataToNetwork(message); 118 118 Console.WriteLine("Messages published."); 119 119 Console.WriteLine("Recieved messages:"); 120 120 foreach (var item in h.GetDataFromNetwork()) 121 121 { 122 log.LogMessage(string.Format("Message:{0}", GetString(item)));122 //log.LogMessage(string.Format("Message:{0}", GetString(item))); 123 123 } 124 124 ExecutionTime = DateTime.Now - startTime;
Note: See TracChangeset
for help on using the changeset viewer.