- Timestamp:
- 07/06/16 16:47:47 (8 years ago)
- Location:
- branches/thasling/DistributedGA/DistributedGA.Core/Implementation
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r13982 r14009 22 22 //two queues are used to gather and and provide population more efficiently 23 23 private object activeQueueLocker = new Object(); 24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;24 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue; 25 25 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue; 26 26 … … 29 29 30 30 private double communicationRate; 31 private Random rand;32 31 33 32 public event EventHandler<Exception> ExceptionOccurend; … … 42 41 43 42 this.communicationRate = communicationRate; 44 rand = new Random();45 43 46 44 writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 47 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();45 readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>(); 48 46 writeQueue.Limit = messageCacheCapacity; 49 47 readQueue.Limit = writeQueue.Limit; … … 54 52 55 53 peerListManager = new WcfPeerListManager(); 56 peerListManager.Init(ownInstance, contactServerUrl );54 peerListManager.Init(ownInstance, contactServerUrl, communicationRate); 57 55 58 56 sender = new WcfMessageSender(); … … 78 76 public void PublishDataToNetwork(byte[] data) { 79 77 try { 80 var peers = peerListManager.GetPeerList(); 81 foreach (PeerInfo peer in peerListManager.GetPeerList()) { 82 var peersForMessaging = ChoosePeersForMessaging(ref peers); 83 84 //maybe something will go wrong during network communication... 78 var allPeers = peerListManager.GetPeerList(); 79 foreach (PeerInfo peer in allPeers) { 85 80 try { 86 81 sender.SendData(peer, data); … … 90 85 } 91 86 } 87 92 88 } 93 89 catch (Exception ex) { … … 99 95 try { 100 96 List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>(); 101 KeyValuePair<PeerInfo, byte[]> item;97 KeyValuePair<PeerInfo, byte[]> item; 102 98 lock (activeQueueLocker) { 103 99 //changing the current queue for storing items to send … … 131 127 132 128 private void PropagateException(Exception ex) { 133 //if (CountdownCompleted != null)134 // CountdownCompleted(this, e);135 129 if (ExceptionOccurend != null) { 136 130 ExceptionOccurend(this, ex); 137 131 } 138 }139 140 private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {141 Shuffle<PeerInfo>(allPeers);142 int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;143 if (allPeers.Count > 0 && toTake == 0) {144 toTake = 1;145 }146 return allPeers.Take(toTake).ToList(); ;147 132 } 148 133 … … 161 146 } 162 147 163 private void Shuffle<T>(IList<T> list) {164 int n = list.Count;165 while (n > 1) {166 n--;167 int k = rand.Next(n + 1);168 T value = list[k];169 list[k] = list[n];170 list[n] = value;171 }172 }173 174 148 private void OnDataRecieved(object sender, MessageRecieveEventArgs e) { 175 149 if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) { 176 150 lock (activeQueueLocker) { 177 151 writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data)); 178 152 } 179 153 -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13982 r14009 17 17 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite; 18 18 private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead; 19 20 19 21 20 private Timer timer; //sends cached messages to network in background … … 80 79 } 81 80 //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) { 82 81 cache[messagePackage.Key].Add(messagePackage.Value); 83 82 //} 84 83 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r13982 r14009 22 22 private IContactService client; 23 23 24 private IContactService heartbeatClient;24 private List<PeerInfo> cachedPeerList; 25 25 26 private double communicationRate; 26 27 28 private Random rand; 27 29 28 public void Init(PeerInfo source, string contactServerUrl ) {30 public void Init(PeerInfo source, string contactServerUrl, double communicationRate) { 29 31 serverString = contactServerUrl; 32 this.communicationRate = communicationRate; 30 33 myself = source; 31 32 //Init ChannelFactory and Clients 34 cachedPeerList = new List<PeerInfo>(); 35 rand = new Random(); 36 //Init ChannelFactory and Client 33 37 var binding = new NetTcpBinding(); 34 38 var endpoint = new EndpointAddress(serverString); 35 39 myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint); 36 40 client = myChannelFactory.CreateChannel(); 37 heartbeatClient = myChannelFactory.CreateChannel();38 41 //Register Peer 39 client.RegisterPeer( source);42 client.RegisterPeer(myself); 40 43 //Start heartbeat timer 41 44 timer = new Timer(1000 * 20); //each 20 seconds 42 timer.Elapsed += SendHeartbeatToServer;45 timer.Elapsed += RefreshPeerList; 43 46 timer.Start(); 44 47 } 45 48 46 49 public List<PeerInfo> GetPeerList() { 47 try { 48 var allPeers = client.GetPeerList(myself); //maybe timout exception... 49 return allPeers; 50 } 51 catch { } //if maybe sending failed (because of connection lost, etc.): just ignore 52 return new List<PeerInfo>(); 50 return cachedPeerList; 53 51 } 54 52 … … 61 59 timer.Dispose(); 62 60 ((IClientChannel)client).Close(); 63 ((IClientChannel)heartbeatClient).Close();64 61 myChannelFactory.Close(); 65 client = null;66 62 myChannelFactory = null; 67 63 } 68 64 69 private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) { 65 private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) { 66 Shuffle<PeerInfo>(allPeers); 67 int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1; 68 if (allPeers.Count > 0 && toTake == 0) { 69 toTake = 1; 70 } 71 return allPeers.Take(toTake).ToList(); ; 72 } 73 74 private void Shuffle<T>(IList<T> list) { 75 int n = list.Count; 76 while (n > 1) { 77 n--; 78 int k = rand.Next(n + 1); 79 T value = list[k]; 80 list[k] = list[n]; 81 list[n] = value; 82 } 83 } 84 85 private void RefreshPeerList(object sender, ElapsedEventArgs e) { 70 86 lock (timerLock) { 71 87 try { 72 heartbeatClient.UpdateHeartbeat(myself); 88 var allPeers = client.GetPeerList(myself); 89 cachedPeerList = ChoosePeersForMessaging(ref allPeers); 73 90 } 74 91 catch { } //nothing to do
Note: See TracChangeset
for help on using the changeset viewer.