- Timestamp:
- 06/08/16 17:02:06 (8 years ago)
- Location:
- branches/thasling/DistributedGA/DistributedGA.Core
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/thasling/DistributedGA/DistributedGA.Core
-
Property
svn:global-ignores
set to
obj
-
Property
svn:global-ignores
set to
-
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/MessageContractImpl.cs
r13553 r13887 4 4 using DistributedGA.Core.Interface; 5 5 6 namespace DistributedGA.Core.Implementation { 7 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 8 public class MessageContractImpl : IMessageContract { 9 public void SendData(PeerInfo sender, SolutionInfo[] data) { 10 MessageRecieveEventArgs args = new MessageRecieveEventArgs() { Sender = sender, data = data }; 6 namespace DistributedGA.Core.Implementation 7 { 8 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 9 public class MessageContractImpl : IMessageContract 10 { 11 public void SendData(PeerInfo sender, byte[][] data) 12 { 13 MessageRecieveEventArgs args = new MessageRecieveEventArgs() { Sender = sender, data = data }; 11 14 12 if (MessageRecieved != null) { 13 MessageRecieved(this, args); 14 } 15 if (MessageRecieved != null) 16 { 17 MessageRecieved(this, args); 18 } 19 } 20 21 public event EventHandler<MessageRecieveEventArgs> MessageRecieved; 22 15 23 } 16 17 public event EventHandler<MessageRecieveEventArgs> MessageRecieved;18 19 }20 24 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/PeerNetworkMessageHandler.cs
r13556 r13887 7 7 using DistributedGA.Core.Interface; 8 8 9 namespace DistributedGA.Core.Implementation { 10 public class PeerNetworkMessageHandler : IMessageHandler { 11 //own peer-instance Information 12 private PeerInfo ownInstance = null; 9 namespace DistributedGA.Core.Implementation 10 { 11 public class PeerNetworkMessageHandler : IMessageHandler 12 { 13 //own peer-instance Information 14 private PeerInfo ownInstance = null; 13 15 14 //uses peer-list from IPeerListManager to decide which peers to contact15 private IPeerListManager peerListManager;16 //uses peer-list from IPeerListManager to decide which peers to contact 17 private IPeerListManager peerListManager; 16 18 17 //uses IMessageSender to send populations to peers18 private IMessageSender sender = null;19 //uses IMessageSender to send populations to peers 20 private IMessageSender sender = null; 19 21 20 //provides current population for the higher layer IMigrationOperator21 //to queues are used to gather and and provide population more efficiently22 private object activeQueueLocker = new object();23 private ConcurrentQueue<SolutionInfo> writeQueue;24 private ConcurrentQueue<SolutionInfo> readQueue;22 //provides current population for the higher layer IMigrationOperator 23 //to queues are used to gather and and provide population more efficiently 24 private object activeQueueLocker = new object(); 25 private ConcurrentQueue<byte[]> writeQueue; 26 private ConcurrentQueue<byte[]> readQueue; 25 27 26 //uses IMessageService for recieving population from one peer at once27 private IMessageService host = null;28 //uses IMessageService for recieving population from one peer at once 29 private IMessageService host = null; 28 30 29 31 30 32 31 public void Init(string lanIpPrefix, string contactServerUrl) { 32 try { 33 ownInstance = new PeerInfo() { 34 IpAddress = GetExternalIpAddress(lanIpPrefix), 35 Port = 0, 36 ProblemInstance = "TestProblem" 37 }; // TODO: get own peerinfo 33 public void Init(string lanIpPrefix, string contactServerUrl) 34 { 35 try 36 { 37 ownInstance = new PeerInfo() 38 { 39 IpAddress = GetInternalIpAddress(lanIpPrefix), 40 Port = 0, 41 ProblemInstance = "TestProblem" 42 }; // TODO: get own peerinfo 38 43 39 writeQueue = new ConcurrentQueue<SolutionInfo>();40 readQueue = new ConcurrentQueue<SolutionInfo>();44 writeQueue = new ConcurrentQueue<byte[]>(); 45 readQueue = new ConcurrentQueue<byte[]>(); 41 46 42 host = new WcfMessageService();43 ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet44 host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnPopulationRecieved);47 host = new WcfMessageService(); 48 ownInstance.Port = host.Init(ownInstance.IpAddress); //getting port, on which service is hostet 49 host.OnDataRecieved += new EventHandler<MessageRecieveEventArgs>(OnPopulationRecieved); 45 50 46 peerListManager = new WcfPeerListManager();47 peerListManager.Init(ownInstance, contactServerUrl);51 peerListManager = new WcfPeerListManager(); 52 peerListManager.Init(ownInstance, contactServerUrl); 48 53 49 sender = new WcfMessageSender();50 sender.Init(ownInstance);54 sender = new WcfMessageSender(); 55 sender.Init(ownInstance); 51 56 52 } catch (Exception ex) { 53 AddError("PeerNetworkMessageHandler.Init", ex); 54 } 55 } 56 57 public void Dispose() { 58 try { 59 host.Dispose(); 60 } catch (Exception ex) { 61 AddError("PeerNetworkMessageHandler.Dispose", ex); 62 } 63 } 64 65 public void PublishDataToNetwork(SolutionInfo[] data) { 66 try { 67 foreach (PeerInfo peer in peerListManager.GetPeerList()) { 68 //HACK: manipulate for monitoring in test 69 foreach (SolutionInfo si in data) { 70 si.IterationNumber = ownInstance.Port; 71 } 72 //maybe something will go wrong during network communication... 73 try { 74 sender.SendData(peer, data); 75 } catch (Exception ex) { 76 AddError("PeerNetworkMessageHandler.PublishMigrationInfo(during sending to one peer!)", ex); 77 } 78 } 79 } catch (Exception ex) { 80 AddError("PeerNetworkMessageHandler.PublishMigrationInfo", ex); 81 } 82 } 83 84 public SolutionInfo[] GetDataFromNetwork() { 85 try { 86 List<SolutionInfo> res = new List<SolutionInfo>(); 87 SolutionInfo item = null; 88 lock (activeQueueLocker) { 89 var tmp = readQueue; 90 readQueue = writeQueue; 91 writeQueue = tmp; 57 } 58 catch (Exception ex) 59 { 60 AddError("PeerNetworkMessageHandler.Init", ex); 61 } 92 62 } 93 63 94 //creating resultset 95 while (!readQueue.IsEmpty) { 96 if (readQueue.TryDequeue(out item)) { 97 res.Add(item); 98 } 64 public void Dispose() 65 { 66 try 67 { 68 host.Dispose(); 69 } 70 catch (Exception ex) 71 { 72 AddError("PeerNetworkMessageHandler.Dispose", ex); 73 } 99 74 } 100 return res.ToArray(); 101 } catch (Exception ex) { 102 AddError("PeerNetworkMessageHandler.GetMigrationInfo", ex); 103 return null; 104 } 75 76 public void PublishDataToNetwork(byte[][] data) 77 { 78 try 79 { 80 foreach (PeerInfo peer in peerListManager.GetPeerList()) 81 { 82 //maybe something will go wrong during network communication... 83 try 84 { 85 sender.SendData(peer, data); 86 } 87 catch (Exception ex) 88 { 89 AddError("PeerNetworkMessageHandler.PublishDataToNetwork(during sending to one peer!)", ex); 90 } 91 } 92 } 93 catch (Exception ex) 94 { 95 AddError("PeerNetworkMessageHandler.PublishDataToNetwork", ex); 96 } 97 } 98 99 public byte[][] GetDataFromNetwork() 100 { 101 try 102 { 103 List<byte[]> res = new List<byte[]>(); 104 byte[] item = null; 105 lock (activeQueueLocker) 106 { 107 var tmp = readQueue; 108 readQueue = writeQueue; 109 writeQueue = tmp; 110 } 111 112 //creating resultset 113 while (!readQueue.IsEmpty) 114 { 115 if (readQueue.TryDequeue(out item)) 116 { 117 res.Add(item); 118 } 119 } 120 return res.ToArray(); 121 } 122 catch (Exception ex) 123 { 124 AddError("PeerNetworkMessageHandler.GetDataFromNetwork", ex); 125 return null; 126 } 127 } 128 129 public PeerInfo GetPeerInfo() 130 { 131 return ownInstance; 132 } 133 134 public List<PeerInfo> GetCurrentNetwork() 135 { 136 return peerListManager.GetPeerList(); 137 } 138 139 private string GetInternalIpAddress(string ipPrefix) 140 { 141 try 142 { 143 var strHostName = Dns.GetHostName(); 144 // Then using host name, get the IP address list.. 145 IPHostEntry ipEntry = Dns.GetHostEntry(strHostName); 146 IPAddress[] addr = ipEntry.AddressList; 147 148 return addr 149 .Select(ip => ip.ToString()) 150 .First(str => str.StartsWith(ipPrefix)); 151 } 152 catch { return null; } 153 } 154 155 private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) 156 { 157 if (e != null) 158 { 159 lock (activeQueueLocker) 160 { 161 foreach (byte[] item in e.data) 162 { 163 writeQueue.Enqueue(item); 164 } 165 } 166 } 167 } 168 169 private void AddError(string source, Exception ex) 170 { 171 if (peerListManager != null) 172 { 173 try 174 { 175 peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message)); 176 } 177 catch { } 178 } 179 } 180 105 181 } 106 107 public PeerInfo GetPeerInfo() {108 return ownInstance;109 }110 111 public List<PeerInfo> GetCurrentNetwork() {112 return peerListManager.GetPeerList();113 }114 115 private string GetExternalIpAddress(string ipPrefix) {116 try {117 var strHostName = Dns.GetHostName();118 // Then using host name, get the IP address list..119 IPHostEntry ipEntry = Dns.GetHostEntry(strHostName);120 IPAddress[] addr = ipEntry.AddressList;121 122 return addr123 .Select(ip => ip.ToString())124 .First(str => str.StartsWith(ipPrefix));125 } catch { return null; }126 }127 128 private void OnPopulationRecieved(object sender, MessageRecieveEventArgs e) {129 if (e != null) {130 lock (activeQueueLocker) {131 foreach (SolutionInfo si in e.data) {132 writeQueue.Enqueue(si);133 }134 }135 }136 }137 138 private void AddError(string source, Exception ex) {139 if (peerListManager != null) {140 try {141 peerListManager.SendLogToServer(string.Concat("Source: ", source, ", Exception: ", ex.Message));142 } catch { }143 }144 }145 146 }147 182 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/TestPeerListManager.cs
r13556 r13887 3 3 using DistributedGA.Core.Interface; 4 4 5 namespace DistributedGA.Core.Implementation { 6 public class TestPeerListManager : IPeerListManager { 5 namespace DistributedGA.Core.Implementation 6 { 7 public class TestPeerListManager : IPeerListManager 8 { 7 9 8 10 9 public List<PeerInfo> GetPeerList() { 10 PeerInfo pi1 = new PeerInfo() { IpAddress = "localhost", Port = 3030 }; 11 PeerInfo pi2 = new PeerInfo() { IpAddress = "localhost", Port = 3031 }; 12 PeerInfo pi3 = new PeerInfo() { IpAddress = "localhost", Port = 3032 }; 13 PeerInfo pi4 = new PeerInfo() { IpAddress = "localhost", Port = 3033 }; 14 PeerInfo pi5 = new PeerInfo() { IpAddress = "localhost", Port = 3034 }; 15 return new List<PeerInfo>() { pi1, pi2, pi3, pi4, pi5 }; 11 public List<PeerInfo> GetPeerList() 12 { 13 PeerInfo pi1 = new PeerInfo() { IpAddress = "localhost", Port = 3030 }; 14 PeerInfo pi2 = new PeerInfo() { IpAddress = "localhost", Port = 3031 }; 15 PeerInfo pi3 = new PeerInfo() { IpAddress = "localhost", Port = 3032 }; 16 PeerInfo pi4 = new PeerInfo() { IpAddress = "localhost", Port = 3033 }; 17 PeerInfo pi5 = new PeerInfo() { IpAddress = "localhost", Port = 3034 }; 18 return new List<PeerInfo>() { pi1, pi2, pi3, pi4, pi5 }; 19 } 20 21 public void SendLogToServer(string msg) 22 { 23 24 } 25 26 public void Init(PeerInfo source, string contactServerUrl) 27 { 28 29 } 16 30 } 17 18 public void SendLogToServer(string msg) {19 20 }21 22 public void Init(PeerInfo source, string contactServerUrl) {23 24 }25 }26 31 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageSender.cs
r13557 r13887 4 4 using DistributedGA.Core.Interface; 5 5 6 namespace DistributedGA.Core.Implementation { 7 public class WcfMessageSender : IMessageSender { 8 private PeerInfo myself; 6 namespace DistributedGA.Core.Implementation 7 { 8 public class WcfMessageSender : IMessageSender 9 { 10 private PeerInfo myself; 9 11 10 public void Init(PeerInfo source) { 11 myself = source; 12 public void Init(PeerInfo source) 13 { 14 myself = source; 15 } 16 17 public void SendData(PeerInfo destination, byte[][] data) 18 { 19 var client = CreateServerClient(destination.IpAddress, destination.Port); 20 client.SendData(myself, data); //maybe timout exception... 21 } 22 23 private IMessageContract CreateServerClient(string ip, int port) 24 { 25 var serviceUrl = "DistributedGA.svc"; 26 var baseUri = new Uri(string.Concat("net.tcp://", ip, ":", port, "/DistributedGA")); 27 var serviceUri = new Uri(baseUri, serviceUrl); 28 29 var binding = new NetTcpBinding(); 30 var endpoint = new EndpointAddress(serviceUri); 31 var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint); 32 33 IMessageContract client = null; 34 client = myChannelFactory.CreateChannel(); 35 return client; 36 } 37 12 38 } 13 14 public void SendData(PeerInfo destination, SolutionInfo[] data) {15 var client = CreateServerClient(destination.IpAddress, destination.Port);16 client.SendData(myself, data); //maybe timout exception...17 }18 19 private IMessageContract CreateServerClient(string ip, int port) {20 var serviceUrl = "DistributedGA.svc";21 var baseUri = new Uri(string.Concat("net.tcp://", ip, ":", port, "/DistributedGA"));22 var serviceUri = new Uri(baseUri, serviceUrl);23 24 var binding = new NetTcpBinding();25 var endpoint = new EndpointAddress(serviceUri);26 var myChannelFactory = new ChannelFactory<IMessageContract>(binding, endpoint);27 28 IMessageContract client = null;29 client = myChannelFactory.CreateChannel();30 return client;31 }32 33 }34 39 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfMessageService.cs
r13557 r13887 7 7 using DistributedGA.Core.Interface; 8 8 9 namespace DistributedGA.Core.Implementation { 10 public class WcfMessageService : IMessageService { 11 public event EventHandler<MessageRecieveEventArgs> OnDataRecieved; 9 namespace DistributedGA.Core.Implementation 10 { 11 public class WcfMessageService : IMessageService 12 { 13 public event EventHandler<MessageRecieveEventArgs> OnDataRecieved; 12 14 13 private static ManualResetEvent _ResetEvent = new ManualResetEvent(false);15 private static ManualResetEvent _ResetEvent = new ManualResetEvent(false); 14 16 15 private IMessageContract messageContract = null;17 private IMessageContract messageContract = null; 16 18 17 public int Init(string ip) { 18 int port = 0; 19 port = FreeTcpPort(); 19 public int Init(string ip) 20 { 21 int port = 0; 22 port = FreeTcpPort(); 20 23 21 messageContract = new MessageContractImpl();22 messageContract.MessageRecieved += new EventHandler<MessageRecieveEventArgs>(OnMessageRecieved);24 messageContract = new MessageContractImpl(); 25 messageContract.MessageRecieved += new EventHandler<MessageRecieveEventArgs>(OnMessageRecieved); 23 26 24 var serviceUrl = "DistributedGA.svc"; 25 new Thread(() => { 26 var baseUri = new Uri(string.Concat("net.tcp://", ip, ":", port, "/DistributedGA")); 27 var serviceUri = new Uri(baseUri, serviceUrl); 28 NetTcpBinding binding = new NetTcpBinding(); 29 //using (var host = new ServiceHost(typeof(MessageContractImpl), serviceUri)) 27 var serviceUrl = "DistributedGA.svc"; 28 new Thread(() => 29 { 30 var baseUri = new Uri(string.Concat("net.tcp://", ip, ":", port, "/DistributedGA")); 31 var serviceUri = new Uri(baseUri, serviceUrl); 32 NetTcpBinding binding = new NetTcpBinding(); 33 //using (var host = new ServiceHost(typeof(MessageContractImpl), serviceUri)) 30 34 31 using (var host = new ServiceHost(messageContract, serviceUri)) { 32 host.AddServiceEndpoint(typeof(IMessageContract), binding, serviceUri); 35 using (var host = new ServiceHost(messageContract, serviceUri)) 36 { 37 host.AddServiceEndpoint(typeof(IMessageContract), binding, serviceUri); 33 38 34 host.Open();39 host.Open(); 35 40 36 _ResetEvent.WaitOne(); 41 _ResetEvent.WaitOne(); 42 } 43 }).Start(); 44 //close service again: 45 // _ResetEvent.Set(); 46 return port; 37 47 } 38 }).Start(); 39 //close service again: 40 // _ResetEvent.Set(); 41 return port; 48 49 public void Dispose() 50 { 51 _ResetEvent.Set(); 52 } 53 54 private int FreeTcpPort() 55 { 56 TcpListener l = new TcpListener(IPAddress.Loopback, 0); 57 l.Start(); 58 int port = ((IPEndPoint)l.LocalEndpoint).Port; 59 l.Stop(); 60 return port; 61 } 62 63 private void OnMessageRecieved(object sender, MessageRecieveEventArgs e) 64 { 65 if (OnDataRecieved != null) 66 { 67 OnDataRecieved(this, e); 68 } 69 } 42 70 } 43 44 public void Dispose() {45 _ResetEvent.Set();46 }47 48 private int FreeTcpPort() {49 TcpListener l = new TcpListener(IPAddress.Loopback, 0);50 l.Start();51 int port = ((IPEndPoint)l.LocalEndpoint).Port;52 l.Stop();53 return port;54 }55 56 private void OnMessageRecieved(object sender, MessageRecieveEventArgs e) {57 if (OnDataRecieved != null) {58 OnDataRecieved(this, e);59 }60 }61 }62 71 } -
branches/thasling/DistributedGA/DistributedGA.Core/Implementation/WcfPeerListManager.cs
r13557 r13887 5 5 using DistributedGA.Core.Domain; 6 6 using DistributedGA.Core.Interface; 7 using System.Timers; 7 8 8 namespace DistributedGA.Core.Implementation { 9 public class WcfPeerListManager : IPeerListManager { 10 private string serverString = null; 9 namespace DistributedGA.Core.Implementation 10 { 11 public class WcfPeerListManager : IPeerListManager 12 { 13 private string serverString = null; 11 14 12 private IContactService client = null;15 private IContactService client = null; 13 16 14 private PeerInfo myself= null;17 private IContactService heartbeatClient = null; 15 18 16 public void Init(PeerInfo source, string contactServerUrl) { 17 serverString = contactServerUrl; 18 client = CreateClient(); 19 myself = source; 20 client.RegisterPeer(source); 19 private PeerInfo myself = null; 20 21 private Timer timer = null; //sends heartbeat to contact-server 22 23 public void Init(PeerInfo source, string contactServerUrl) 24 { 25 serverString = contactServerUrl; 26 client = CreateClient(); 27 heartbeatClient = CreateClient(); 28 myself = source; 29 client.RegisterPeer(source); 30 timer = new Timer(1000 * 60 * 5); //each 5 minutes 31 timer.Elapsed += SendHeartbeatToServer; 32 timer.Start(); 33 } 34 35 36 37 public List<PeerInfo> GetPeerList() 38 { 39 var allPeers = client.GetPeerList(myself); 40 var peersForMessaging = ChoosePeersForMessaging(allPeers); 41 //return peersForMessaging; 42 return allPeers; //TODO: Enable 10% list communication 43 } 44 45 public void SendLogToServer(string msg) 46 { 47 client.MakeLog(myself, msg); 48 } 49 50 private IContactService CreateClient() 51 { 52 var binding = new NetTcpBinding(); 53 var endpoint = new EndpointAddress(serverString); 54 var myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint); 55 56 IContactService client = null; 57 client = myChannelFactory.CreateChannel(); 58 return client; 59 } 60 61 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) 62 { 63 //communicate with 10% of the network 64 int noOfPeers = allPeers.Count / 10; 65 List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1); 66 List<PeerInfo> res = new List<PeerInfo>(); 67 foreach (int index in indexList) 68 { 69 res.Add(allPeers.ElementAt(index)); 70 } 71 return res; 72 } 73 74 private List<int> GetRandomItemIndexes(int noOfItems, int minValue, int maxValue) 75 { 76 List<int> res = new List<int>(); 77 Random rnd = new Random(); 78 int tmp = -1; 79 while (res.Count < noOfItems) 80 { 81 tmp = rnd.Next(minValue, maxValue); 82 if (!res.Contains(tmp)) 83 { 84 res.Add(tmp); 85 } 86 } 87 return res; 88 } 89 90 private void SendHeartbeatToServer(object sender, ElapsedEventArgs e) 91 { 92 try 93 { 94 heartbeatClient.UpdateHeartbeat(myself); 95 } 96 catch { } //nothing to do, exception is raised when getting peer list 97 } 98 21 99 } 22 23 public List<PeerInfo> GetPeerList() {24 var allPeers = client.GetPeerList(myself);25 var peersForMessaging = ChoosePeersForMessaging(allPeers);26 //return peersForMessaging;27 return allPeers; //TODO: Enable 10% list communication28 }29 30 public void SendLogToServer(string msg) {31 client.MakeLog(myself, msg);32 }33 34 private IContactService CreateClient() {35 var binding = new NetTcpBinding();36 var endpoint = new EndpointAddress(serverString);37 var myChannelFactory = new ChannelFactory<IContactService>(binding, endpoint);38 39 IContactService client = null;40 client = myChannelFactory.CreateChannel();41 return client;42 }43 44 private List<PeerInfo> ChoosePeersForMessaging(List<PeerInfo> allPeers) {45 //communicate with 10% of the network46 int noOfPeers = allPeers.Count / 10;47 List<int> indexList = GetRandomItemIndexes(noOfPeers, 0, allPeers.Count - 1);48 List<PeerInfo> res = new List<PeerInfo>();49 foreach (int index in indexList) {50 res.Add(allPeers.ElementAt(index));51 }52 return allPeers;53 }54 55 private List<int> GetRandomItemIndexes(int noOfItems, int minValue, int maxValue) {56 List<int> res = new List<int>();57 Random rnd = new Random();58 int tmp = -1;59 while (res.Count < noOfItems) {60 tmp = rnd.Next(minValue, maxValue);61 if (!res.Contains(tmp)) {62 res.Add(tmp);63 }64 }65 return res;66 }67 68 }69 100 }
Note: See TracChangeset
for help on using the changeset viewer.