Changeset 13972


Ignore:
Timestamp:
07/01/16 20:24:52 (3 years ago)
Author:
thasling
Message:

#2615:
improved log
made changes in data structure

Location:
branches/thasling/DistributedGA
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core.Host/Program.cs

    r13956 r13972  
    3131          var message = CreateMessage(pi, i);
    3232          Console.WriteLine("Publishing messages...");
    33           h.PublishDataToNetwork(message);
     33          //h.PublishDataToNetwork(message);
    3434          Console.WriteLine("Messages published.");
    3535          Console.WriteLine("Recieved messages:");
    3636          foreach (var item in h.GetDataFromNetwork()) {
    37             Console.WriteLine(string.Format("Message:{0}", GetString(item)));
     37            //Console.WriteLine(string.Format("Message:{0}", GetString(item)));
    3838          }
    3939        }
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs

    r13971 r13972  
    2222    //two queues are used to gather and and provide population more efficiently
    2323    private object activeQueueLocker = new Object();
    24     private SizedConcurrentQueue<byte[]> writeQueue;
    25     private SizedConcurrentQueue<byte[]> readQueue;
     24    private SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>> writeQueue;
     25    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;
    2626
    2727    //uses IMessageService for recieving population from one peer at once
     
    3838        }; // TODO: get own peerinfo
    3939
    40         writeQueue = new SizedConcurrentQueue<byte[]>();
    41         readQueue = new SizedConcurrentQueue<byte[]>();
     40        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
     41        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo,byte[]>>();
    4242        writeQueue.Limit = messageCacheCapacity;
    4343        readQueue.Limit = writeQueue.Limit;
     
    7070    }
    7171
    72     public void PublishDataToNetwork(byte[][] data) {
     72    public void PublishDataToNetwork(byte[] data) {
    7373      try {
    7474        foreach (PeerInfo peer in peerListManager.GetPeerList()) {
     
    8787    }
    8888
    89     public byte[][] GetDataFromNetwork() {
     89    public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {
    9090      try {
    91         List<byte[]> res = new List<byte[]>();
    92         byte[] item = null;
     91        List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();
     92        KeyValuePair<PeerInfo,byte[]> item;
    9393        lock (activeQueueLocker) {
    9494          //changing the current queue for storing items to send
     
    105105          }
    106106        }
    107         return res.ToArray();
     107        return res;//.ToArray();
    108108      }
    109109      catch (Exception ex) {
     
    146146      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
    147147        lock (activeQueueLocker) {
    148             writeQueue.Enqueue(e.data);
     148            writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
    149149        }
    150150
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r13971 r13972  
    1515
    1616    //providing two queues for faster access
    17     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesWrite;
    18     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesRead;
     17    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;
     18    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;
    1919
    2020
     
    2525    public void Init(PeerInfo source, int messageCacheCapacity) {
    2626      myself = source;
    27       bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     27      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    2828      bufferedMessagesRead.Limit = messageCacheCapacity;
    29       bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     29      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    3030      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
    3131      timer = new Timer(1000 * 10); //each 10 seconds
     
    3434    }
    3535
    36     public void SendData(PeerInfo destination, byte[][] data) {
    37       bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
     36    public void SendData(PeerInfo destination, byte[] data) {
     37      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));
    3838    }
    3939
     
    5050        bufferedMessagesRead = bufferedMessagesWrite;
    5151        bufferedMessagesWrite = tmp;
    52         List<KeyValuePair<PeerInfo, byte[][]>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>();
     52        List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>();
    5353        while (!bufferedMessagesRead.IsEmpty) {
    54           KeyValuePair<PeerInfo, byte[][]> message;
     54          KeyValuePair<PeerInfo, byte[]> message;
    5555          if (bufferedMessagesRead.TryDequeue(out message)) {
    5656            messages.Add(message);
     
    7272    /// <param name="messages">a list with multiple messages to the same  destination</param>
    7373    /// <returns></returns>
    74     private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[][]>> messages) {
     74    private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) {
    7575      List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>();
    7676      Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>();
     
    7979          cache.Add(messagePackage.Key, new List<byte[]>());
    8080        }
    81         for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
    82           cache[messagePackage.Key].Add(messagePackage.Value[i]);
    83         }
     81        //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
     82          cache[messagePackage.Key].Add(messagePackage.Value);
     83        //}
    8484      }
    8585      //now we have a dictionary with all messages per destionation
  • branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageHandler.cs

    r13969 r13972  
    99    void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacty, double communicationRate);
    1010
    11     void PublishDataToNetwork(byte[][] data);
     11    void PublishDataToNetwork(byte[] data);
    1212
    13     byte[][] GetDataFromNetwork();
     13    List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork();
    1414
    1515    PeerInfo GetPeerInfo();
  • branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageSender.cs

    r13956 r13972  
    77    void Init(PeerInfo source, int messageCacheCapacity);
    88
    9     void SendData(PeerInfo destination, byte[][] data);
     9    void SendData(PeerInfo destination, byte[] data);
    1010
    1111    void Dispose();
  • branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs

    r13971 r13972  
    208208        {
    209209          // send
    210           var message = new byte[emigrantsList.Count][];
    211210          for (int ei = 0; ei < emigrantsList.Count; ei++) {
    212211            using (var stream = new MemoryStream()) {
     212              byte[] message;
    213213              var emigrantScope = emigrantsList[ei];
    214214
     
    219219              }
    220220              HeuristicLab.Persistence.Default.Xml.XmlGenerator.Serialize(msgScope, stream);
    221               message[ei] = stream.GetBuffer();
     221              message = stream.GetBuffer();
     222              h.PublishDataToNetwork(message);
     223
    222224            }
    223225          }
    224           h.PublishDataToNetwork(message);
    225226        }
    226227
     
    229230          // recieve
    230231          var message = h.GetDataFromNetwork();
    231           for (int ei = 0; ei < message.Length; ei++) {
    232             using (var stream = new MemoryStream(message[ei])) {
     232          //for (int ei = 0; ei < message.Length; ei++) {
     233          foreach (var msg in message) {
     234            using (var stream = new MemoryStream(msg.Value)) {
    233235              var immigrantScope = HeuristicLab.Persistence.Default.Xml.XmlParser.Deserialize<IScope>(stream);
    234236
     
    278280              double quality = 0.0;
    279281              quality = qImmigrant;
    280               log.LogMessage(string.Format("Recieved individual with quality {0}", quality));
     282              log.LogMessage(string.Format("Recieved individual with quality {0} from peer {1}:{2} ; Job: {3}",
     283                                            quality, msg.Key.IpAddress, msg.Key.Port, msg.Key.ProblemInstance));
    281284            }
    282285          }
  • branches/thasling/DistributedGA/DistributedGA.Hive/P2PTask.cs

    r13956 r13972  
    115115                        var message = CreateMessage(pi, i);
    116116                        Console.WriteLine("Publishing messages...");
    117                         h.PublishDataToNetwork(message);
     117                        //h.PublishDataToNetwork(message);
    118118                        Console.WriteLine("Messages published.");
    119119                        Console.WriteLine("Recieved messages:");
    120120                        foreach (var item in h.GetDataFromNetwork())
    121121                        {
    122                             log.LogMessage(string.Format("Message:{0}", GetString(item)));
     122                            //log.LogMessage(string.Format("Message:{0}", GetString(item)));
    123123                        }
    124124                        ExecutionTime = DateTime.Now - startTime;
Note: See TracChangeset for help on using the changeset viewer.