- Timestamp:
- 06/02/10 16:18:39 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Problems.ExternalEvaluation/3.3/Drivers/EvaluationTCPChannel.cs
r3883 r3890 32 32 [StorableClass] 33 33 public class EvaluationTCPChannel : EvaluationChannel { 34 public const int MAX_VARINT32_SIZE = 5; 35 34 36 [Storable] 35 37 private string ipAddress; 38 public string IpAddress { 39 get { return ipAddress; } 40 set { 41 bool changed = !ipAddress.Equals(value); 42 ipAddress = value; 43 if (changed) 44 OnIpAddressChanged(); 45 } 46 } 36 47 [Storable] 37 48 private int port; 49 public int Port { 50 get { return port; } 51 set { 52 bool changed = port != value; 53 port = value; 54 if (changed) 55 OnPortChanged(); 56 } 57 } 38 58 private Socket socket; 39 59 … … 57 77 socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 58 78 socket.Connect(IPAddress.Parse(ipAddress), port); 59 if (socket.Connected) 79 if (socket.Connected) { 60 80 base.Open(); 81 OnConnected(); 82 } 61 83 } 62 84 63 85 public override void Send(IMessage message) { 64 int size = message.SerializedSize; 65 socket.Send(System.BitConverter.GetBytes(size)); 66 socket.Send(message.ToByteArray()); 86 try { 87 byte[] buffer = EncodeDelimited(message); 88 socket.Send(buffer); 89 } catch (SocketException) { 90 Close(); 91 } catch (ObjectDisposedException) { 92 socket = null; 93 Close(); 94 } 95 } 96 97 private byte[] EncodeDelimited(IMessage message) { 98 int messageSize = message.SerializedSize; 99 int headerSize = GetVarint32EncodedSize(messageSize); 100 byte[] buffer = new byte[headerSize + messageSize]; 101 CodedOutputStream tmp = CodedOutputStream.CreateInstance(buffer); 102 tmp.WriteRawVarint32((uint)messageSize); 103 message.WriteTo(tmp); 104 return buffer; 105 } 106 107 private int GetVarint32EncodedSize(int size) { 108 // Varints in Protocol Buffers are encoded using the 7 lower order bits (the MSB indicates continuation (=1) or termination (=0)) 109 int sizeByteCount = 1; 110 int limit = 128; 111 while (size > limit) { 112 sizeByteCount++; 113 limit *= 128; 114 } 115 return sizeByteCount; 67 116 } 68 117 69 118 public override IMessage Receive(IBuilder builder) { 70 byte[] sizeBuffer = new byte[4]; 71 socket.Receive(sizeBuffer); 72 int size = System.BitConverter.ToInt32(sizeBuffer, 0); 73 byte[] messageBuffer = new byte[size]; 74 int offset = 0; 75 while (offset < size) 76 offset += socket.Receive(messageBuffer, offset, size, SocketFlags.None); 77 return builder.WeakMergeFrom(ByteString.CopyFrom(messageBuffer)).WeakBuild(); 119 try { 120 byte[] buffer = GetMessageBuffer(); 121 return builder.WeakMergeFrom(ByteString.CopyFrom(buffer)).WeakBuild(); 122 } catch (SocketException) { 123 Close(); 124 } catch (ObjectDisposedException) { 125 socket = null; 126 Close(); 127 } 128 return null; 129 } 130 131 private byte[] GetMessageBuffer() { 132 byte[] buffer; 133 int messageSize = GetReceivedMessageSize(out buffer); 134 int headerSize = GetVarint32EncodedSize(messageSize); 135 int messageBytesInHeader = Math.Min(messageSize, buffer.Length - headerSize); 136 byte[] messageBuffer = new byte[messageSize]; 137 Array.Copy(buffer, headerSize, messageBuffer, 0, messageBytesInHeader); 138 ReceiveMessage(messageBuffer, messageBytesInHeader, messageSize - messageBytesInHeader); 139 return messageBuffer; 140 } 141 142 private int GetReceivedMessageSize(out byte[] buffer) { 143 buffer = new byte[MAX_VARINT32_SIZE]; 144 socket.Receive(buffer); 145 CodedInputStream tmp = CodedInputStream.CreateInstance(buffer); 146 return (int)tmp.ReadRawVarint32(); 147 } 148 149 private void ReceiveMessage(byte[] buffer, int offset, int size) { 150 while (size > 0) { 151 int received = socket.Receive(buffer, offset, size, SocketFlags.None); 152 offset += received; 153 size -= received; 154 } 78 155 } 79 156 80 157 public override void Close() { 81 socket.Disconnect(false); 82 socket.Close(); 158 if (socket != null) { 159 if (socket.Connected) 160 socket.Disconnect(false); 161 socket.Close(); 162 socket = null; 163 } 164 bool wasInitialized = IsInitialized; 83 165 base.Close(); 166 if (wasInitialized) OnDiconnected(); 84 167 } 85 168 86 169 #endregion 170 171 #region Events 172 public event EventHandler IpAddressChanged; 173 protected void OnIpAddressChanged() { 174 EventHandler handler = IpAddressChanged; 175 if (handler != null) handler(this, EventArgs.Empty); 176 } 177 public event EventHandler PortChanged; 178 protected void OnPortChanged() { 179 EventHandler handler = PortChanged; 180 if (handler != null) handler(this, EventArgs.Empty); 181 } 182 public event EventHandler Connected; 183 protected void OnConnected() { 184 EventHandler handler = Connected; 185 if (handler != null) handler(this, EventArgs.Empty); 186 } 187 public event EventHandler Disconnected; 188 protected void OnDiconnected() { 189 EventHandler handler = Disconnected; 190 if (handler != null) handler(this, EventArgs.Empty); 191 } 192 #endregion 87 193 } 88 194 }
Note: See TracChangeset
for help on using the changeset viewer.