Changeset 6401
- Timestamp:
- 06/09/11 12:52:48 (14 years ago)
- Location:
- branches/MPI
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/AlgorithmBroker.cs
r6400 r6401 33 33 using System.Threading; 34 34 using System.Xml; 35 using System.IO; 36 using HeuristicLab.Persistence.Default.Xml; 35 37 36 38 namespace HeuristicLab.MPIAlgorithmRunner { 39 public class StreamingHelper { 40 public static IItem StreamItem(Stream stream) { 41 using (MemoryStream memoryStream = new MemoryStream()) { 42 byte[] buffer = new byte[256]; 43 int read; 44 while ((read = stream.Read(buffer, 0, buffer.Length)) != 0) 45 memoryStream.Write(buffer, 0, read); 46 47 using (MemoryStream itemStream = new MemoryStream(memoryStream.GetBuffer())) { 48 IItem result = XmlParser.Deserialize(itemStream) as IItem; 49 return result; 50 } 51 } 52 } 53 } 54 37 55 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 38 56 class AlgorithmBroker: IAlgorithmBroker { … … 54 72 endpoint.Name = "AlgorithmBrokerEndpoint"; 55 73 NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None); 74 netTCPBinding.TransferMode = TransferMode.Streamed; 56 75 netTCPBinding.MaxReceivedMessageSize = int.MaxValue; 57 XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();58 quotas.MaxArrayLength = int.MaxValue;59 netTCPBinding.ReaderQuotas = quotas;60 netTCPBinding.SendTimeout = new TimeSpan(600000000);61 netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);62 76 endpoint.Binding = netTCPBinding; 63 77 int port = FindFreeTcpPort(); … … 92 106 #region IAlgorithmBroker Members 93 107 94 public void TransmitAlgorithm(MPITransportWrapper<IAlgorithm> algorithm, int updateInterval) { 95 Console.WriteLine("Transmitting Algorithm..."); 108 public void SetUpdateInterval(int updateInterval) { 109 Console.WriteLine("Set update interval..."); 110 96 111 int clients = MPI.Communicator.world.Group.Size - 1; 97 98 for(int i = 0; i < clients; i++) { 112 for (int i = 0; i < clients; i++) { 99 113 int client = i + 1; 100 communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);101 114 communicator.Send<int>(updateInterval, client, 1); 102 115 } 103 116 } 104 117 118 public void TransmitAlgorithm(Stream stream) { 119 Console.WriteLine("Transmitting Algorithm..."); 120 121 IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm; 122 MPITransportWrapper<IAlgorithm> algorithm = new MPITransportWrapper<IAlgorithm>(alg); 123 124 int clients = MPI.Communicator.world.Group.Size - 1; 125 for(int i = 0; i < clients; i++) { 126 int client = i + 1; 127 communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0); 128 } 129 } 130 131 public object resultLocker = new object(); 105 132 public ItemList<ResultCollection> Results { get; set; } 106 133 107 public MPITransportWrapper<ItemList<ResultCollection>>GetResults() {134 public Stream GetResults() { 108 135 Console.WriteLine("Transmitting Results..."); 109 return new MPITransportWrapper<ItemList<ResultCollection>>(Results); 136 MemoryStream stream = new MemoryStream(); 137 lock (resultLocker) { 138 XmlGenerator.Serialize(Results, stream); 139 } 140 stream = new MemoryStream((stream as MemoryStream).GetBuffer()); 141 142 return stream; 110 143 } 111 144 -
branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/IAlgorithmBroker.cs
r6398 r6401 28 28 using HeuristicLab.Operators.MPISupport; 29 29 using HeuristicLab.Core; 30 using System.IO; 30 31 31 32 namespace HeuristicLab.MPIAlgorithmRunner { … … 33 34 public interface IAlgorithmBroker { 34 35 [OperationContract] 35 void TransmitAlgorithm(MPITransportWrapper<IAlgorithm> algorithm, int updateInterval); 36 void SetUpdateInterval(int updateInterval); 37 38 [OperationContract] 39 void TransmitAlgorithm(Stream stream); 36 40 37 41 [OperationContract] 38 MPITransportWrapper<ItemList<ResultCollection>>GetResults();42 Stream GetResults(); 39 43 40 44 [OperationContract] -
branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/Program.cs
r6398 r6401 77 77 SetJobStatus(address); 78 78 79 ItemList<ResultCollection> results = new ItemList<ResultCollection>();80 81 79 bool[] finished = new bool[clients]; 82 80 int finishedCount = 0; 83 81 84 82 while (finishedCount != clients) { 83 ItemList<ResultCollection> results = new ItemList<ResultCollection>(); 84 85 85 for (int i = 0; i < clients; i++) { 86 86 if (!finished[i]) { … … 105 105 106 106 Console.WriteLine("Update results"); 107 broker.Results = results; 107 lock (broker.resultLocker) 108 broker.Results = results; 108 109 } 109 110 … … 114 115 broker.ExitWaitHandle.WaitOne(); 115 116 Console.WriteLine("Finished."); 117 Thread.Sleep(1000); 116 118 service.Close(); 117 119 } else { -
branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs
r6400 r6401 229 229 task.StdOutFilePath = "stdout.txt"; 230 230 task.StdErrFilePath = "stderr.txt"; 231 task.WorkDirectory = path; 232 task.MinimumNumberOfCores = task.MaximumNumberOfCores = 3;231 task.WorkDirectory = path; 232 task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count; 233 233 job.AddTask(task); 234 234 … … 260 260 261 261 NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None); 262 XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas(); 263 quotas.MaxArrayLength = int.MaxValue; 264 netTCPBinding.ReaderQuotas = quotas; 262 netTCPBinding.TransferMode = TransferMode.Streamed; 265 263 netTCPBinding.MaxReceivedMessageSize = int.MaxValue; 266 netTCPBinding.SendTimeout = new TimeSpan(600000000);267 netTCPBinding.ReceiveTimeout = new TimeSpan(600000000);268 264 ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address); 269 265 IAlgorithmBroker proxy = factory.CreateChannel(); 270 266 271 proxy.TransmitAlgorithm(new MPITransportWrapper<IAlgorithm>(algorithm), updateInterval); 267 Stream stream = new MemoryStream(); 268 XmlGenerator.Serialize(algorithm, stream); 269 stream = new MemoryStream((stream as MemoryStream).GetBuffer()); 270 271 proxy.TransmitAlgorithm(stream); 272 proxy.SetUpdateInterval(updateInterval); 272 273 273 274 while (!proxy.IsAlgorithmTerminated()) { 274 275 cancellationToken.ThrowIfCancellationRequested(); 275 276 276 ItemList<ResultCollection> results = proxy.GetResults().InnerItem;277 ItemList<ResultCollection> results = StreamingHelper.StreamItem(proxy.GetResults()) as ItemList<ResultCollection>; 277 278 278 279 ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection); … … 292 293 throw e; 293 294 } 294 finally {295 /*ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);296 if (schedulerJob != null &&297 (schedulerJob.State == JobState.Running || schedulerJob.State == JobState.Queued)) {298 scheduler.CancelJob(job.Id, "Cancelled");299 } */300 }301 295 } 302 296 } -
branches/MPI/HeuristicLab.Operators.MPISupport/3.3/MPITransportWrapper.cs
r6394 r6401 45 45 46 46 public MPITransportWrapper(T item) { 47 innerItem = item ;47 innerItem = item.Clone() as T; 48 48 } 49 49
Note: See TracChangeset
for help on using the changeset viewer.