- Timestamp:
- 06/09/11 12:52:48 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/AlgorithmBroker.cs
r6400 r6401 33 33 using System.Threading; 34 34 using System.Xml; 35 using System.IO; 36 using HeuristicLab.Persistence.Default.Xml; 35 37 36 38 namespace HeuristicLab.MPIAlgorithmRunner { 39 public class StreamingHelper { 40 public static IItem StreamItem(Stream stream) { 41 using (MemoryStream memoryStream = new MemoryStream()) { 42 byte[] buffer = new byte[256]; 43 int read; 44 while ((read = stream.Read(buffer, 0, buffer.Length)) != 0) 45 memoryStream.Write(buffer, 0, read); 46 47 using (MemoryStream itemStream = new MemoryStream(memoryStream.GetBuffer())) { 48 IItem result = XmlParser.Deserialize(itemStream) as IItem; 49 return result; 50 } 51 } 52 } 53 } 54 37 55 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 38 56 class AlgorithmBroker: IAlgorithmBroker { … … 54 72 endpoint.Name = "AlgorithmBrokerEndpoint"; 55 73 NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None); 74 netTCPBinding.TransferMode = TransferMode.Streamed; 56 75 netTCPBinding.MaxReceivedMessageSize = int.MaxValue; 57 XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();58 quotas.MaxArrayLength = int.MaxValue;59 netTCPBinding.ReaderQuotas = quotas;60 netTCPBinding.SendTimeout = new TimeSpan(600000000);61 netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);62 76 endpoint.Binding = netTCPBinding; 63 77 int port = FindFreeTcpPort(); … … 92 106 #region IAlgorithmBroker Members 93 107 94 public void TransmitAlgorithm(MPITransportWrapper<IAlgorithm> algorithm, int updateInterval) { 95 Console.WriteLine("Transmitting Algorithm..."); 108 public void SetUpdateInterval(int updateInterval) { 109 Console.WriteLine("Set update interval..."); 110 96 111 int clients = MPI.Communicator.world.Group.Size - 1; 97 98 for(int i = 0; i < clients; i++) { 112 for (int i = 0; i < clients; i++) { 99 113 int client = i + 1; 100 communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);101 114 communicator.Send<int>(updateInterval, client, 1); 102 115 } 103 116 } 104 117 118 public void TransmitAlgorithm(Stream stream) { 119 Console.WriteLine("Transmitting Algorithm..."); 120 121 IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm; 122 MPITransportWrapper<IAlgorithm> algorithm = new MPITransportWrapper<IAlgorithm>(alg); 123 124 int clients = MPI.Communicator.world.Group.Size - 1; 125 for(int i = 0; i < clients; i++) { 126 int client = i + 1; 127 communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0); 128 } 129 } 130 131 public object resultLocker = new object(); 105 132 public ItemList<ResultCollection> Results { get; set; } 106 133 107 public MPITransportWrapper<ItemList<ResultCollection>>GetResults() {134 public Stream GetResults() { 108 135 Console.WriteLine("Transmitting Results..."); 109 return new MPITransportWrapper<ItemList<ResultCollection>>(Results); 136 MemoryStream stream = new MemoryStream(); 137 lock (resultLocker) { 138 XmlGenerator.Serialize(Results, stream); 139 } 140 stream = new MemoryStream((stream as MemoryStream).GetBuffer()); 141 142 return stream; 110 143 } 111 144
Note: See TracChangeset
for help on using the changeset viewer.