Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
07/01/16 20:24:52 (8 years ago)
Author:
thasling
Message:

#2615:
improved log
made changes in data structure

Location:
branches/thasling/DistributedGA/DistributedGA.Core/Implementation
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs

    r13971 r13972  
    2222    //two queues are used to gather and and provide population more efficiently
    2323    private object activeQueueLocker = new Object();
    24     private SizedConcurrentQueue<byte[]> writeQueue;
    25     private SizedConcurrentQueue<byte[]> readQueue;
     24    private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue;
     25    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;
    2626
    2727    //uses IMessageService for recieving population from one peer at once
     
    3838        }; // TODO: get own peerinfo
    3939
    40         writeQueue = new SizedConcurrentQueue<byte[]>();
    41         readQueue = new SizedConcurrentQueue<byte[]>();
     40        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
     41        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>();
    4242        writeQueue.Limit = messageCacheCapacity;
    4343        readQueue.Limit = writeQueue.Limit;
     
    7070    }
    7171
    72     public void PublishDataToNetwork(byte[][] data) {
     72    public void PublishDataToNetwork(byte[] data) {
    7373      try {
    7474        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
     
    8787    }
    8888
    89     public byte[][] GetDataFromNetwork() {
     89    public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {
    9090      try {
    91         List<byte[]> res = new List<byte[]>();
    92         byte[] item = null;
     91        List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();
     92        KeyValuePair<PeerInfo,byte[]> item;
    9393        lock (activeQueueLocker) {
    9494          //changing the current queue for storing items to send
     
    105105          }
    106106        }
    107         return res.ToArray();
     107        return res;//.ToArray();
    108108      }
    109109      catch (Exception ex) {
     
    146146      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
    147147        lock (activeQueueLocker) {
    148             writeQueue.Enqueue(e.data);
     148            writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
    149149        }
    150150
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r13971 r13972  
    1515
    1616    //providing two queues for faster access
    17     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesWrite;
    18     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesRead;
     17    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;
     18    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;
    1919
    2020
     
    2525    public void Init(PeerInfo source, int messageCacheCapacity) {
    2626      myself = source;
    27       bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     27      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    2828      bufferedMessagesRead.Limit = messageCacheCapacity;
    29       bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     29      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    3030      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
    3131      timer = new Timer(1000 * 10); //each 10 seconds
     
    3434    }
    3535
    36     public void SendData(PeerInfo destination, byte[][] data) {
    37       bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
     36    public void SendData(PeerInfo destination, byte[] data) {
     37      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));
    3838    }
    3939
     
    5050        bufferedMessagesRead = bufferedMessagesWrite;
    5151        bufferedMessagesWrite = tmp;
    52         List<KeyValuePair<PeerInfo, byte[][]>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>();
     52        List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>();
    5353        while (!bufferedMessagesRead.IsEmpty) {
    54           KeyValuePair<PeerInfo, byte[][]> message;
     54          KeyValuePair<PeerInfo, byte[]> message;
    5555          if (bufferedMessagesRead.TryDequeue(out message)) {
    5656            messages.Add(message);
     
    7272    /// <param name="messages">a list with multiple messages to the same  destination</param>
    7373    /// <returns></returns>
    74     private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[][]>> messages) {
     74    private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) {
    7575      List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>();
    7676      Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>();
     
    7979          cache.Add(messagePackage.Key, new List<byte[]>());
    8080        }
    81         for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
    82           cache[messagePackage.Key].Add(messagePackage.Value[i]);
    83         }
     81        //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
     82          cache[messagePackage.Key].Add(messagePackage.Value);
     83        //}
    8484      }
    8585      //now we have a dictionary with all messages per destionation
Note: See TracChangeset for help on using the changeset viewer.