Changeset 14009 for branches/thasling
- Timestamp:
- 07/06/16 16:47:47 (8 years ago)
- Location:
- branches/thasling/DistributedGA
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r13982 r14009 22 22 //two queues are used to gather and and provide population more efficiently 23 23 private object activeQueueLocker = new Object(); 24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue; 25 25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue; 26 26 … … 29 29 30 30 private double communicationRate; 31 private Random rand;32 31 33 32 public event EventHandler<Exception> ExceptionOccurend; … … 42 41 43 42 this.communicationRate = communicationRate; 44 rand = new Random();45 43 46 44 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 47 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();45 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 48 46 writeQueue.Limit = messageCacheCapacity; 49 47 readQueue.Limit = writeQueue.Limit; … … 54 52 55 53 peerListManager = new WcfPeerListManager(); 56 peerListManager.Init(ownInstance, contactServerUrl );54 peerListManager.Init(ownInstance, contactServerUrl, communicationRate); 57 55 58 56 sender = new WcfMessageSender(); … … 78 76 public void PublishDataToNetwork(byte[] data) { 79 77 try { 80 var peers = peerListManager.GetPeerList(); 81 foreach (PeerInfo peer in peerListManager.GetPeerList()) { 82 var peersForMessaging = ChoosePeersForMessaging(ref peers); 83 84 //maybe something will go wrong during network communication... 78 var allPeers = peerListManager.GetPeerList(); 79 foreach (PeerInfo peer in allPeers) { 85 80 try { 86 81 sender.SendData(peer, data); … … 90 85 } 91 86 } 87 92 88 } 93 89 catch (Exception ex) { … … 99 95 try { 100 96 List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>(); 101 KeyValuePair<PeerInfo, byte[]> item;97 KeyValuePair<PeerInfo, byte[]> item; 102 98 lock (activeQueueLocker) { 103 99 //changing the current queue for storing items to send … … 131 127 132 128 private void PropagateException(Exception ex) { 133 //if (CountdownCompleted != null)134 // CountdownCompleted(this, e);135 129 if (ExceptionOccurend != null) { 136 130 ExceptionOccurend(this, ex); 137 131 } 138 }139 140 private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {141 Shuffle<PeerInfo>(allPeers);142 int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;143 if (allPeers.Count > 0 && toTake == 0) {144 toTake = 1;145 }146 return allPeers.Take(toTake).ToList(); ;147 132 } 148 133 … … 161 146 } 162 147 163 private void Shuffle<T>(IList<T> list) {164 int n = list.Count;165 while (n > 1) {166 n--;167 int k = rand.Next(n + 1);168 T value = list[k];169 list[k] = list[n];170 list[n] = value;171 }172 }173 174 148 private void OnDataRecieved(object sender, MessageRecieveEventArgs e) { 175 149 if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { 176 150 lock (activeQueueLocker) { 177 151 writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data)); 178 152 } 179 153 -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13982 r14009 17 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite; 18 18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead; 19 20 19 21 20 private Timer timer; //sends cached messages to network in background … … 80 79 } 81 80 //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { 82 81 cache[messagePackage.Key].Add(messagePackage.Value); 83 82 //} 84 83 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r13982 r14009 22 22 private IContactService client; 23 23 24 private IContactService heartbeatClient;24 private List<PeerInfo> cachedPeerList; 25 25 26 private double communicationRate; 26 27 28 private Random rand; 27 29 28 public void Init(PeerInfo source, string contactServerUrl ) {30 public void Init(PeerInfo source, string contactServerUrl, double communicationRate) { 29 31 serverString = contactServerUrl; 32 this.communicationRate = communicationRate; 30 33 myself = source; 31 32 //Init ChannelFactory and Clients 34 cachedPeerList = new List<PeerInfo>(); 35 rand = new Random(); 36 //Init ChannelFactory and Client 33 37 var binding = new NetTcpBinding(); 34 38 var endpoint = new EndpointAddress(serverString); 35 39 myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint); 36 40 client = myChannelFactory.CreateChannel(); 37 heartbeatClient = myChannelFactory.CreateChannel();38 41 //Register Peer 39 client.RegisterPeer( source);42 client.RegisterPeer(myself); 40 43 //Start heartbeat timer 41 44 timer = new Timer(1000 * 20); //each 20 seconds 42 timer.Elapsed += SendHeartbeatToServer;45 timer.Elapsed += RefreshPeerList; 43 46 timer.Start(); 44 47 } 45 48 46 49 public List<PeerInfo> GetPeerList() { 47 try { 48 var allPeers = client.GetPeerList(myself); //maybe timout exception... 49 return allPeers; 50 } 51 catch { } //if maybe sending failed (because of connection lost, etc.): just ignore 52 return new List<PeerInfo>(); 50 return cachedPeerList; 53 51 } 54 52 … … 61 59 timer.Dispose(); 62 60 ((IClientChannel)client).Close(); 63 ((IClientChannel)heartbeatClient).Close();64 61 myChannelFactory.Close(); 65 client = null;66 62 myChannelFactory = null; 67 63 } 68 64 69 private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) { 65 private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) { 66 Shuffle<PeerInfo>(allPeers); 67 int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1; 68 if (allPeers.Count > 0 && toTake == 0) { 69 toTake = 1; 70 } 71 return allPeers.Take(toTake).ToList(); ; 72 } 73 74 private void Shuffle<T>(IList<T> list) { 75 int n = list.Count; 76 while (n > 1) { 77 n--; 78 int k = rand.Next(n + 1); 79 T value = list[k]; 80 list[k] = list[n]; 81 list[n] = value; 82 } 83 } 84 85 private void RefreshPeerList(object sender, ElapsedEventArgs e) { 70 86 lock (timerLock) { 71 87 try { 72 heartbeatClient.UpdateHeartbeat(myself); 88 var allPeers = client.GetPeerList(myself); 89 cachedPeerList = ChoosePeersForMessaging(ref allPeers); 73 90 } 74 91 catch { } //nothing to do -
branches/thasling/DistributedGA/DistributedGA.Core/Interface/IPeerListManager.cs
r13982 r14009 6 6 public interface IPeerListManager { 7 7 8 void Init(PeerInfo source, string contactServerUrl ); //Registers own instance at the contact-server8 void Init(PeerInfo source, string contactServerUrl, double communicationRate); //Registers own instance at the contact-server 9 9 10 10 List<PeerInfo> GetPeerList(); //Recieves all peers in the network from contact-server -
branches/thasling/DistributedGA/DistributedGA.Hive/MigrationStrategy.cs
r13957 r14009 6 6 7 7 namespace DistributedGA.Hive { 8 public enum MigrationStrategy { TakeBestReplaceBad, TakeRandomReplaceBad, TakeBestReplaceRandom, TakeRandomReplaceRandom };8 public enum MigrationStrategy { Best, Worst, Random }; 9 9 } -
branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs
r13997 r14009 72 72 } 73 73 74 public IConstrainedValueParameter<EnumValue<MigrationStrategy>> MigrationStrategyParameter { 75 get { return (IConstrainedValueParameter<EnumValue<MigrationStrategy>>)Parameters["MigrationStrategy"]; } 74 public IConstrainedValueParameter<EnumValue<MigrationStrategy>> MigrationStrategySelectParameter { 75 get { return (IConstrainedValueParameter<EnumValue<MigrationStrategy>>)Parameters["MigrationStrategySelect"]; } 76 } 77 78 public IConstrainedValueParameter<EnumValue<MigrationStrategy>> MigrationStrategyReplaceParameter { 79 get { return (IConstrainedValueParameter<EnumValue<MigrationStrategy>>)Parameters["MigrationStrategyReplace"]; } 76 80 } 77 81 … … 100 104 Parameters.Add(new LookupParameter<BoolValue>("Maximization")); 101 105 Parameters.Add(new ScopeTreeLookupParameter<DoubleValue>("Quality", 1)); 102 Parameters.Add(new ValueParameter<IntValue>("MigrationInterval", "", new IntValue( 1)));106 Parameters.Add(new ValueParameter<IntValue>("MigrationInterval", "", new IntValue(5))); 103 107 Parameters.Add(new ValueParameter<PercentValue>("MigrationRate", "", new PercentValue(0.05))); 104 108 Parameters.Add(new ValueParameter<PercentValue>("CommunicationRate", "", new PercentValue(0.10))); 105 Parameters.Add(new ValueParameter<IntValue>("MessageCacheCapacity", "", new IntValue(100 )));109 Parameters.Add(new ValueParameter<IntValue>("MessageCacheCapacity", "", new IntValue(1000))); 106 110 Parameters.Add(new ValueParameter<StringValue>("LanIpPrefix", "", new StringValue("10."))); 107 111 Parameters.Add(new LookupParameter<IRandom>("Random", "The random number generator")); … … 111 115 112 116 var validValues = new ItemSet<EnumValue<MigrationStrategy>>(); 113 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy. TakeBestReplaceBad));114 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy. TakeBestReplaceRandom));115 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy. TakeRandomReplaceBad));116 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy.TakeRandomReplaceRandom));117 118 Parameters.Add(new ConstrainedValueParameter<EnumValue<MigrationStrategy>>("MigrationStrategy", validValues, (new EnumValue<MigrationStrategy>(MigrationStrategy.TakeBestReplaceBad)))); 117 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy.Best)); 118 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy.Worst)); 119 validValues.Add(new EnumValue<MigrationStrategy>(MigrationStrategy.Random)); 120 Parameters.Add(new ConstrainedValueParameter<EnumValue<MigrationStrategy>>("MigrationStrategySelect", validValues, (new EnumValue<MigrationStrategy>(MigrationStrategy.Random)))); 121 Parameters.Add(new ConstrainedValueParameter<EnumValue<MigrationStrategy>>("MigrationStrategyReplace", validValues, (new EnumValue<MigrationStrategy>(MigrationStrategy.Random)))); 122 119 123 } 120 124 121 125 public override IDeepCloneable Clone(Cloner cloner) { 122 126 return new P2PMigrationAnalyzer(this, cloner); 123 }124 125 public override void ClearState() {126 base.ClearState();127 h.Dispose();128 h = null;129 127 } 130 128 … … 143 141 if (h == null) { 144 142 Init(); 143 //TODO: sort population 144 145 145 } 146 146 if (MigrationIterationsParameter.ActualValue == null) { … … 160 160 } 161 161 var popQualities = QualityParameter.ActualValue; 162 var selectedMigStrat = MigrationStrategyParameter.Value.Value; 162 var selectedMigStratSelect = MigrationStrategySelectParameter.Value.Value; 163 var selectedMigStratReplace = MigrationStrategyReplaceParameter.Value.Value; 163 164 164 165 var rand = Random; … … 168 169 for (int i = 0; i < noOfEmigrants; i++) { 169 170 //select emigrant depending on strategy 170 switch (selectedMigStrat ) {171 case MigrationStrategy. TakeBestReplaceBad:171 switch (selectedMigStratSelect) { 172 case MigrationStrategy.Best: 172 173 emigrants = scope.SubScopes[i]; 173 174 emigrantsList.Add(emigrants); 174 175 break; 175 176 176 case MigrationStrategy.TakeBestReplaceRandom: 177 emigrants = scope.SubScopes[i]; 178 emigrantsList.Add(emigrants); 179 break; 180 181 case MigrationStrategy.TakeRandomReplaceBad: 177 case MigrationStrategy.Random: 182 178 replIdx = rand.Next(scope.SubScopes.Count); 183 179 emigrants = scope.SubScopes[replIdx]; … … 185 181 break; 186 182 187 case MigrationStrategy.TakeRandomReplaceRandom: 188 replIdx = rand.Next(scope.SubScopes.Count); 189 emigrants = scope.SubScopes[replIdx]; 183 case MigrationStrategy.Worst: 184 emigrants = scope.SubScopes[scope.SubScopes.Count - i]; 190 185 emigrantsList.Add(emigrants); 191 186 break; 187 192 188 193 189 default: … … 226 222 227 223 // replace individual in current population 228 switch (selectedMigStrat ) {229 case MigrationStrategy. TakeBestReplaceBad:224 switch (selectedMigStratReplace) { 225 case MigrationStrategy.Best: 230 226 scope.SubScopes.RemoveAt(0); 231 227 break; 232 228 233 case MigrationStrategy.TakeRandomReplaceBad: 234 scope.SubScopes.RemoveAt(0); 235 break; 236 237 case MigrationStrategy.TakeBestReplaceRandom: 238 //replace random 229 case MigrationStrategy.Random: 239 230 replIdx = rand.Next(scope.SubScopes.Count); 240 231 scope.SubScopes.RemoveAt(replIdx); 241 232 break; 242 233 243 case MigrationStrategy. TakeRandomReplaceRandom:234 case MigrationStrategy.Worst: 244 235 //replace random 245 replIdx = rand.Next(scope.SubScopes.Count); 246 scope.SubScopes.RemoveAt(replIdx); 236 scope.SubScopes.RemoveAt(scope.SubScopes.Count); 247 237 break; 248 238
Note: See TracChangeset
for help on using the changeset viewer.