Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/09/11 12:52:48 (13 years ago)
Author:
svonolfe
Message:

Implemented result and algorithm streaming (#1542)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/AlgorithmBroker.cs

    r6400 r6401  
    3333using System.Threading;
    3434using System.Xml;
     35using System.IO;
     36using HeuristicLab.Persistence.Default.Xml;
    3537                     
    3638namespace HeuristicLab.MPIAlgorithmRunner {
     39  public class StreamingHelper {
     40    public static IItem StreamItem(Stream stream) {
     41      using (MemoryStream memoryStream = new MemoryStream()) {
     42        byte[] buffer = new byte[256];
     43        int read;
     44        while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
     45          memoryStream.Write(buffer, 0, read);
     46
     47        using (MemoryStream itemStream = new MemoryStream(memoryStream.GetBuffer())) {
     48          IItem result = XmlParser.Deserialize(itemStream) as IItem;
     49          return result;
     50        }
     51      }
     52    }
     53  }
     54
    3755  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    3856  class AlgorithmBroker: IAlgorithmBroker {
     
    5472      endpoint.Name = "AlgorithmBrokerEndpoint";
    5573      NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
     74      netTCPBinding.TransferMode = TransferMode.Streamed;
    5675      netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
    57       XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();
    58       quotas.MaxArrayLength = int.MaxValue;
    59       netTCPBinding.ReaderQuotas = quotas;
    60       netTCPBinding.SendTimeout = new TimeSpan(600000000);
    61       netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);
    6276      endpoint.Binding = netTCPBinding;
    6377      int port = FindFreeTcpPort();
     
    92106    #region IAlgorithmBroker Members
    93107
    94     public void TransmitAlgorithm(MPITransportWrapper<IAlgorithm> algorithm, int updateInterval) {
    95       Console.WriteLine("Transmitting Algorithm...");
     108    public void SetUpdateInterval(int updateInterval) {
     109      Console.WriteLine("Set update interval...");
     110
    96111      int clients = MPI.Communicator.world.Group.Size - 1;
    97 
    98       for(int i = 0; i < clients; i++) {
     112      for (int i = 0; i < clients; i++) {
    99113        int client = i + 1;
    100         communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);
    101114        communicator.Send<int>(updateInterval, client, 1);
    102115      }
    103116    }
    104117
     118    public void TransmitAlgorithm(Stream stream) {
     119      Console.WriteLine("Transmitting Algorithm...");
     120
     121      IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm;
     122      MPITransportWrapper<IAlgorithm> algorithm = new MPITransportWrapper<IAlgorithm>(alg);
     123
     124      int clients = MPI.Communicator.world.Group.Size - 1;
     125      for(int i = 0; i < clients; i++) {
     126        int client = i + 1;
     127        communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);
     128      }
     129    }
     130
     131    public object resultLocker = new object();
    105132    public ItemList<ResultCollection> Results { get; set; }
    106133
    107     public MPITransportWrapper<ItemList<ResultCollection>> GetResults() {
     134    public Stream GetResults() {
    108135      Console.WriteLine("Transmitting Results...");
    109       return new MPITransportWrapper<ItemList<ResultCollection>>(Results);
     136      MemoryStream stream = new MemoryStream();
     137      lock (resultLocker) {
     138        XmlGenerator.Serialize(Results, stream);
     139      }
     140      stream = new MemoryStream((stream as MemoryStream).GetBuffer());
     141
     142      return stream;
    110143    }
    111144
Note: See TracChangeset for help on using the changeset viewer.