#region License Information
/* HeuristicLab
* Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.ServiceModel;
using System.ServiceModel.Description;
using HeuristicLab.Operators.MPISupport;
using HeuristicLab.Optimization;
using HeuristicLab.Core;
using System.Threading;
using System.Xml;
using System.IO;
using HeuristicLab.Persistence.Default.Xml;
namespace HeuristicLab.MPIAlgorithmRunner {
/// <summary>
/// Helper for turning a serialized item that arrives over a stream back into an object.
/// </summary>
public class StreamingHelper {
  /// <summary>
  /// Reads <paramref name="stream"/> until it reports end-of-stream, buffers the bytes in memory
  /// and passes them to <c>XmlParser.Deserialize</c>.
  /// </summary>
  /// <param name="stream">Source stream; it is read sequentially and is not closed by this method.</param>
  /// <returns>
  /// The deserialized object as an <c>IItem</c>, or <c>null</c> if the deserialized object
  /// is not an <c>IItem</c>.
  /// </returns>
  public static IItem StreamItem(Stream stream) {
    using (MemoryStream memoryStream = new MemoryStream()) {
      byte[] buffer = new byte[256];
      int read;
      // Read() returning 0 signals the end of the stream.
      while ((read = stream.Read(buffer, 0, buffer.Length)) != 0) {
        memoryStream.Write(buffer, 0, read);
      }

      // GetBuffer() returns the whole internal array, including unused capacity beyond Length.
      // Restrict the view to the bytes actually written so the parser never sees the padding.
      using (MemoryStream itemStream = new MemoryStream(memoryStream.GetBuffer(), 0, (int)memoryStream.Length)) {
        return XmlParser.Deserialize(itemStream) as IItem;
      }
    }
  }
}
/// <summary>
/// WCF service implementation of <c>IAlgorithmBroker</c>. It is hosted as a single shared instance
/// and forwards requests received over WCF to other processes through an MPI communicator.
/// </summary>
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class AlgorithmBroker : IAlgorithmBroker {
  /// <summary>
  /// Asks the OS for an unused TCP port by binding a listener to port 0 on the loopback address.
  /// </summary>
  /// <remarks>
  /// The listener is stopped before the port is returned, so another process may claim the port
  /// before the caller binds it. <see cref="StartService"/> retries for that reason.
  /// </remarks>
  private static int FindFreeTcpPort() {
    TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    try {
      return ((IPEndPoint)listener.LocalEndpoint).Port;
    }
    finally {
      // Always release the probe socket, even if reading the endpoint fails.
      listener.Stop();
    }
  }

  /// <summary>
  /// Builds (but does not open) a service host for a new broker instance. The host gets one
  /// unsecured, streamed net.tcp endpoint on a freshly probed localhost port.
  /// </summary>
  /// <param name="communicator">MPI communicator handed to the new broker.</param>
  private static ServiceHost CreateService(MPI.Communicator communicator) {
    AlgorithmBroker broker = new AlgorithmBroker(communicator);
    ServiceHost service = new ServiceHost(broker);

    ContractDescription contract = ContractDescription.GetContract(typeof(IAlgorithmBroker));
    contract.ContractType = typeof(IAlgorithmBroker);

    // Streamed transfer with the maximum message size, so large payloads are not rejected by the binding.
    NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
    netTCPBinding.TransferMode = TransferMode.Streamed;
    netTCPBinding.MaxReceivedMessageSize = int.MaxValue;

    ServiceEndpoint endpoint = new ServiceEndpoint(contract);
    endpoint.Name = "AlgorithmBrokerEndpoint";
    endpoint.Binding = netTCPBinding;
    int port = FindFreeTcpPort();
    endpoint.Address = new EndpointAddress("net.tcp://localhost:" + port + "/AlgorithmBroker");
    service.AddServiceEndpoint(endpoint);

    // Send full exception details back to the client in faults.
    ServiceDebugBehavior debug = service.Description.Behaviors.Find<ServiceDebugBehavior>();
    debug.IncludeExceptionDetailInFaults = true;

    return service;
  }

  /// <summary>
  /// Creates and opens the broker service. If the probed port has been taken in the meantime,
  /// the failed host is discarded and a new one is created on another port.
  /// </summary>
  /// <param name="communicator">MPI communicator handed to the broker.</param>
  /// <returns>The opened service host.</returns>
  public static ServiceHost StartService(MPI.Communicator communicator) {
    // Retry iteratively: no stack growth, and every failed host is released.
    while (true) {
      ServiceHost service = CreateService(communicator);
      try {
        service.Open();
        return service;
      }
      catch (AddressAlreadyInUseException) {
        // A host whose Open() failed cannot be reused; abort it to free its resources.
        service.Abort();
      }
    }
  }

  private readonly MPI.Communicator communicator;

  /// <summary>
  /// Creates a broker that sends through <paramref name="communicator"/>.
  /// The exit wait handle starts unsignaled.
  /// </summary>
  public AlgorithmBroker(MPI.Communicator communicator) {
    this.communicator = communicator;
    ExitWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
  }

  #region IAlgorithmBroker Members

  /// <summary>
  /// Sends <paramref name="updateInterval"/> with message tag 1 to ranks 1 .. (world size - 1).
  /// </summary>
  public void SetUpdateInterval(int updateInterval) {
    Console.WriteLine("Set update interval...");

    // Rank 0 is skipped; presumably that is the process hosting this broker -- confirm against the runner.
    int clients = MPI.Communicator.world.Group.Size - 1;
    for (int client = 1; client <= clients; client++) {
      communicator.Send(updateInterval, client, 1);
    }
  }

  /// <summary>
  /// Deserializes an algorithm from <paramref name="stream"/>, wraps it for MPI transport and
  /// sends it with message tag 0 to ranks 1 .. (world size - 1).
  /// </summary>
  /// <param name="stream">Stream containing the serialized algorithm.</param>
  public void TransmitAlgorithm(Stream stream) {
    Console.WriteLine("Transmitting Algorithm...");

    IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm;
    // NOTE(review): the type argument is assumed to be IAlgorithm, matching the wrapped value -- confirm.
    MPITransportWrapper<IAlgorithm> algorithm = new MPITransportWrapper<IAlgorithm>(alg);

    int clients = MPI.Communicator.world.Group.Size - 1;
    for (int client = 1; client <= clients; client++) {
      // The message type is inferred from the argument.
      communicator.Send(algorithm, client, 0);
    }
  }

  // Guards serialization of Results in GetResults. It is public, so code outside this class
  // can take the same lock.
  public object resultLocker = new object();

  // NOTE(review): ItemList is presumably a generic collection whose element type argument is
  // missing here -- confirm the intended type.
  public ItemList Results { get; set; }

  /// <summary>
  /// Serializes <see cref="Results"/> while holding <see cref="resultLocker"/> and returns the
  /// serialized bytes as a new readable stream positioned at the start.
  /// </summary>
  public Stream GetResults() {
    Console.WriteLine("Transmitting Results...");

    MemoryStream serialized = new MemoryStream();
    lock (resultLocker) {
      XmlGenerator.Serialize(Results, serialized);
    }

    // ToArray() copies exactly the bytes written (GetBuffer() would also expose unused capacity)
    // and stays usable even if the serializer has closed the stream.
    return new MemoryStream(serialized.ToArray());
  }

  /// <summary>Handle that is signaled once a terminated state has been reported to a caller.</summary>
  public EventWaitHandle ExitWaitHandle { get; private set; }

  /// <summary>Termination flag reported by <see cref="IsAlgorithmTerminated"/>.</summary>
  public bool Terminated { get; set; }

  /// <summary>
  /// Returns <see cref="Terminated"/>. When it is true, <see cref="ExitWaitHandle"/> is signaled
  /// before returning.
  /// </summary>
  public bool IsAlgorithmTerminated() {
    // The handle doubles as the lock object. Presumably the code that sets Terminated takes
    // the same lock -- confirm against the caller.
    lock (ExitWaitHandle) {
      if (Terminated) {
        ExitWaitHandle.Set();
      }
      return Terminated;
    }
  }

  #endregion
}
}