Changeset 14253
- Timestamp:
- 08/12/16 13:30:41 (8 years ago)
- Location:
- branches/thasling/DistributedGA
- Files:
-
- 1 added
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/DistributedGA.Core.csproj
r13937 r14253 64 64 <Compile Include="Interface\IPeerListManager.cs" /> 65 65 <Compile Include="Properties\AssemblyInfo.cs" /> 66 <Compile Include="Util\ByteArrayWrapper.cs" /> 66 67 <Compile Include="Util\SizedConcurrentQueue.cs" /> 67 68 </ItemGroup> -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r14252 r14253 22 22 //two queues are used to gather and and provide population more efficiently 23 23 private object activeQueueLocker = new Object(); 24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> writeQueue; 25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> readQueue; 26 26 27 27 //uses IMessageService for recieving population from one peer at once 28 28 private IMessageService host = null; 29 30 private double communicationRate;31 29 32 30 public event EventHandler<Exception> ExceptionOccurend; … … 40 38 }; // TODO: get own peerinfo 41 39 42 this.communicationRate = communicationRate; 43 44 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 45 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 40 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 41 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 46 42 writeQueue.Limit = messageCacheCapacity; 47 43 readQueue.Limit = writeQueue.Limit; … … 57 53 sender.Init(ownInstance, messageCacheCapacity); 58 54 59 } 60 catch (Exception ex) { 55 } catch (Exception ex) { 61 56 AddError("PeerNetworkMessageHandler.Init", ex); 62 57 } … … 68 63 sender.Dispose(); 69 64 peerListManager.Dispose(); 70 } 71 catch (Exception ex) { 65 } catch (Exception ex) { 72 66 AddError("PeerNetworkMessageHandler.Dispose", ex); 73 67 } 74 68 } 75 69 76 public void PublishDataToNetwork( byte[]data) {70 public void PublishDataToNetwork(ByteArrayWrapper data) { 77 71 try { 78 72 var allPeers = peerListManager.GetPeerList(); … … 80 74 try { 81 75 sender.SendData(peer, data); 82 } 83 catch (Exception ex) { 76 } catch (Exception ex) { 84 77 AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex); 85 78 } 86 79 } 87 80 88 } 89 catch (Exception ex) { 81 } catch (Exception ex) { 90 82 AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex); 91 83 } 92 84 } 93 85 94 public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {86 public List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork() { 95 87 try { 96 List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();97 KeyValuePair<PeerInfo, byte[]> item;88 List<KeyValuePair<PeerInfo, ByteArrayWrapper>> res = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 89 KeyValuePair<PeerInfo, ByteArrayWrapper> item; 98 90 lock (activeQueueLocker) { 99 91 //changing the current queue for storing items to send … … 111 103 } 112 104 return res;//.ToArray(); 113 } 114 catch (Exception ex) { 105 } catch (Exception ex) { 115 106 AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex); 116 107 return null; … … 142 133 .Select(ip => ip.ToString()) 143 134 .First(str => str.StartsWith(ipPrefix)); 144 } 145 catch { return null; } 135 } catch { return null; } 146 136 } 147 137 … … 149 139 if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { 150 140 lock (activeQueueLocker) { 151 writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));141 writeQueue.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(e.Sender, ByteArrayWrapper.CreateByteArrayWrapper(e.data))); 152 142 } 153 143 … … 160 150 try { 161 151 peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message)); 162 } 163 catch { } 152 } catch { } 164 153 } 165 154 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r14252 r14253 1 1 using System; 2 using System.CodeDom; 2 3 using System.Collections.Concurrent; 3 4 using System.Collections.Generic; 5 using System.Linq; 4 6 using System.ServiceModel; 5 7 using System.Threading.Tasks; … … 15 17 16 18 //providing two queues for faster access 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;19 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesWrite; 20 private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesRead; 19 21 20 22 private Timer timer; //sends cached messages to network in background 21 23 22 24 private Object timerLock = new Object(); 25 private bool isActive = false; 23 26 24 27 public void Init(PeerInfo source, int messageCacheCapacity) { 25 28 myself = source; 26 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();29 bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 27 30 bufferedMessagesRead.Limit = messageCacheCapacity; 28 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();31 bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 29 32 bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit; 30 33 timer = new Timer(1000 * 10); //each 10 seconds … … 33 36 } 34 37 35 public void SendData(PeerInfo destination, byte[]data) {36 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));38 public void SendData(PeerInfo destination, ByteArrayWrapper data) { 39 bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(destination, data)); 37 40 } 38 41 … … 44 47 45 48 private void GenerateSendingTasks(object sender, ElapsedEventArgs e) { 49 if (isActive) return; 46 50 lock (timerLock) { 47 //changing the queues... 48 var tmp = bufferedMessagesRead; 49 bufferedMessagesRead = bufferedMessagesWrite; 50 bufferedMessagesWrite = tmp; 51 List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>(); 52 while (!bufferedMessagesRead.IsEmpty) { 53 KeyValuePair<PeerInfo, byte[]> message; 54 if (bufferedMessagesRead.TryDequeue(out message)) { 55 messages.Add(message); 56 //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 51 try { 52 isActive = true; 53 //changing the queues... 54 var tmp = bufferedMessagesRead; 55 bufferedMessagesRead = bufferedMessagesWrite; 56 bufferedMessagesWrite = tmp; 57 List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 58 while (!bufferedMessagesRead.IsEmpty) { 59 KeyValuePair<PeerInfo, ByteArrayWrapper> message; 60 if (bufferedMessagesRead.TryDequeue(out message)) { 61 messages.Add(message); 62 //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning); 63 } 57 64 } 58 } 59 //now: merge them and start sending tasks 65 //now: merge them and start sending tasks 60 66 61 List<KeyValuePair<PeerInfo, byte[][]>> mergedMessages = MergeMessages(messages); 62 foreach (var item in mergedMessages) { 63 Task.Factory.StartNew(() => SendDataFromQueue(item.Key, item.Value), TaskCreationOptions.LongRunning); 67 List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> mergedMessages = MergeMessages(messages); 68 69 var runningTasks = new List<Task>(); 70 foreach (var item in mergedMessages) { 71 var itemToSend = item; 72 var t = Task.Factory.StartNew(() => SendDataFromQueue(itemToSend.Key, itemToSend.Value), 73 TaskCreationOptions.LongRunning); 74 runningTasks.Add(t); 75 } 76 77 Task.WaitAll(runningTasks.ToArray()); 78 } finally { 79 isActive = false; 64 80 } 65 81 } … … 71 87 /// <param name="messages">a list with multiple messages to the same destination</param> 72 88 /// <returns></returns> 73 private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) { 74 List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>(); 75 Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>(); 89 private List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> MergeMessages(List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages) { 90 Dictionary<PeerInfo, List<ByteArrayWrapper>> cache = new Dictionary<PeerInfo, List<ByteArrayWrapper>>(); 76 91 foreach (var messagePackage in messages) { 77 92 if (!cache.ContainsKey(messagePackage.Key)) { 78 cache.Add(messagePackage.Key, new List< byte[]>());93 cache.Add(messagePackage.Key, new List<ByteArrayWrapper>()); 79 94 } 80 95 //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { … … 82 97 //} 83 98 } 84 //now we have a dictionary with all messages per destionation 85 //so: create a byte[][] again and return 86 foreach (var dest in cache.Keys) { 87 byte[][] messagesPerDest = new byte[cache[dest].Count][]; 88 for (int i = 0; i <= messagesPerDest.GetUpperBound(0); i++) { 89 messagesPerDest[i] = cache[dest][i]; 90 } 91 res.Add(new KeyValuePair<PeerInfo, byte[][]>(dest, messagesPerDest)); 92 } 93 return res; 99 100 return cache.Keys.Select(dest => new KeyValuePair<PeerInfo, ByteArrayWrapper[]>(dest, cache[dest].ToArray())).ToList(); 94 101 } 95 102 … … 99 106 /// <param name="destination">destination peer</param> 100 107 /// <param name="data">data to send</param> 101 private void SendDataFromQueue(PeerInfo destination, byte[][] data) {108 private void SendDataFromQueue(PeerInfo destination, ByteArrayWrapper[] data) { 102 109 try { 103 110 Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port)); … … 117 124 using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) { 118 125 using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) { 119 for (int i = 0; i < data.Get UpperBound(0) + 1; i++) {120 ((IMessageContract)client).SendData(myself, data[i] ); //maybe exception...126 for (int i = 0; i < data.GetLength(0); i++) { 127 ((IMessageContract)client).SendData(myself, data[i].Array); //maybe exception... 121 128 } 122 129 } 123 130 } 124 } 125 catch (Exception ex) { 131 } catch (Exception ex) { 126 132 //ignore 127 133 Console.WriteLine(ex.Message); -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r14252 r14253 58 58 } 59 59 60 private List<PeerInfo> ChoosePeersForMessaging( refList<PeerInfo> allPeers) {60 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) { 61 61 Shuffle<PeerInfo>(allPeers); 62 62 int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1; … … 64 64 toTake = 1; 65 65 } 66 return allPeers.Take(toTake).ToList(); ;66 return allPeers.Take(toTake).ToList(); 67 67 } 68 68 … … 89 89 } 90 90 } 91 cachedPeerList = ChoosePeersForMessaging( refallPeers);91 cachedPeerList = ChoosePeersForMessaging(allPeers); 92 92 } 93 93 catch { } //nothing to do -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageHandler.cs
r13972 r14253 2 2 using System.Collections.Generic; 3 3 using DistributedGA.Core.Domain; 4 using DistributedGA.Core.Util; 4 5 5 6 namespace DistributedGA.Core.Interface { … … 9 10 void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacty, double communicationRate); 10 11 11 void PublishDataToNetwork( byte[]data);12 void PublishDataToNetwork(ByteArrayWrapper data); 12 13 13 List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork();14 List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork(); 14 15 15 16 PeerInfo GetPeerInfo(); -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageSender.cs
r13972 r14253 1 1 using DistributedGA.Core.Domain; 2 using DistributedGA.Core.Util; 2 3 3 4 namespace DistributedGA.Core.Interface { … … 7 8 void Init(PeerInfo source, int messageCacheCapacity); 8 9 9 void SendData(PeerInfo destination, byte[]data);10 void SendData(PeerInfo destination, ByteArrayWrapper data); 10 11 11 12 void Dispose(); -
branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs
r14252 r14253 28 28 using DistributedGA.Core.Implementation; 29 29 using DistributedGA.Core.Interface; 30 using DistributedGA.Core.Util; 30 31 using DistributedGA.Hive; 31 32 using HeuristicLab.Common; … … 119 120 Parameters.Add(new ValueParameter<StringValue>("JobGUID", "", new StringValue(Guid.NewGuid().ToString()))); 120 121 Parameters.Add(new ValueParameter<ILog>("Log", "The log", new Log(1000))); 122 Parameters.Add(new ValueParameter<IntValue>("ByteArraysAllocated", new IntValue(0))); 123 Parameters.Add(new ValueParameter<IntValue>("ByteArraysFreed", new IntValue(0))); 124 Parameters.Add(new ValueParameter<IntValue>("ByteArraysAlive", new IntValue(0))); 121 125 122 126 var validValues = new ItemSet<EnumValue<MigrationStrategy>>(); … … 145 149 var communicationRate = ((PercentValue)Parameters["CommunicationRate"].ActualValue).Value; 146 150 var messageCacheCapacity = ((IntValue)Parameters["MessageCacheCapacity"].ActualValue).Value; 147 h.Init(lanIpPrefix, contactServerUri, problemInstance, (int)(100 * messageCacheCapacity), (int)(100 * communicationRate));151 h.Init(lanIpPrefix, contactServerUri, problemInstance, messageCacheCapacity, (int)(100 * communicationRate)); 148 152 h.ExceptionOccurend += ExceptionThrown; 149 153 } … … 221 225 HeuristicLab.Persistence.Default.Xml.XmlGenerator.Serialize(msgScope, stream); 222 226 message = stream.GetBuffer(); 223 h.PublishDataToNetwork( message);227 h.PublishDataToNetwork(ByteArrayWrapper.CreateByteArrayWrapper(message)); 224 228 } 225 229 } … … 230 234 // recieve 231 235 var message = h.GetDataFromNetwork(); 232 List<KeyValuePair<PeerInfo, byte[]>> immigrants = new List<KeyValuePair<PeerInfo, byte[]>>();236 List<KeyValuePair<PeerInfo, ByteArrayWrapper>> immigrants = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>(); 233 237 //limit number of immigrants to use 234 238 if (noOfEmigrants < message.Count) { … … 264 268 var qualityTranslatedName = QualityParameter.TranslatedName; 265 269 foreach (var msg in immigrants) { 266 using (var stream = new MemoryStream(msg.Value )) {270 using (var stream = new MemoryStream(msg.Value.Array)) { 267 271 var immigrantScope = HeuristicLab.Persistence.Default.Xml.XmlParser.Deserialize<IScope>(stream); 268 272 … … 290 294 291 295 MigrationIterations.Value++; 296 297 ((IValueParameter<IntValue>)Parameters["ByteArraysAllocated"]).Value.Value = ByteArrayWrapper.AllocatedCounter; 298 ((IValueParameter<IntValue>)Parameters["ByteArraysFreed"]).Value.Value = ByteArrayWrapper.FreedCounter; 299 ((IValueParameter<IntValue>)Parameters["ByteArraysAlive"]).Value.Value = ByteArrayWrapper.AliveCounter; 300 292 301 return base.Apply(); 293 302 } -
branches/thasling/DistributedGA/DistributedGA.sln
r14252 r14253 1 1 2 2 Microsoft Visual Studio Solution File, Format Version 12.00 3 # Visual Studio 2013 4 VisualStudioVersion = 12.0.21005.1 5 MinimumVisualStudioVersion = 10.0.40219.1 3 # Visual Studio 2012 6 4 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistributedGA.Test", "DistributedGA.Test\DistributedGA.Test.csproj", "{E0E91C06-C56A-454F-9F7C-3FA7AE7F920E}" 7 5 EndProject … … 50 48 HideSolutionNode = FALSE 51 49 EndGlobalSection 52 GlobalSection(Performance) = preSolution53 HasPerformanceSessions = true54 EndGlobalSection55 50 EndGlobal
Note: See TracChangeset
for help on using the changeset viewer.