Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/09/11 12:52:48 (14 years ago)
Author:
svonolfe
Message:

Implemented result and algorithm streaming (#1542)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs

    r6400 r6401  
    229229        task.StdOutFilePath = "stdout.txt";
    230230        task.StdErrFilePath = "stderr.txt";
    231         task.WorkDirectory = path; 
    232         task.MinimumNumberOfCores = task.MaximumNumberOfCores = 3;
     231        task.WorkDirectory = path;
     232        task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count;
    233233        job.AddTask(task);
    234234
     
    260260
    261261          NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
    262           XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();
    263           quotas.MaxArrayLength = int.MaxValue;
    264           netTCPBinding.ReaderQuotas = quotas;
     262          netTCPBinding.TransferMode = TransferMode.Streamed;
    265263          netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
    266           netTCPBinding.SendTimeout = new TimeSpan(600000000);
    267           netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);
    268264          ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
    269265          IAlgorithmBroker proxy = factory.CreateChannel();
    270266
    271           proxy.TransmitAlgorithm(new MPITransportWrapper<IAlgorithm>(algorithm), updateInterval);
     267          Stream stream = new MemoryStream();
     268          XmlGenerator.Serialize(algorithm, stream);
     269          stream = new MemoryStream((stream as MemoryStream).GetBuffer());
     270
     271          proxy.TransmitAlgorithm(stream);
     272          proxy.SetUpdateInterval(updateInterval);
    272273
    273274          while (!proxy.IsAlgorithmTerminated()) {
    274275            cancellationToken.ThrowIfCancellationRequested();
    275276
    276             ItemList<ResultCollection> results = proxy.GetResults().InnerItem;
     277            ItemList<ResultCollection> results = StreamingHelper.StreamItem(proxy.GetResults()) as ItemList<ResultCollection>;
    277278
    278279            ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);
     
    292293          throw e;
    293294        }
    294         finally {
    295            /*ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
    296            if (schedulerJob != null &&
    297              (schedulerJob.State == JobState.Running || schedulerJob.State == JobState.Queued)) {
    298                scheduler.CancelJob(job.Id, "Cancelled");           
    299            } */
    300         }
    301295      }
    302296    }
Note: See TracChangeset for help on using the changeset viewer.