Free cookie consent management tool by TermsFeed Policy Generator

Changeset 3890


Ignore:
Timestamp:
06/02/10 16:18:39 (14 years ago)
Author:
abeham
Message:

#866

  • Fixed EvaluationTCPChannel
    • Message delimiter and message are sent in the same call to socket.Send
  • Added EvaluationTCPChannelView
Location:
trunk/sources
Files:
3 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Problems.ExternalEvaluation.Views/3.3/HeuristicLab.Problems.ExternalEvaluation.Views-3.3.csproj

    r3881 r3890  
    9595      <DependentUpon>EvaluationProcessChannelView.cs</DependentUpon>
    9696    </Compile>
     97    <Compile Include="EvaluationTCPChannelView.cs">
     98      <SubType>UserControl</SubType>
     99    </Compile>
     100    <Compile Include="EvaluationTCPChannelView.Designer.cs">
     101      <DependentUpon>EvaluationTCPChannelView.cs</DependentUpon>
     102    </Compile>
    97103    <Compile Include="HeuristicLabProblemsExternalEvaluationViewsPlugin.cs" />
    98104    <Compile Include="Properties\AssemblyInfo.cs" />
     
    148154      <DependentUpon>EvaluationServiceClientView.cs</DependentUpon>
    149155    </EmbeddedResource>
     156    <EmbeddedResource Include="EvaluationTCPChannelView.resx">
     157      <DependentUpon>EvaluationTCPChannelView.cs</DependentUpon>
     158    </EmbeddedResource>
    150159    <EmbeddedResource Include="SolutionMessageBuilderView.resx">
    151160      <DependentUpon>SolutionMessageBuilderView.cs</DependentUpon>
  • trunk/sources/HeuristicLab.Problems.ExternalEvaluation/3.3/Drivers/EvaluationTCPChannel.cs

    r3883 r3890  
    3232  [StorableClass]
    3333  public class EvaluationTCPChannel : EvaluationChannel {
     34    public const int MAX_VARINT32_SIZE = 5;
     35
    3436    [Storable]
    3537    private string ipAddress;
     38    public string IpAddress {
     39      get { return ipAddress; }
     40      set {
     41        bool changed = !ipAddress.Equals(value);
     42        ipAddress = value;
     43        if (changed)
     44          OnIpAddressChanged();
     45      }
     46    }
    3647    [Storable]
    3748    private int port;
     49    public int Port {
     50      get { return port; }
     51      set {
     52        bool changed = port != value;
     53        port = value;
     54        if (changed)
     55          OnPortChanged();
     56      }
     57    }
    3858    private Socket socket;
    3959
     
    5777      socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    5878      socket.Connect(IPAddress.Parse(ipAddress), port);
    59       if (socket.Connected)
     79      if (socket.Connected) {
    6080        base.Open();
     81        OnConnected();
     82      }
    6183    }
    6284
    6385    public override void Send(IMessage message) {
    64       int size = message.SerializedSize;
    65       socket.Send(System.BitConverter.GetBytes(size));
    66       socket.Send(message.ToByteArray());
     86      try {
     87        byte[] buffer = EncodeDelimited(message);
     88        socket.Send(buffer);
     89      } catch (SocketException) {
     90        Close();
     91      } catch (ObjectDisposedException) {
     92        socket = null;
     93        Close();
     94      }
     95    }
     96
     97    private byte[] EncodeDelimited(IMessage message) {
     98      int messageSize = message.SerializedSize;
     99      int headerSize = GetVarint32EncodedSize(messageSize);
     100      byte[] buffer = new byte[headerSize + messageSize];
     101      CodedOutputStream tmp = CodedOutputStream.CreateInstance(buffer);
     102      tmp.WriteRawVarint32((uint)messageSize);
     103      message.WriteTo(tmp);
     104      return buffer;
     105    }
     106
     107    private int GetVarint32EncodedSize(int size) {
     108      // Varints in Protocol Buffers are encoded using the 7 lower order bits (the MSB indicates continuation (=1) or termination (=0))
     109      int sizeByteCount = 1;
     110      int limit = 128;
     111      while (size > limit) {
     112        sizeByteCount++;
     113        limit *= 128;
     114      }
     115      return sizeByteCount;
    67116    }
    68117
    69118    public override IMessage Receive(IBuilder builder) {
    70       byte[] sizeBuffer = new byte[4];
    71       socket.Receive(sizeBuffer);
    72       int size = System.BitConverter.ToInt32(sizeBuffer, 0);
    73       byte[] messageBuffer = new byte[size];
    74       int offset = 0;
    75       while (offset < size)
    76         offset += socket.Receive(messageBuffer, offset, size, SocketFlags.None);
    77       return builder.WeakMergeFrom(ByteString.CopyFrom(messageBuffer)).WeakBuild();
     119      try {
     120        byte[] buffer = GetMessageBuffer();
     121        return builder.WeakMergeFrom(ByteString.CopyFrom(buffer)).WeakBuild();
     122      } catch (SocketException) {
     123        Close();
     124      } catch (ObjectDisposedException) {
     125        socket = null;
     126        Close();
     127      }
     128      return null;
     129    }
     130
     131    private byte[] GetMessageBuffer() {
     132      byte[] buffer;
     133      int messageSize = GetReceivedMessageSize(out buffer);
     134      int headerSize = GetVarint32EncodedSize(messageSize);
     135      int messageBytesInHeader = Math.Min(messageSize, buffer.Length - headerSize);
     136      byte[] messageBuffer = new byte[messageSize];
     137      Array.Copy(buffer, headerSize, messageBuffer, 0, messageBytesInHeader);
     138      ReceiveMessage(messageBuffer, messageBytesInHeader, messageSize - messageBytesInHeader);
     139      return messageBuffer;
     140    }
     141
     142    private int GetReceivedMessageSize(out byte[] buffer) {
     143      buffer = new byte[MAX_VARINT32_SIZE];
     144      socket.Receive(buffer);
     145      CodedInputStream tmp = CodedInputStream.CreateInstance(buffer);
     146      return (int)tmp.ReadRawVarint32();
     147    }
     148
     149    private void ReceiveMessage(byte[] buffer, int offset, int size) {
     150      while (size > 0) {
     151        int received = socket.Receive(buffer, offset, size, SocketFlags.None);
     152        offset += received;
     153        size -= received;
     154      }
    78155    }
    79156
    80157    public override void Close() {
    81       socket.Disconnect(false);
    82       socket.Close();
     158      if (socket != null) {
     159        if (socket.Connected)
     160          socket.Disconnect(false);
     161        socket.Close();
     162        socket = null;
     163      }
     164      bool wasInitialized = IsInitialized;
    83165      base.Close();
     166      if (wasInitialized) OnDiconnected();
    84167    }
    85168
    86169    #endregion
     170
     171    #region Events
     172    public event EventHandler IpAddressChanged;
     173    protected void OnIpAddressChanged() {
     174      EventHandler handler = IpAddressChanged;
     175      if (handler != null) handler(this, EventArgs.Empty);
     176    }
     177    public event EventHandler PortChanged;
     178    protected void OnPortChanged() {
     179      EventHandler handler = PortChanged;
     180      if (handler != null) handler(this, EventArgs.Empty);
     181    }
     182    public event EventHandler Connected;
     183    protected void OnConnected() {
     184      EventHandler handler = Connected;
     185      if (handler != null) handler(this, EventArgs.Empty);
     186    }
     187    public event EventHandler Disconnected;
     188    protected void OnDiconnected() {
     189      EventHandler handler = Disconnected;
     190      if (handler != null) handler(this, EventArgs.Empty);
     191    }
     192    #endregion
    87193  }
    88194}
Note: See TracChangeset for help on using the changeset viewer.