Changeset 6401


Ignore:
Timestamp:
06/09/11 12:52:48 (9 years ago)
Author:
svonolfe
Message:

Implemented result and algorithm streaming (#1542)

Location:
branches/MPI
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/AlgorithmBroker.cs

    r6400 r6401  
    3333using System.Threading;
    3434using System.Xml;
     35using System.IO;
     36using HeuristicLab.Persistence.Default.Xml;
    3537                     
    3638namespace HeuristicLab.MPIAlgorithmRunner {
     39  public class StreamingHelper {
     40    public static IItem StreamItem(Stream stream) {
     41      using (MemoryStream memoryStream = new MemoryStream()) {
     42        byte[] buffer = new byte[256];
     43        int read;
     44        while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
     45          memoryStream.Write(buffer, 0, read);
     46
     47        using (MemoryStream itemStream = new MemoryStream(memoryStream.GetBuffer())) {
     48          IItem result = XmlParser.Deserialize(itemStream) as IItem;
     49          return result;
     50        }
     51      }
     52    }
     53  }
     54
    3755  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    3856  class AlgorithmBroker: IAlgorithmBroker {
     
    5472      endpoint.Name = "AlgorithmBrokerEndpoint";
    5573      NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
     74      netTCPBinding.TransferMode = TransferMode.Streamed;
    5675      netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
    57       XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();
    58       quotas.MaxArrayLength = int.MaxValue;
    59       netTCPBinding.ReaderQuotas = quotas;
    60       netTCPBinding.SendTimeout = new TimeSpan(600000000);
    61       netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);
    6276      endpoint.Binding = netTCPBinding;
    6377      int port = FindFreeTcpPort();
     
    92106    #region IAlgorithmBroker Members
    93107
    94     public void TransmitAlgorithm(MPITransportWrapper<IAlgorithm> algorithm, int updateInterval) {
    95       Console.WriteLine("Transmitting Algorithm...");
     108    public void SetUpdateInterval(int updateInterval) {
     109      Console.WriteLine("Set update interval...");
     110
    96111      int clients = MPI.Communicator.world.Group.Size - 1;
    97 
    98       for(int i = 0; i < clients; i++) {
     112      for (int i = 0; i < clients; i++) {
    99113        int client = i + 1;
    100         communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);
    101114        communicator.Send<int>(updateInterval, client, 1);
    102115      }
    103116    }
    104117
     118    public void TransmitAlgorithm(Stream stream) {
     119      Console.WriteLine("Transmitting Algorithm...");
     120
     121      IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm;
     122      MPITransportWrapper<IAlgorithm> algorithm = new MPITransportWrapper<IAlgorithm>(alg);
     123
     124      int clients = MPI.Communicator.world.Group.Size - 1;
     125      for(int i = 0; i < clients; i++) {
     126        int client = i + 1;
     127        communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);
     128      }
     129    }
     130
     131    public object resultLocker = new object();
    105132    public ItemList<ResultCollection> Results { get; set; }
    106133
    107     public MPITransportWrapper<ItemList<ResultCollection>> GetResults() {
     134    public Stream GetResults() {
    108135      Console.WriteLine("Transmitting Results...");
    109       return new MPITransportWrapper<ItemList<ResultCollection>>(Results);
     136      MemoryStream stream = new MemoryStream();
     137      lock (resultLocker) {
     138        XmlGenerator.Serialize(Results, stream);
     139      }
     140      stream = new MemoryStream((stream as MemoryStream).GetBuffer());
     141
     142      return stream;
    110143    }
    111144
  • branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/IAlgorithmBroker.cs

    r6398 r6401  
    2828using HeuristicLab.Operators.MPISupport;
    2929using HeuristicLab.Core;
     30using System.IO;
    3031
    3132namespace HeuristicLab.MPIAlgorithmRunner {
     
    3334  public interface IAlgorithmBroker {
    3435    [OperationContract]
    35     void TransmitAlgorithm(MPITransportWrapper<IAlgorithm> algorithm, int updateInterval);
     36    void SetUpdateInterval(int updateInterval);
     37   
     38    [OperationContract]
     39    void TransmitAlgorithm(Stream stream);
    3640
    3741    [OperationContract]
    38     MPITransportWrapper<ItemList<ResultCollection>> GetResults();
     42    Stream GetResults();
    3943
    4044    [OperationContract]
  • branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/Program.cs

    r6398 r6401  
    7777          SetJobStatus(address);
    7878
    79           ItemList<ResultCollection> results = new ItemList<ResultCollection>();
    80          
    8179          bool[] finished = new bool[clients];
    8280          int finishedCount = 0;
    8381
    8482          while (finishedCount != clients) {
     83            ItemList<ResultCollection> results = new ItemList<ResultCollection>();
     84
    8585            for (int i = 0; i < clients; i++) {
    8686              if (!finished[i]) {
     
    105105
    106106            Console.WriteLine("Update results");
    107             broker.Results = results;
     107            lock (broker.resultLocker)
     108              broker.Results = results;
    108109          }
    109110
     
    114115          broker.ExitWaitHandle.WaitOne();
    115116          Console.WriteLine("Finished.");
     117          Thread.Sleep(1000);
    116118          service.Close();
    117119        } else {
  • branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs

    r6400 r6401  
    229229        task.StdOutFilePath = "stdout.txt";
    230230        task.StdErrFilePath = "stderr.txt";
    231         task.WorkDirectory = path; 
    232         task.MinimumNumberOfCores = task.MaximumNumberOfCores = 3;
     231        task.WorkDirectory = path;
     232        task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count;
    233233        job.AddTask(task);
    234234
     
    260260
    261261          NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
    262           XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();
    263           quotas.MaxArrayLength = int.MaxValue;
    264           netTCPBinding.ReaderQuotas = quotas;
     262          netTCPBinding.TransferMode = TransferMode.Streamed;
    265263          netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
    266           netTCPBinding.SendTimeout = new TimeSpan(600000000);
    267           netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);
    268264          ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
    269265          IAlgorithmBroker proxy = factory.CreateChannel();
    270266
    271           proxy.TransmitAlgorithm(new MPITransportWrapper<IAlgorithm>(algorithm), updateInterval);
     267          Stream stream = new MemoryStream();
     268          XmlGenerator.Serialize(algorithm, stream);
     269          stream = new MemoryStream((stream as MemoryStream).GetBuffer());
     270
     271          proxy.TransmitAlgorithm(stream);
     272          proxy.SetUpdateInterval(updateInterval);
    272273
    273274          while (!proxy.IsAlgorithmTerminated()) {
    274275            cancellationToken.ThrowIfCancellationRequested();
    275276
    276             ItemList<ResultCollection> results = proxy.GetResults().InnerItem;
     277            ItemList<ResultCollection> results = StreamingHelper.StreamItem(proxy.GetResults()) as ItemList<ResultCollection>;
    277278
    278279            ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);
     
    292293          throw e;
    293294        }
    294         finally {
    295            /*ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
    296            if (schedulerJob != null &&
    297              (schedulerJob.State == JobState.Running || schedulerJob.State == JobState.Queued)) {
    298                scheduler.CancelJob(job.Id, "Cancelled");           
    299            } */
    300         }
    301295      }
    302296    }
  • branches/MPI/HeuristicLab.Operators.MPISupport/3.3/MPITransportWrapper.cs

    r6394 r6401  
    4545
    4646    public MPITransportWrapper(T item) {
    47       innerItem = item;
     47      innerItem = item.Clone() as T;
    4848    }
    4949
Note: See TracChangeset for help on using the changeset viewer.