Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Problems.ExternalEvaluation/3.3/Drivers/EvaluationTCPChannel.cs @ 3890

Last change on this file since 3890 was 3890, checked in by abeham, 14 years ago

#866

  • Fixed EvaluationTCPChannel
    • Message delimiter and message are sent in the same call to socket.Send
  • Added EvaluationTCPChannelView
File size: 6.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Net;
24using System.Net.Sockets;
25using Google.ProtocolBuffers;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
29
30namespace HeuristicLab.Problems.ExternalEvaluation {
31  [Item("EvaluationTCPChannel", "A channel that creates a TCP connection over a network.")]
32  [StorableClass]
33  public class EvaluationTCPChannel : EvaluationChannel {
34    public const int MAX_VARINT32_SIZE = 5;
35
36    [Storable]
37    private string ipAddress;
38    public string IpAddress {
39      get { return ipAddress; }
40      set {
41        bool changed = !ipAddress.Equals(value);
42        ipAddress = value;
43        if (changed)
44          OnIpAddressChanged();
45      }
46    }
47    [Storable]
48    private int port;
49    public int Port {
50      get { return port; }
51      set {
52        bool changed = port != value;
53        port = value;
54        if (changed)
55          OnPortChanged();
56      }
57    }
58    private Socket socket;
59
60    public EvaluationTCPChannel() : this(String.Empty, 0) { }
61    public EvaluationTCPChannel(string ip, int port)
62      : base() {
63      this.ipAddress = ip;
64      this.port = port;
65    }
66
67    public override IDeepCloneable Clone(Cloner cloner) {
68      EvaluationTCPChannel clone = (EvaluationTCPChannel)base.Clone(cloner);
69      clone.ipAddress = ipAddress;
70      clone.port = port;
71      return clone;
72    }
73
74    #region IExternalEvaluationChannel Members
75
76    public override void Open() {
77      socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
78      socket.Connect(IPAddress.Parse(ipAddress), port);
79      if (socket.Connected) {
80        base.Open();
81        OnConnected();
82      }
83    }
84
85    public override void Send(IMessage message) {
86      try {
87        byte[] buffer = EncodeDelimited(message);
88        socket.Send(buffer);
89      } catch (SocketException) {
90        Close();
91      } catch (ObjectDisposedException) {
92        socket = null;
93        Close();
94      }
95    }
96
97    private byte[] EncodeDelimited(IMessage message) {
98      int messageSize = message.SerializedSize;
99      int headerSize = GetVarint32EncodedSize(messageSize);
100      byte[] buffer = new byte[headerSize + messageSize];
101      CodedOutputStream tmp = CodedOutputStream.CreateInstance(buffer);
102      tmp.WriteRawVarint32((uint)messageSize);
103      message.WriteTo(tmp);
104      return buffer;
105    }
106
107    private int GetVarint32EncodedSize(int size) {
108      // Varints in Protocol Buffers are encoded using the 7 lower order bits (the MSB indicates continuation (=1) or termination (=0))
109      int sizeByteCount = 1;
110      int limit = 128;
111      while (size > limit) {
112        sizeByteCount++;
113        limit *= 128;
114      }
115      return sizeByteCount;
116    }
117
118    public override IMessage Receive(IBuilder builder) {
119      try {
120        byte[] buffer = GetMessageBuffer();
121        return builder.WeakMergeFrom(ByteString.CopyFrom(buffer)).WeakBuild();
122      } catch (SocketException) {
123        Close();
124      } catch (ObjectDisposedException) {
125        socket = null;
126        Close();
127      }
128      return null;
129    }
130
131    private byte[] GetMessageBuffer() {
132      byte[] buffer;
133      int messageSize = GetReceivedMessageSize(out buffer);
134      int headerSize = GetVarint32EncodedSize(messageSize);
135      int messageBytesInHeader = Math.Min(messageSize, buffer.Length - headerSize);
136      byte[] messageBuffer = new byte[messageSize];
137      Array.Copy(buffer, headerSize, messageBuffer, 0, messageBytesInHeader);
138      ReceiveMessage(messageBuffer, messageBytesInHeader, messageSize - messageBytesInHeader);
139      return messageBuffer;
140    }
141
142    private int GetReceivedMessageSize(out byte[] buffer) {
143      buffer = new byte[MAX_VARINT32_SIZE];
144      socket.Receive(buffer);
145      CodedInputStream tmp = CodedInputStream.CreateInstance(buffer);
146      return (int)tmp.ReadRawVarint32();
147    }
148
149    private void ReceiveMessage(byte[] buffer, int offset, int size) {
150      while (size > 0) {
151        int received = socket.Receive(buffer, offset, size, SocketFlags.None);
152        offset += received;
153        size -= received;
154      }
155    }
156
157    public override void Close() {
158      if (socket != null) {
159        if (socket.Connected)
160          socket.Disconnect(false);
161        socket.Close();
162        socket = null;
163      }
164      bool wasInitialized = IsInitialized;
165      base.Close();
166      if (wasInitialized) OnDiconnected();
167    }
168
169    #endregion
170
171    #region Events
172    public event EventHandler IpAddressChanged;
173    protected void OnIpAddressChanged() {
174      EventHandler handler = IpAddressChanged;
175      if (handler != null) handler(this, EventArgs.Empty);
176    }
177    public event EventHandler PortChanged;
178    protected void OnPortChanged() {
179      EventHandler handler = PortChanged;
180      if (handler != null) handler(this, EventArgs.Empty);
181    }
182    public event EventHandler Connected;
183    protected void OnConnected() {
184      EventHandler handler = Connected;
185      if (handler != null) handler(this, EventArgs.Empty);
186    }
187    public event EventHandler Disconnected;
188    protected void OnDiconnected() {
189      EventHandler handler = Disconnected;
190      if (handler != null) handler(this, EventArgs.Empty);
191    }
192    #endregion
193  }
194}
Note: See TracBrowser for help on using the repository browser.