Index: /branches/thasling/DistributedGA/DistributedGA.Core/DistributedGA.Core.csproj
===================================================================
--- /branches/thasling/DistributedGA/DistributedGA.Core/DistributedGA.Core.csproj (revision 13936)
+++ /branches/thasling/DistributedGA/DistributedGA.Core/DistributedGA.Core.csproj (revision 13937)
@@ -64,4 +64,5 @@
+
Index: /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
===================================================================
--- /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs (revision 13936)
+++ /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs (revision 13937)
@@ -6,4 +6,5 @@
using DistributedGA.Core.Domain;
using DistributedGA.Core.Interface;
+using DistributedGA.Core.Util;
namespace DistributedGA.Core.Implementation {
@@ -21,6 +22,6 @@
//to queues are used to gather and and provide population more efficiently
private object activeQueueLocker = new object();
- private ConcurrentQueue writeQueue;
- private ConcurrentQueue readQueue;
+ private SizedConcurrentQueue writeQueue;
+ private SizedConcurrentQueue readQueue;
//uses IMessageService for recieving population from one peer at once
@@ -37,11 +38,12 @@
}; // TODO: get own peerinfo
- writeQueue = new ConcurrentQueue();
- readQueue = new ConcurrentQueue();
-
+ writeQueue = new SizedConcurrentQueue();
+ readQueue = new SizedConcurrentQueue();
+ writeQueue.Limit = 1000;
+ readQueue.Limit = writeQueue.Limit;
host = new WcfMessageService();
ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet
- host.OnDataRecieved += new EventHandler(OnPopulationRecieved);
+ host.OnDataRecieved += new EventHandler(OnDataRecieved);
peerListManager = new WcfPeerListManager();
@@ -131,5 +133,5 @@
}
- private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) {
+ private void OnDataRecieved(object sender, MessageRecieveEventArgs e) {
if (e != null) {
lock (activeQueueLocker) {
Index: /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
===================================================================
--- /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs (revision 13936)
+++ /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs (revision 13937)
@@ -7,4 +7,5 @@
using DistributedGA.Core.Domain;
using DistributedGA.Core.Interface;
+using DistributedGA.Core.Util;
namespace DistributedGA.Core.Implementation {
@@ -13,5 +14,5 @@
private PeerInfo myself;
- private ConcurrentQueue> bufferedMessages;
+ private SizedConcurrentQueue> bufferedMessages;
private Timer timer; //sends cached messages to network in background
@@ -19,5 +20,6 @@
public void Init(PeerInfo source) {
myself = source;
- bufferedMessages = new ConcurrentQueue>();
+ bufferedMessages = new SizedConcurrentQueue>();
+ bufferedMessages.Limit = 1000;
timer = new Timer(1000 * 60); //each 5 minutes
timer.Elapsed += GenerateSendingTasks;
Index: /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
===================================================================
--- /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs (revision 13936)
+++ /branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs (revision 13937)
@@ -60,4 +60,6 @@
((IClientChannel)heartbeatClient).Close();
myChannelFactory.Close();
+ client = null;
+ myChannelFactory = null;
}
Index: /branches/thasling/DistributedGA/DistributedGA.Core/Util/SizedConcurrentQueue.cs
===================================================================
--- /branches/thasling/DistributedGA/DistributedGA.Core/Util/SizedConcurrentQueue.cs (revision 13937)
+++ /branches/thasling/DistributedGA/DistributedGA.Core/Util/SizedConcurrentQueue.cs (revision 13937)
@@ -0,0 +1,23 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace DistributedGA.Core.Util {
+ public class SizedConcurrentQueue : ConcurrentQueue {
+ public int Limit { get; set; }
+ private readonly object syncObject = new object();
+
+ public new void Enqueue(T item) {
+ base.Enqueue(item);
+ lock (syncObject) {
+ T overflow;
+ while (base.Count > Limit) {
+ base.TryDequeue(out overflow);
+ }
+ }
+ }
+ }
+}