Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/AlgorithmBroker.cs @ 7403

Last change on this file since 7403 was 6401, checked in by svonolfe, 14 years ago

Implemented result and algorithm streaming (#1542)

File size: 5.4 KB
RevLine 
[6388]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.Net;
27using System.Net.Sockets;
28using System.ServiceModel;
29using System.ServiceModel.Description;
30using HeuristicLab.Operators.MPISupport;
31using HeuristicLab.Optimization;
32using HeuristicLab.Core;
33using System.Threading;
[6394]34using System.Xml;
[6401]35using System.IO;
36using HeuristicLab.Persistence.Default.Xml;
[6388]37                     
38namespace HeuristicLab.MPIAlgorithmRunner {
[6401]39  public class StreamingHelper {
40    public static IItem StreamItem(Stream stream) {
41      using (MemoryStream memoryStream = new MemoryStream()) {
42        byte[] buffer = new byte[256];
43        int read;
44        while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
45          memoryStream.Write(buffer, 0, read);
46
47        using (MemoryStream itemStream = new MemoryStream(memoryStream.GetBuffer())) {
48          IItem result = XmlParser.Deserialize(itemStream) as IItem;
49          return result;
50        }
51      }
52    }
53  }
54
[6388]55  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
56  class AlgorithmBroker: IAlgorithmBroker {
57    private static int FindFreeTcpPort() {
58      TcpListener l = new TcpListener(IPAddress.Parse("127.0.0.1"), 0);
59      l.Start();
60      int port = ((IPEndPoint)l.LocalEndpoint).Port;
61      l.Stop();
62      return port;
63    }
64
65    private static ServiceHost CreateService(MPI.Communicator communicator) {
66      AlgorithmBroker broker = new AlgorithmBroker(communicator);
67      ServiceHost service = new ServiceHost(broker);
68
69      ContractDescription contract = ContractDescription.GetContract(typeof(IAlgorithmBroker));
70      contract.ContractType = typeof(IAlgorithmBroker);
71      ServiceEndpoint endpoint = new ServiceEndpoint(contract);
72      endpoint.Name = "AlgorithmBrokerEndpoint";
73      NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
[6401]74      netTCPBinding.TransferMode = TransferMode.Streamed;
[6394]75      netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
[6388]76      endpoint.Binding = netTCPBinding;
77      int port = FindFreeTcpPort();
78      endpoint.Address = new EndpointAddress("net.tcp://localhost:" + port + "/AlgorithmBroker");
79      service.AddServiceEndpoint(endpoint);
[6394]80      ServiceDebugBehavior debug = service.Description.Behaviors.Find<ServiceDebugBehavior>();
81      debug.IncludeExceptionDetailInFaults = true;
[6388]82
83      return service;
84    }
85
86    public static ServiceHost StartService(MPI.Communicator communicator) {
87      try {
88        ServiceHost service = CreateService(communicator);
89
90        service.Open();
91
92        return service;
93      }
94      catch (AddressAlreadyInUseException) {
95        return StartService(communicator);
96      }
97    }
98
99    private MPI.Communicator communicator;
100
101    public AlgorithmBroker(MPI.Communicator communicator) {
102      this.communicator = communicator;
103      ExitWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
104    }
105
106    #region IAlgorithmBroker Members
107
[6401]108    public void SetUpdateInterval(int updateInterval) {
109      Console.WriteLine("Set update interval...");
110
111      int clients = MPI.Communicator.world.Group.Size - 1;
112      for (int i = 0; i < clients; i++) {
113        int client = i + 1;
114        communicator.Send<int>(updateInterval, client, 1);
115      }
116    }
117
118    public void TransmitAlgorithm(Stream stream) {
[6388]119      Console.WriteLine("Transmitting Algorithm...");
[6401]120
121      IAlgorithm alg = StreamingHelper.StreamItem(stream) as IAlgorithm;
122      MPITransportWrapper<IAlgorithm> algorithm = new MPITransportWrapper<IAlgorithm>(alg);
123
[6394]124      int clients = MPI.Communicator.world.Group.Size - 1;
[6388]125      for(int i = 0; i < clients; i++) {
126        int client = i + 1;
127        communicator.Send<MPITransportWrapper<IAlgorithm>>(algorithm, client, 0);
128      }
129    }
130
[6401]131    public object resultLocker = new object();
[6388]132    public ItemList<ResultCollection> Results { get; set; }
133
[6401]134    public Stream GetResults() {
[6388]135      Console.WriteLine("Transmitting Results...");
[6401]136      MemoryStream stream = new MemoryStream();
137      lock (resultLocker) {
138        XmlGenerator.Serialize(Results, stream);
139      }
140      stream = new MemoryStream((stream as MemoryStream).GetBuffer());
141
142      return stream;
[6388]143    }
144
145    public EventWaitHandle ExitWaitHandle { get; private set; }
146
147    public bool Terminated { get; set; }
148
149    public bool IsAlgorithmTerminated() {
150      lock (ExitWaitHandle) {
151        if (Terminated)
152          ExitWaitHandle.Set();
153        return Terminated;
154      }
155    }
156    #endregion
157  }
158}
Note: See TracBrowser for help on using the repository browser.