Changeset 13959


Ignore:
Timestamp:
06/29/16 22:23:58 (3 years ago)
Author:
thasling
Message:

#2615:
it works

Location:
branches/thasling/DistributedGA
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r13956 r13959  
    1414    private PeerInfo myself;
    1515
    16     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessages;
     16    //providing two queues for faster access
     17    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesWrite;
     18    private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>> bufferedMessagesRead;
     19
    1720
    1821    private Timer timer; //sends cached messages to network in background
     
    2225    public void Init(PeerInfo source, int messageCacheCapacity) {
    2326      myself = source;
    24       bufferedMessages = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
    25       bufferedMessages.Limit = messageCacheCapacity;
     27      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     28      bufferedMessagesRead.Limit = messageCacheCapacity;
     29      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[][]>>();
     30      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
    2631      timer = new Timer(1000 * 60); //each 5 minutes
    2732      timer.Elapsed += GenerateSendingTasks;
     
    3035
    3136    public void SendData(PeerInfo destination, byte[][] data) {
    32       bufferedMessages.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
     37      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[][]>(destination, data));
    3338    }
    3439
     
    4045
    4146    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
    42       while (!bufferedMessages.IsEmpty) {
    43         KeyValuePair<PeerInfo, byte[][]> message;
    44         if (bufferedMessages.TryDequeue(out message)) {
    45           Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
     47      lock (timerLock) {
     48        //changing the queues...
     49        var tmp = bufferedMessagesRead;
     50        bufferedMessagesRead = bufferedMessagesWrite;
     51        bufferedMessagesWrite = tmp;
     52        List<KeyValuePair<PeerInfo, byte[][]>> messages = new List<KeyValuePair<PeerInfo, byte[][]>>();
     53        while (!bufferedMessagesRead.IsEmpty) {
     54          KeyValuePair<PeerInfo, byte[][]> message;
     55          if (bufferedMessagesRead.TryDequeue(out message)) {
     56            messages.Add(message);
     57            //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
     58          }
     59        }
     60        //now: merge them and start sending tasks
     61
     62        List<KeyValuePair<PeerInfo, byte[][]>> mergedMessages = MergeMessages(messages);
     63        foreach (var item in mergedMessages) {
     64          Task.Factory.StartNew(() => SendDataFromQueue(item.Key, item.Value), TaskCreationOptions.LongRunning);
    4665        }
    4766      }
     67    }
     68
     69    /// <summary>
     70    /// Merges and resorts all Messages by their destination
     71    /// </summary>
     72    /// <param name="messages">a list with multiple messages to the same  destination</param>
     73    /// <returns></returns>
     74    private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[][]>> messages) {
     75      List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>();
     76      Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>();
     77      foreach (var messagePackage in messages) {
     78        if (!cache.ContainsKey(messagePackage.Key)) {
     79          cache.Add(messagePackage.Key, new List<byte[]>());
     80        }
     81        for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
     82          cache[messagePackage.Key].Add(messagePackage.Value[i]);
     83        }
     84      }
     85      //now we have a dictionary with all messages per destionation
     86      //so: create a byte[][] again and return
     87      foreach (var dest in cache.Keys) {
     88        byte[][] messagesPerDest = new byte[cache[dest].Count][];
     89        for (int i = 0; i <= messagesPerDest.GetUpperBound(0); i++) {
     90          messagesPerDest[i] = cache[dest][i];
     91        }
     92        res.Add(new KeyValuePair<PeerInfo, byte[][]>(dest, messagesPerDest));
     93      }
     94      return res;
    4895    }
    4996
     
    54101    /// <param name="data">data to send</param>
    55102    private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
    56       lock (timerLock) {
    57         try {
    58           Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
    59           var serviceUrl = "DistributedGA.svc";
    60           var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
    61           var serviceUri = new Uri(baseUri, serviceUrl);
     103      try {
     104        int arrayLength = 3;
     105        if (data.GetUpperBound(0) > arrayLength) {
     106          //HACK: SEND MAX 10 items
     107          byte[][] fake = new byte[arrayLength][];
     108          for (int i = 0; i < arrayLength; i++) {
     109            fake[i] = data[i];
     110          }
     111          data = fake;
     112        }
    62113
    63           var binding = new NetTcpBinding();
    64           var endpoint = new EndpointAddress(serviceUri);
    65           using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
    66             using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
    67               ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
    68             }
     114
     115        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
     116        var serviceUrl = "DistributedGA.svc";
     117        var baseUri = new Uri(string.Concat("net.tcp://", destination.IpAddress, ":", destination.Port, "/DistributedGA"));
     118        var serviceUri = new Uri(baseUri, serviceUrl);
     119
     120        var binding = new NetTcpBinding();
     121        binding.ReaderQuotas.MaxArrayLength = 2147483647;
     122        binding.ReaderQuotas.MaxNameTableCharCount = 2147483647;
     123        binding.ReaderQuotas.MaxBytesPerRead = 2147483647;
     124        binding.ReaderQuotas.MaxStringContentLength = 2147483647;
     125
     126        var endpoint = new EndpointAddress(serviceUri);
     127        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
     128          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
     129            ((IMessageContract)client).SendData(myself, data); //maybe timout exception...
    69130          }
    70131        }
    71         catch { } //if maybe sending failed (because of connection lost, etc.): just ignore
     132      }
     133      catch (Exception ex) {
     134        Console.WriteLine(ex.Message);
    72135      }
    73136    }
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs

    r13956 r13959  
    7272    private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) {
    7373      //communicate with 10% of the network
    74       int noOfPeers = allPeers.Count / (100 /communicationRate);
     74      int noOfPeers = allPeers.Count / (100 / communicationRate);
    7575      List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1);
    7676      List<PeerInfo> res = new List<PeerInfo>();
     
    8686      int tmp = -1;
    8787      while (res.Count < noOfItems) {
    88         tmp = rnd.Next(minValue, maxValue);
     88        tmp = rnd.Next(minValue, maxValue + 1);
    8989        if (!res.Contains(tmp)) {
    9090          res.Add(tmp);
  • branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs

    r13957 r13959  
    5353      get { return (ILookupParameter<IntValue>)Parameters["CommunicationRate"]; }
    5454    }
    55     public ILookupParameter<IntValue> MessageCacheCapacityParameter {                         
     55    public ILookupParameter<IntValue> MessageCacheCapacityParameter {
    5656      get { return (ILookupParameter<IntValue>)Parameters["MessageCacheCapacity"]; }
    5757    }
Note: See TracChangeset for help on using the changeset viewer.