#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.ServiceModel;
using System.ServiceModel.Description;
using HeuristicLab.Operators.MPISupport;
using HeuristicLab.Optimization;
using HeuristicLab.Core;
using System.Threading;
using System.Xml;
using System.IO;
using HeuristicLab.Persistence.Default.Xml;

namespace HeuristicLab.MPIAlgorithmRunner {
  /// <summary>
  /// Helper for reading a serialized item out of a stream.
  /// </summary>
  public class StreamingHelper {
    /// <summary>
    /// Reads <paramref name="stream"/> to its end, buffers the bytes in memory and
    /// deserializes them with <c>XmlParser.Deserialize</c>.
    /// </summary>
    /// <param name="stream">The stream to read; it is consumed until <c>Read</c> returns 0.</param>
    /// <returns>
    /// The deserialized object as <c>IItem</c>, or <c>null</c> if the deserialized
    /// object is not an <c>IItem</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    public static IItem StreamItem(Stream stream) {
      if (stream == null) throw new ArgumentNullException("stream");

      using (MemoryStream memoryStream = new MemoryStream()) {
        byte[] buffer = new byte[4096];
        int read;
        while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
          memoryStream.Write(buffer, 0, read);

        // ToArray() returns exactly the bytes written. GetBuffer() would hand out the
        // whole backing array, including the unused capacity after the payload.
        using (MemoryStream itemStream = new MemoryStream(memoryStream.ToArray())) {
          return XmlParser.Deserialize(itemStream) as IItem;
        }
      }
    }
  }

  /// <summary>
  /// Service implementation of <c>IAlgorithmBroker</c>, hosted as a single shared instance.
  /// It forwards the update interval and the algorithm to the other MPI ranks and hands
  /// out the serialized <see cref="Results"/> on request.
  /// </summary>
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
  class AlgorithmBroker : IAlgorithmBroker {
    /// <summary>
    /// Asks the OS for an unused loopback TCP port by binding a listener to port 0.
    /// The listener is stopped before returning, so the port is not reserved afterwards.
    /// </summary>
    private static int FindFreeTcpPort() {
      TcpListener listener = new TcpListener(IPAddress.Parse("127.0.0.1"), 0);
      listener.Start();
      try {
        return ((IPEndPoint)listener.LocalEndpoint).Port;
      }
      finally {
        // always release the socket, even if reading the endpoint fails
        listener.Stop();
      }
    }

    /// <summary>
    /// Builds (but does not open) a service host for a new broker with a streamed
    /// net.tcp endpoint on a free local port.
    /// </summary>
    private static ServiceHost CreateService(MPI.Communicator communicator) {
      AlgorithmBroker broker = new AlgorithmBroker(communicator);
      ServiceHost service = new ServiceHost(broker);

      ContractDescription contract = ContractDescription.GetContract(typeof(IAlgorithmBroker));
      contract.ContractType = typeof(IAlgorithmBroker);

      ServiceEndpoint endpoint = new ServiceEndpoint(contract);
      endpoint.Name = "AlgorithmBrokerEndpoint";

      NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
      netTCPBinding.TransferMode = TransferMode.Streamed;
      netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
      endpoint.Binding = netTCPBinding;

      int port = FindFreeTcpPort();
      endpoint.Address = new EndpointAddress("net.tcp://localhost:" + port + "/AlgorithmBroker");
      service.AddServiceEndpoint(endpoint);

      // Find<T>() returns null when no behavior of that type is registered.
      ServiceDebugBehavior debug = service.Description.Behaviors.Find<ServiceDebugBehavior>();
      if (debug == null) {
        debug = new ServiceDebugBehavior();
        service.Description.Behaviors.Add(debug);
      }
      debug.IncludeExceptionDetailInFaults = true;

      return service;
    }

    /// <summary>
    /// Creates and opens the broker service. If opening fails because the chosen
    /// address is already in use, the failed host is aborted and a new one is created
    /// on another port; this is repeated until a host opens successfully.
    /// </summary>
    /// <param name="communicator">Communicator handed to the broker instance.</param>
    /// <returns>The opened service host.</returns>
    public static ServiceHost StartService(MPI.Communicator communicator) {
      while (true) {
        ServiceHost service = CreateService(communicator);
        try {
          service.Open();
          return service;
        }
        catch (AddressAlreadyInUseException) {
          // The port was taken between probing and opening: release the failed host and retry.
          service.Abort();
        }
      }
    }

    private readonly MPI.Communicator communicator;

    /// <summary>
    /// Creates a broker that sends over <paramref name="communicator"/> and starts with
    /// a non-signaled, manual-reset <see cref="ExitWaitHandle"/>.
    /// </summary>
    public AlgorithmBroker(MPI.Communicator communicator) {
      this.communicator = communicator;
      ExitWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
    }

    #region IAlgorithmBroker Members

    /// <summary>
    /// Sends <paramref name="updateInterval"/> with message tag 1 to every rank from 1
    /// up to (world size - 1).
    /// </summary>
    public void SetUpdateInterval(int updateInterval) {
      Console.WriteLine("Set update interval...");

      int clients = MPI.Communicator.world.Group.Size - 1;
      for (int client = 1; client <= clients; client++) {
        communicator.Send(updateInterval, client, 1);
      }
    }

    /// <summary>
    /// Deserializes an algorithm from <paramref name="stream"/>, wraps it in an
    /// <c>MPITransportWrapper</c> and sends the wrapper with message tag 0 to every rank
    /// from 1 up to (world size - 1).
    /// </summary>
    /// <exception cref="ArgumentException">
    /// The stream content does not deserialize to an <c>IAlgorithm</c>.
    /// </exception>
    public void TransmitAlgorithm(Stream stream) {
      Console.WriteLine("Transmitting Algorithm...");
      IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm;
      if (alg == null)
        throw new ArgumentException("The transmitted stream does not contain an IAlgorithm.", "stream");

      // NOTE(review): MPITransportWrapper is written without a type argument; if the type
      // is generic, the argument has to be restored here - confirm against its declaration.
      MPITransportWrapper algorithm = new MPITransportWrapper(alg);

      int clients = MPI.Communicator.world.Group.Size - 1;
      for (int client = 1; client <= clients; client++) {
        // the type argument of Send is inferred from 'algorithm'
        communicator.Send(algorithm, client, 0);
      }
    }

    public object resultLocker = new object();

    // NOTE(review): ItemList is written without a type argument; if the type is generic,
    // the argument has to be restored here - confirm against its declaration.
    public ItemList Results { get; set; }

    /// <summary>
    /// Serializes <see cref="Results"/> (while holding <see cref="resultLocker"/>) and
    /// returns a new stream positioned at the start of the serialized bytes.
    /// </summary>
    public Stream GetResults() {
      Console.WriteLine("Transmitting Results...");
      MemoryStream serialized = new MemoryStream();
      lock (resultLocker) {
        XmlGenerator.Serialize(Results, serialized);
      }

      // ToArray() copies exactly the written bytes (GetBuffer() would append the unused
      // capacity) and still works should the serializer have closed the stream.
      return new MemoryStream(serialized.ToArray());
    }

    public EventWaitHandle ExitWaitHandle { get; private set; }

    public bool Terminated { get; set; }

    /// <summary>
    /// Returns <see cref="Terminated"/>; when it is <c>true</c>, <see cref="ExitWaitHandle"/>
    /// is signaled first. Both steps run under a lock on <see cref="ExitWaitHandle"/>.
    /// </summary>
    public bool IsAlgorithmTerminated() {
      lock (ExitWaitHandle) {
        if (Terminated)
          ExitWaitHandle.Set();
        return Terminated;
      }
    }

    #endregion
  }
}