Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
07/06/16 16:47:47 (8 years ago)
Author:
thasling
Message:

#2615:
made minor changes,
wrong commit happende last time, so still tbd:
-sort List
-dispose of anaylzer is never called

Location:
branches/thasling/DistributedGA/DistributedGA.Core/Implementation
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs

    r13982 r14009  
    2222    //two queues are used to gather and and provide population more efficiently
    2323    private object activeQueueLocker = new Object();
    24     private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue;
     24    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;
    2525    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;
    2626
     
    2929
    3030    private double communicationRate;
    31     private Random rand;
    3231
    3332    public event EventHandler<Exception> ExceptionOccurend;
     
    4241
    4342        this.communicationRate = communicationRate;
    44         rand = new Random();
    4543
    4644        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    47         readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>();
     45        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    4846        writeQueue.Limit = messageCacheCapacity;
    4947        readQueue.Limit = writeQueue.Limit;
     
    5452
    5553        peerListManager = new WcfPeerListManager();
    56         peerListManager.Init(ownInstance, contactServerUrl);
     54        peerListManager.Init(ownInstance, contactServerUrl, communicationRate);
    5755
    5856        sender = new WcfMessageSender();
     
    7876    public void PublishDataToNetwork(byte[] data) {
    7977      try {
    80         var peers = peerListManager.GetPeerList();
    81         foreach (PeerInfo peer in peerListManager.GetPeerList()) {
    82           var peersForMessaging = ChoosePeersForMessaging(ref peers);
    83 
    84           //maybe something will go wrong during network communication...
     78        var allPeers = peerListManager.GetPeerList();
     79        foreach (PeerInfo peer in allPeers) {
    8580          try {
    8681            sender.SendData(peer, data);
     
    9085          }
    9186        }
     87
    9288      }
    9389      catch (Exception ex) {
     
    9995      try {
    10096        List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();
    101         KeyValuePair<PeerInfo,byte[]> item;
     97        KeyValuePair<PeerInfo, byte[]> item;
    10298        lock (activeQueueLocker) {
    10399          //changing the current queue for storing items to send
     
    131127
    132128    private void PropagateException(Exception ex) {
    133       //if (CountdownCompleted != null)
    134       //  CountdownCompleted(this, e);
    135129      if (ExceptionOccurend != null) {
    136130        ExceptionOccurend(this, ex);
    137131      }
    138     }
    139 
    140     private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {
    141       Shuffle<PeerInfo>(allPeers);
    142       int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;
    143       if (allPeers.Count > 0 && toTake == 0) {
    144         toTake = 1;
    145       }
    146       return allPeers.Take(toTake).ToList(); ;
    147132    }
    148133
     
    161146    }
    162147
    163     private void Shuffle<T>(IList<T> list) {
    164       int n = list.Count;
    165       while (n > 1) {
    166         n--;
    167         int k = rand.Next(n + 1);
    168         T value = list[k];
    169         list[k] = list[n];
    170         list[n] = value;
    171       }
    172     }
    173 
    174148    private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
    175149      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
    176150        lock (activeQueueLocker) {
    177             writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
     151          writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
    178152        }
    179153
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r13982 r14009  
    1717    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;
    1818    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;
    19 
    2019
    2120    private Timer timer; //sends cached messages to network in background
     
    8079        }
    8180        //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
    82           cache[messagePackage.Key].Add(messagePackage.Value);
     81        cache[messagePackage.Key].Add(messagePackage.Value);
    8382        //}
    8483      }
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs

    r13982 r14009  
    2222    private IContactService client;
    2323
    24     private IContactService heartbeatClient;
     24    private List<PeerInfo> cachedPeerList;
    2525
     26    private double communicationRate;
    2627
     28    private Random rand;
    2729
    28     public void Init(PeerInfo source, string contactServerUrl) {
     30    public void Init(PeerInfo source, string contactServerUrl, double communicationRate) {
    2931      serverString = contactServerUrl;
     32      this.communicationRate = communicationRate;
    3033      myself = source;
    31 
    32       //Init ChannelFactory and Clients
     34      cachedPeerList = new List<PeerInfo>();
     35      rand = new Random();
     36      //Init ChannelFactory and Client
    3337      var binding = new NetTcpBinding();
    3438      var endpoint = new EndpointAddress(serverString);
    3539      myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint);
    3640      client = myChannelFactory.CreateChannel();
    37       heartbeatClient = myChannelFactory.CreateChannel();
    3841      //Register Peer
    39       client.RegisterPeer(source);
     42      client.RegisterPeer(myself);
    4043      //Start heartbeat timer
    4144      timer = new Timer(1000 * 20); //each 20 seconds
    42       timer.Elapsed += SendHeartbeatToServer;
     45      timer.Elapsed += RefreshPeerList;
    4346      timer.Start();
    4447    }
    4548
    4649    public List<PeerInfo> GetPeerList() {
    47       try {
    48         var allPeers = client.GetPeerList(myself); //maybe timout exception...
    49         return allPeers;
    50       }
    51       catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
    52       return new List<PeerInfo>();
     50      return cachedPeerList;
    5351    }
    5452
     
    6159      timer.Dispose();
    6260      ((IClientChannel)client).Close();
    63       ((IClientChannel)heartbeatClient).Close();
    6461      myChannelFactory.Close();
    65       client = null;
    6662      myChannelFactory = null;
    6763    }
    6864
    69     private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) {
     65    private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {
     66      Shuffle<PeerInfo>(allPeers);
     67      int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;
     68      if (allPeers.Count > 0 && toTake == 0) {
     69        toTake = 1;
     70      }
     71      return allPeers.Take(toTake).ToList(); ;
     72    }
     73
     74    private void Shuffle<T>(IList<T> list) {
     75      int n = list.Count;
     76      while (n > 1) {
     77        n--;
     78        int k = rand.Next(n + 1);
     79        T value = list[k];
     80        list[k] = list[n];
     81        list[n] = value;
     82      }
     83    }
     84
     85    private void RefreshPeerList(object sender, ElapsedEventArgs e) {
    7086      lock (timerLock) {
    7187        try {
    72           heartbeatClient.UpdateHeartbeat(myself);
     88          var allPeers = client.GetPeerList(myself);
     89          cachedPeerList = ChoosePeersForMessaging(ref allPeers);
    7390        }
    7491        catch { } //nothing to do
Note: See TracChangeset for help on using the changeset viewer.