Free cookie consent management tool by TermsFeed Policy Generator

Changeset 14253


Ignore:
Timestamp:
08/12/16 13:30:41 (8 years ago)
Author:
gkronber
Message:

bugfixing (memory leaks)

Location:
branches/thasling/DistributedGA
Files:
1 added
8 edited

Legend:

Unmodified
Added
Removed
  • branches/thasling/DistributedGA/DistributedGA.Core/DistributedGA.Core.csproj

    r13937 r14253  
    6464    <Compile Include="Interface\IPeerListManager.cs" />
    6565    <Compile Include="Properties\AssemblyInfo.cs" />
     66    <Compile Include="Util\ByteArrayWrapper.cs" />
    6667    <Compile Include="Util\SizedConcurrentQueue.cs" />
    6768  </ItemGroup>
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs

    r14252 r14253  
    2222    //two queues are used to gather and and provide population more efficiently
    2323    private object activeQueueLocker = new Object();
    24     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> writeQueue;
    25     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> readQueue;
     24    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> writeQueue;
     25    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> readQueue;
    2626
    2727    //uses IMessageService for recieving population from one peer at once
    2828    private IMessageService host = null;
    29 
    30     private double communicationRate;
    3129
    3230    public event EventHandler<Exception> ExceptionOccurend;
     
    4038        }; // TODO: get own peerinfo
    4139
    42         this.communicationRate = communicationRate;
    43 
    44         writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
    45         readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
     40        writeQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
     41        readQueue = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
    4642        writeQueue.Limit = messageCacheCapacity;
    4743        readQueue.Limit = writeQueue.Limit;
     
    5753        sender.Init(ownInstance, messageCacheCapacity);
    5854
    59       }
    60       catch (Exception ex) {
     55      } catch (Exception ex) {
    6156        AddError("PeerNetworkMessageHandler.Init", ex);
    6257      }
     
    6863        sender.Dispose();
    6964        peerListManager.Dispose();
    70       }
    71       catch (Exception ex) {
     65      } catch (Exception ex) {
    7266        AddError("PeerNetworkMessageHandler.Dispose", ex);
    7367      }
    7468    }
    7569
    76     public void PublishDataToNetwork(byte[] data) {
     70    public void PublishDataToNetwork(ByteArrayWrapper data) {
    7771      try {
    7872        var allPeers = peerListManager.GetPeerList();
     
    8074          try {
    8175            sender.SendData(peer, data);
    82           }
    83           catch (Exception ex) {
     76          } catch (Exception ex) {
    8477            AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex);
    8578          }
    8679        }
    8780
    88       }
    89       catch (Exception ex) {
     81      } catch (Exception ex) {
    9082        AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex);
    9183      }
    9284    }
    9385
    94     public List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork() {
     86    public List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork() {
    9587      try {
    96         List<KeyValuePair<PeerInfo, byte[]>> res = new List<KeyValuePair<PeerInfo, byte[]>>();
    97         KeyValuePair<PeerInfo, byte[]> item;
     88        List<KeyValuePair<PeerInfo, ByteArrayWrapper>> res = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
     89        KeyValuePair<PeerInfo, ByteArrayWrapper> item;
    9890        lock (activeQueueLocker) {
    9991          //changing the current queue for storing items to send
     
    111103        }
    112104        return res;//.ToArray();
    113       }
    114       catch (Exception ex) {
     105      } catch (Exception ex) {
    115106        AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex);
    116107        return null;
     
    142133          .Select(ip => ip.ToString())
    143134          .First(str => str.StartsWith(ipPrefix));
    144       }
    145       catch { return null; }
     135      } catch { return null; }
    146136    }
    147137
     
    149139      if (e != null && e.Sender.ProblemInstance.Equals(ownInstance.ProblemInstance)) {
    150140        lock (activeQueueLocker) {
    151           writeQueue.Enqueue(new KeyValuePair<PeerInfo, byte[]>(e.Sender, e.data));
     141          writeQueue.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(e.Sender, ByteArrayWrapper.CreateByteArrayWrapper(e.data)));
    152142        }
    153143
     
    160150        try {
    161151          peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));
    162         }
    163         catch { }
     152        } catch { }
    164153      }
    165154    }
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs

    r14252 r14253  
    11using System;
     2using System.CodeDom;
    23using System.Collections.Concurrent;
    34using System.Collections.Generic;
     5using System.Linq;
    46using System.ServiceModel;
    57using System.Threading.Tasks;
     
    1517
    1618    //providing two queues for faster access
    17     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesWrite;
    18     private SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>> bufferedMessagesRead;
     19    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesWrite;
     20    private SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>> bufferedMessagesRead;
    1921
    2022    private Timer timer; //sends cached messages to network in background
    2123
    2224    private Object timerLock = new Object();
     25    private bool isActive = false;
    2326
    2427    public void Init(PeerInfo source, int messageCacheCapacity) {
    2528      myself = source;
    26       bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
     29      bufferedMessagesRead = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
    2730      bufferedMessagesRead.Limit = messageCacheCapacity;
    28       bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, byte[]>>();
     31      bufferedMessagesWrite = new SizedConcurrentQueue<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
    2932      bufferedMessagesWrite.Limit = bufferedMessagesRead.Limit;
    3033      timer = new Timer(1000 * 10); //each 10 seconds
     
    3336    }
    3437
    35     public void SendData(PeerInfo destination, byte[] data) {
    36       bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, byte[]>(destination, data));
     38    public void SendData(PeerInfo destination, ByteArrayWrapper data) {
     39      bufferedMessagesWrite.Enqueue(new KeyValuePair<PeerInfo, ByteArrayWrapper>(destination, data));
    3740    }
    3841
     
    4447
    4548    private void GenerateSendingTasks(object sender, ElapsedEventArgs e) {
     49      if (isActive) return;
    4650      lock (timerLock) {
    47         //changing the queues...
    48         var tmp = bufferedMessagesRead;
    49         bufferedMessagesRead = bufferedMessagesWrite;
    50         bufferedMessagesWrite = tmp;
    51         List<KeyValuePair<PeerInfo, byte[]>> messages = new List<KeyValuePair<PeerInfo, byte[]>>();
    52         while (!bufferedMessagesRead.IsEmpty) {
    53           KeyValuePair<PeerInfo, byte[]> message;
    54           if (bufferedMessagesRead.TryDequeue(out message)) {
    55             messages.Add(message);
    56             //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
     51        try {
     52          isActive = true;
     53          //changing the queues...
     54          var tmp = bufferedMessagesRead;
     55          bufferedMessagesRead = bufferedMessagesWrite;
     56          bufferedMessagesWrite = tmp;
     57          List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
     58          while (!bufferedMessagesRead.IsEmpty) {
     59            KeyValuePair<PeerInfo, ByteArrayWrapper> message;
     60            if (bufferedMessagesRead.TryDequeue(out message)) {
     61              messages.Add(message);
     62              //Task.Factory.StartNew(() => SendDataFromQueue(message.Key, message.Value), TaskCreationOptions.LongRunning);
     63            }
    5764          }
    58         }
    59         //now: merge them and start sending tasks
     65          //now: merge them and start sending tasks
    6066
    61         List<KeyValuePair<PeerInfo, byte[][]>> mergedMessages = MergeMessages(messages);
    62         foreach (var item in mergedMessages) {
    63           Task.Factory.StartNew(() => SendDataFromQueue(item.Key, item.Value), TaskCreationOptions.LongRunning);
     67          List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> mergedMessages = MergeMessages(messages);
     68
     69          var runningTasks = new List<Task>();
     70          foreach (var item in mergedMessages) {
     71            var itemToSend = item;
     72            var t = Task.Factory.StartNew(() => SendDataFromQueue(itemToSend.Key, itemToSend.Value),
     73              TaskCreationOptions.LongRunning);
     74            runningTasks.Add(t);
     75          }
     76
     77          Task.WaitAll(runningTasks.ToArray());
     78        } finally {
     79          isActive = false;
    6480        }
    6581      }
     
    7187    /// <param name="messages">a list with multiple messages to the same  destination</param>
    7288    /// <returns></returns>
    73     private List<KeyValuePair<PeerInfo, byte[][]>> MergeMessages(List<KeyValuePair<PeerInfo, byte[]>> messages) {
    74       List<KeyValuePair<PeerInfo, byte[][]>> res = new List<KeyValuePair<PeerInfo, byte[][]>>();
    75       Dictionary<PeerInfo, List<byte[]>> cache = new Dictionary<PeerInfo, List<byte[]>>();
     89    private List<KeyValuePair<PeerInfo, ByteArrayWrapper[]>> MergeMessages(List<KeyValuePair<PeerInfo, ByteArrayWrapper>> messages) {
     90      Dictionary<PeerInfo, List<ByteArrayWrapper>> cache = new Dictionary<PeerInfo, List<ByteArrayWrapper>>();
    7691      foreach (var messagePackage in messages) {
    7792        if (!cache.ContainsKey(messagePackage.Key)) {
    78           cache.Add(messagePackage.Key, new List<byte[]>());
     93          cache.Add(messagePackage.Key, new List<ByteArrayWrapper>());
    7994        }
    8095        //for (int i = 0; i <= messagePackage.Value.GetUpperBound(0); i++) {
     
    8297        //}
    8398      }
    84       //now we have a dictionary with all messages per destionation
    85       //so: create a byte[][] again and return
    86       foreach (var dest in cache.Keys) {
    87         byte[][] messagesPerDest = new byte[cache[dest].Count][];
    88         for (int i = 0; i <= messagesPerDest.GetUpperBound(0); i++) {
    89           messagesPerDest[i] = cache[dest][i];
    90         }
    91         res.Add(new KeyValuePair<PeerInfo, byte[][]>(dest, messagesPerDest));
    92       }
    93       return res;
     99
     100      return cache.Keys.Select(dest => new KeyValuePair<PeerInfo, ByteArrayWrapper[]>(dest, cache[dest].ToArray())).ToList();
    94101    }
    95102
     
    99106    /// <param name="destination">destination peer</param>
    100107    /// <param name="data">data to send</param>
    101     private void SendDataFromQueue(PeerInfo destination, byte[][] data) {
     108    private void SendDataFromQueue(PeerInfo destination, ByteArrayWrapper[] data) {
    102109      try {
    103110        Console.WriteLine(string.Format("Sending data to {0}:{1} in the background...", destination.IpAddress, destination.Port));
     
    117124        using (var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint)) {
    118125          using (IClientChannel client = (IClientChannel)myChannelFactory.CreateChannel()) {
    119             for (int i = 0; i < data.GetUpperBound(0) + 1; i++) {
    120               ((IMessageContract)client).SendData(myself, data[i]); //maybe exception...
     126            for (int i = 0; i < data.GetLength(0); i++) {
     127              ((IMessageContract)client).SendData(myself, data[i].Array); //maybe exception...
    121128            }
    122129          }
    123130        }
    124       }
    125       catch (Exception ex) {
     131      } catch (Exception ex) {
    126132        //ignore
    127133        Console.WriteLine(ex.Message);
  • branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs

    r14252 r14253  
    5858    }
    5959
    60     private List<PeerInfo> ChoosePeersForMessaging(ref List<PeerInfo> allPeers) {
     60    private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) {
    6161      Shuffle<PeerInfo>(allPeers);
    6262      int toTake = Convert.ToInt32(allPeers.Count * communicationRate) + 1;
     
    6464        toTake = 1;
    6565      }
    66       return allPeers.Take(toTake).ToList(); ;
     66      return allPeers.Take(toTake).ToList();
    6767    }
    6868
     
    8989            }
    9090          }
    91           cachedPeerList = ChoosePeersForMessaging(ref allPeers);
     91          cachedPeerList = ChoosePeersForMessaging(allPeers);
    9292        }
    9393        catch { } //nothing to do
  • branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageHandler.cs

    r13972 r14253  
    22using System.Collections.Generic;
    33using DistributedGA.Core.Domain;
     4using DistributedGA.Core.Util;
    45
    56namespace DistributedGA.Core.Interface {
     
    910    void Init(string lanIpPrefix, string contactServerUrl, string problemInstance, int messageCacheCapacty, double communicationRate);
    1011
    11     void PublishDataToNetwork(byte[] data);
     12    void PublishDataToNetwork(ByteArrayWrapper data);
    1213
    13     List<KeyValuePair<PeerInfo, byte[]>> GetDataFromNetwork();
     14    List<KeyValuePair<PeerInfo, ByteArrayWrapper>> GetDataFromNetwork();
    1415
    1516    PeerInfo GetPeerInfo();
  • branches/thasling/DistributedGA/DistributedGA.Core/Interface/IMessageSender.cs

    r13972 r14253  
    11using DistributedGA.Core.Domain;
     2using DistributedGA.Core.Util;
    23
    34namespace DistributedGA.Core.Interface {
     
    78    void Init(PeerInfo source, int messageCacheCapacity);
    89
    9     void SendData(PeerInfo destination, byte[] data);
     10    void SendData(PeerInfo destination, ByteArrayWrapper data);
    1011
    1112    void Dispose();
  • branches/thasling/DistributedGA/DistributedGA.Hive/P2PMigrationAnalyzer.cs

    r14252 r14253  
    2828using DistributedGA.Core.Implementation;
    2929using DistributedGA.Core.Interface;
     30using DistributedGA.Core.Util;
    3031using DistributedGA.Hive;
    3132using HeuristicLab.Common;
     
    119120      Parameters.Add(new ValueParameter<StringValue>("JobGUID", "", new StringValue(Guid.NewGuid().ToString())));
    120121      Parameters.Add(new ValueParameter<ILog>("Log", "The log", new Log(1000)));
     122      Parameters.Add(new ValueParameter<IntValue>("ByteArraysAllocated", new IntValue(0)));
     123      Parameters.Add(new ValueParameter<IntValue>("ByteArraysFreed", new IntValue(0)));
     124      Parameters.Add(new ValueParameter<IntValue>("ByteArraysAlive", new IntValue(0)));
    121125
    122126      var validValues = new ItemSet<EnumValue<MigrationStrategy>>();
     
    145149      var communicationRate = ((PercentValue)Parameters["CommunicationRate"].ActualValue).Value;
    146150      var messageCacheCapacity = ((IntValue)Parameters["MessageCacheCapacity"].ActualValue).Value;
    147       h.Init(lanIpPrefix, contactServerUri, problemInstance, (int)(100 * messageCacheCapacity), (int)(100 * communicationRate));
     151      h.Init(lanIpPrefix, contactServerUri, problemInstance, messageCacheCapacity, (int)(100 * communicationRate));
    148152      h.ExceptionOccurend += ExceptionThrown;
    149153    }
     
    221225              HeuristicLab.Persistence.Default.Xml.XmlGenerator.Serialize(msgScope, stream);
    222226              message = stream.GetBuffer();
    223               h.PublishDataToNetwork(message);
     227              h.PublishDataToNetwork(ByteArrayWrapper.CreateByteArrayWrapper(message));
    224228            }
    225229          }
     
    230234          // recieve
    231235          var message = h.GetDataFromNetwork();
    232           List<KeyValuePair<PeerInfo, byte[]>> immigrants = new List<KeyValuePair<PeerInfo, byte[]>>();
     236          List<KeyValuePair<PeerInfo, ByteArrayWrapper>> immigrants = new List<KeyValuePair<PeerInfo, ByteArrayWrapper>>();
    233237          //limit number of immigrants to use
    234238          if (noOfEmigrants < message.Count) {
     
    264268          var qualityTranslatedName = QualityParameter.TranslatedName;
    265269          foreach (var msg in immigrants) {
    266             using (var stream = new MemoryStream(msg.Value)) {
     270            using (var stream = new MemoryStream(msg.Value.Array)) {
    267271              var immigrantScope = HeuristicLab.Persistence.Default.Xml.XmlParser.Deserialize<IScope>(stream);
    268272
     
    290294
    291295      MigrationIterations.Value++;
     296
     297      ((IValueParameter<IntValue>)Parameters["ByteArraysAllocated"]).Value.Value = ByteArrayWrapper.AllocatedCounter;
     298      ((IValueParameter<IntValue>)Parameters["ByteArraysFreed"]).Value.Value = ByteArrayWrapper.FreedCounter;
     299      ((IValueParameter<IntValue>)Parameters["ByteArraysAlive"]).Value.Value = ByteArrayWrapper.AliveCounter;
     300
    292301      return base.Apply();
    293302    }
  • branches/thasling/DistributedGA/DistributedGA.sln

    r14252 r14253  
    11
    22Microsoft Visual Studio Solution File, Format Version 12.00
    3 # Visual Studio 2013
    4 VisualStudioVersion = 12.0.21005.1
    5 MinimumVisualStudioVersion = 10.0.40219.1
     3# Visual Studio 2012
    64Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistributedGA.Test", "DistributedGA.Test\DistributedGA.Test.csproj", "{E0E91C06-C56A-454F-9F7C-3FA7AE7F920E}"
    75EndProject
     
    5048    HideSolutionNode = FALSE
    5149  EndGlobalSection
    52   GlobalSection(Performance) = preSolution
    53     HasPerformanceSessions = true
    54   EndGlobalSection
    5550EndGlobal
Note: See TracChangeset for help on using the changeset viewer.