I want to do the following:
TcpClient client = init();
String response1 = sendMsg(client, msg1);
String response2 = sendMsg(client, msg2);
close(client);
Thanks for any response.
My unit test is as follows:
@Test
public void tcpServerHandlesTlvOverSsl() throws Exception {
    final CountDownLatch latch = new CountDownLatch(3);
    SelfSignedCertificate cert = new SelfSignedCertificate();
    SslContextBuilder serverOptions = SslContextBuilder.forServer(cert.certificate(), cert.privateKey());
    SslContext clientOptions = SslContextBuilder.forClient()
        .trustManager(InsecureTrustManagerFactory.INSTANCE)
        .build();
    final TcpServer server =
        TcpServer.create()
            .host("localhost")
            .secure(sslContextSpec -> sslContextSpec.sslContext(serverOptions));
    // this is a relay tcp server
    DisposableServer connectedServer =
        server
            .doOnConnection(connection -> {
                // tlv is our customized protocol
                connection.addHandlerLast("tlvEncoder", new TlvEncoder());
                connection.addHandlerLast("tlvDecoder", new TlvDecoder());
                // something like creating a new tcp client to the target server should be done here
            })
            .handle((in, out) ->
                out.sendObject(
                    in
                        .receiveObject()
                        .log("[Server received]")
                        .ofType(TlvMessage.class)
                        .flatMap(tlv -> {
                            // do some work on each received message
                            fixTlv(tlv);
                            log.debug("Server fixed the tlv message: {}", tlv.toString());
                            return Mono.just(tlv);
                        })
                        .doOnNext(tlv -> {
                            if (isLegalType(tlv.getType())) {
                                latch.countDown();
                            }
                            // then forward the fixed tlv message to the target server by using the newly created tcp client
                            log.debug("Server forwards to the target server: {}", tlv.toString());
                            // do something else with the response from the target server
                            if (TYPE_UPSTREAM_FINISH == tlv.getType()) {
                                // the closing flag has been received
                                log.debug("Server closing client to the target server.");
                                // close the newly created tcp client to the target server
                            }
                        })
                )
            )
            .wiretap(true)
            .bindNow();
    assertNotNull(connectedServer);
    // simulating a client to the relay server
    final TcpClient client = TcpClient.create()
        .host("localhost")
        .port(connectedServer.address().getPort())
        .secure(spec -> spec.sslContext(clientOptions));
    Connection connectedClient = client
        .doOnConnected(connection -> {
            connection.addHandlerLast("tlvEncoder", new TlvEncoder());
            connection.addHandlerLast("tlvDecoder", new TlvDecoder());
        })
        .doOnDisconnected(connection -> log.info("Client disconnected"))
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handle((in, out) -> {
                in
                    .receiveObject()
                    .log("[Client received]")
                    .ofType(TlvMessage.class)
                    .doOnNext(tlv -> {
                        // do some work on each response from the relay server
                        if (isLegalType(tlv.getType())) {
                            latch.countDown();
                        }
                    });
                return out
                    // simulating requests to the relay server
                    .sendObject(Flux.just(
                        demo(TYPE_UPSTREAM_INIT, "{\"param\":\"{\\\"device-id\\\":\\\"3F1806118800C678\\\"}\",\"pid\":\"-1001\",\"clientip\":\"xxx.xxx.xxx.xxx\",\"sn\":\"91598059-4d06-447a-878f-551637bcaf89_7\"}"),
                        demo(TYPE_UPSTREAM_FINISH, "{\"param\":\"{\\\"device-id\\\":\\\"3A1806118800C678\\\"}\",\"pid\":\"-1002\",\"clientip\":\"xxx.xxx.xxx.xxx\"}")
                        )
                    )
                    .neverComplete();
            }
        )
        .wiretap(true)
        .connectNow();
    assertNotNull(connectedClient);
    Thread.sleep(3000);
    log.info("Client sends object after 3 seconds.");
    connectedClient.outbound().sendObject(
        Flux.just(
            demo(TYPE_UPSTREAM_INIT, "{\"param\":\"{\\\"device-id\\\":\\\"3B1806118800C678\\\"}\",\"pid\":\"-1003\",\"clientip\":\"xxx.xxx.xxx.xxx\",\"sn\":\"91598059-dd06-447a-878f-551637bcaf89_7\"}"),
            demo(TYPE_UPSTREAM_FINISH, "{\"param\":\"{\\\"device-id\\\":\\\"3C1806118800C678\\\"}\",\"pid\":\"-1004\",\"clientip\":\"xxx.xxx.xxx.xxx\"}")
        )
    );
    assertTrue("Latch was counted down", latch.await(5, TimeUnit.SECONDS));
    connectedClient.disposeNow();
    connectedServer.disposeNow();
}
The demo method assembles a TLV (our custom protocol) object.
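For readers without the TlvEncoder and TlvDecoder classes, which are not shown in this post, here is a rough sketch of what the framing might look like. It assumes a 2-byte little-endian type followed by a 4-byte little-endian length and a UTF-8 payload. That layout is only inferred from the wiretap hex dumps in the log (for example `01 00 89 00 00 00` followed by 137 payload bytes) and may not match the real protocol. The class name TlvFrameSketch and its methods are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the TLV codec; the header layout is an assumption.
public class TlvFrameSketch {
    // Assumed header: 2-byte type + 4-byte length, both little-endian.
    static final int HEADER_BYTES = 6;

    // Builds one frame: header followed by the UTF-8 bytes of the value.
    public static byte[] encode(int type, String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.putShort((short) type);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Splits a byte array that may hold several frames back to back,
    // as happens when TCP coalesces two writes into a single read.
    // Each decoded frame is returned as "type:value"; an incomplete
    // trailing frame is ignored.
    public static List<String> decodeAll(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        List<String> frames = new ArrayList<>();
        while (buf.remaining() >= HEADER_BYTES) {
            int type = buf.getShort() & 0xFFFF;
            int length = buf.getInt();
            if (length < 0 || buf.remaining() < length) {
                break; // not enough bytes for the payload yet
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(type + ":" + new String(payload, StandardCharsets.UTF_8));
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode(1, "{\"pid\":\"-1001\"}");
        byte[] second = encode(4, "{\"pid\":\"-1002\"}");
        // Concatenate the two frames to imitate one coalesced read.
        byte[] both = new byte[first.length + second.length];
        System.arraycopy(first, 0, both, 0, first.length);
        System.arraycopy(second, 0, both, first.length, second.length);
        decodeAll(both).forEach(System.out::println);
    }
}
```

The decoder loops because one read can carry more than one frame. In the log, the two client writes (143B and 97B) arrive at the server as a single 240B read.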
The key log output is as follows:
17:16:21.104 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187] REGISTERED
17:16:21.105 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187] CONNECT: localhost/127.0.0.1:60092
17:16:21.117 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.SslProvider - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] SSL enabled using engine SSLEngineImpl
17:16:21.118 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.BootstrapHandlers - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] Initialized pipeline DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (reactor.left.sslReader = reactor.netty.tcp.SslProvider$SslReadHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
17:16:21.121 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] REGISTERED
17:16:21.121 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] ACTIVE
17:16:21.170 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
17:16:21.170 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
17:16:21.170 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
17:16:21.170 [reactor-tcp-nio-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
17:16:21.186 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] ACTIVE
17:16:21.186 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] Registering pool release on close event for channel
17:16:21.188 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] Channel connected, now 1 active connections and 0 inactive connections
17:16:21.202 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] READ COMPLETE
17:16:21.203 [reactor-tcp-nio-1] DEBUG io.netty.handler.ssl.util.InsecureTrustManagerFactory - Accepting a server certificate: CN=example.com
17:16:21.241 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] READ COMPLETE
17:16:21.247 [reactor-tcp-nio-2] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
17:16:21.248 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] USER_EVENT: SslHandshakeCompletionEvent(SUCCESS)
17:16:21.350 [reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] Added decoder [tlvEncoder] at the end of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.loggingHandler, tlvEncoder, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:16:21.351 [reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] Added decoder [tlvDecoder] at the end of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.loggingHandler, tlvEncoder, tlvDecoder, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:16:21.351 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] Handler is being applied: com.baidu.iot.devicecloud.devicemanager.TcpServerTests$$Lambda$7/824208363@3252dfd9
17:16:21.367 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] Writing object
17:16:21.375 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
17:16:21.377 [reactor-tcp-nio-2] INFO [Server received] - onSubscribe(FluxReceive)
17:16:21.379 [reactor-tcp-nio-2] INFO [Server received] - request(256)
17:16:21.380 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] READ COMPLETE
17:16:21.381 [reactor-tcp-nio-1] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
17:16:21.381 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] USER_EVENT: SslHandshakeCompletionEvent(SUCCESS)
17:16:21.381 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] onStateChange(PooledConnection{channel=[id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092]}, [connected])
17:16:21.381 [reactor-tcp-nio-1] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092]}}, [configured])
17:16:21.381 [reactor-tcp-nio-1] DEBUG reactor.netty.ReactorNetty - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] Added decoder [tlvEncoder] at the end of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.loggingHandler, tlvEncoder, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:16:21.382 [reactor-tcp-nio-1] DEBUG reactor.netty.ReactorNetty - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] Added decoder [tlvDecoder] at the end of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.loggingHandler, tlvEncoder, tlvDecoder, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:16:21.382 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] Handler is being applied: com.baidu.iot.devicecloud.devicemanager.TcpServerTests$$Lambda$27/1754662105@7f8c9193
17:16:21.652 [reactor-tcp-nio-1] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] Writing object
17:16:21.662 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] WRITE: 143B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 89 00 00 00 7b 22 70 61 72 61 6d 22 3a 22 |......{"param":"|
|00000010| 7b 5c 22 64 65 76 69 63 65 2d 69 64 5c 22 3a 5c |{\"device-id\":\|
|00000020| 22 33 46 31 38 30 36 31 31 38 38 30 30 43 36 37 |"3F1806118800C67|
|00000030| 38 5c 22 7d 22 2c 22 70 69 64 22 3a 22 2d 31 30 |8\"}","pid":"-10|
|00000040| 30 31 22 2c 22 63 6c 69 65 6e 74 69 70 22 3a 22 |01","clientip":"|
|00000050| 78 78 78 2e 78 78 78 2e 78 78 78 2e 78 78 78 22 |xxx.xxx.xxx.xxx"|
|00000060| 2c 22 73 6e 22 3a 22 39 31 35 39 38 30 35 39 2d |,"sn":"91598059-|
|00000070| 34 64 30 36 2d 34 34 37 61 2d 38 37 38 66 2d 35 |4d06-447a-878f-5|
|00000080| 35 31 36 33 37 62 63 61 66 38 39 5f 37 22 7d |51637bcaf89_7"} |
+--------+-------------------------------------------------+----------------+
17:16:21.664 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] WRITE: 97B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 00 5b 00 00 00 7b 22 70 61 72 61 6d 22 3a 22 |..[...{"param":"|
|00000010| 7b 5c 22 64 65 76 69 63 65 2d 69 64 5c 22 3a 5c |{\"device-id\":\|
|00000020| 22 33 41 31 38 30 36 31 31 38 38 30 30 43 36 37 |"3A1806118800C67|
|00000030| 38 5c 22 7d 22 2c 22 70 69 64 22 3a 22 2d 31 30 |8\"}","pid":"-10|
|00000040| 30 32 22 2c 22 63 6c 69 65 6e 74 69 70 22 3a 22 |02","clientip":"|
|00000050| 78 78 78 2e 78 78 78 2e 78 78 78 2e 78 78 78 22 |xxx.xxx.xxx.xxx"|
|00000060| 7d |} |
+--------+-------------------------------------------------+----------------+
17:16:21.664 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] FLUSH
17:16:21.666 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] READ: 240B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 89 00 00 00 7b 22 70 61 72 61 6d 22 3a 22 |......{"param":"|
|00000010| 7b 5c 22 64 65 76 69 63 65 2d 69 64 5c 22 3a 5c |{\"device-id\":\|
|00000020| 22 33 46 31 38 30 36 31 31 38 38 30 30 43 36 37 |"3F1806118800C67|
|00000030| 38 5c 22 7d 22 2c 22 70 69 64 22 3a 22 2d 31 30 |8\"}","pid":"-10|
|00000040| 30 31 22 2c 22 63 6c 69 65 6e 74 69 70 22 3a 22 |01","clientip":"|
|00000050| 78 78 78 2e 78 78 78 2e 78 78 78 2e 78 78 78 22 |xxx.xxx.xxx.xxx"|
|00000060| 2c 22 73 6e 22 3a 22 39 31 35 39 38 30 35 39 2d |,"sn":"91598059-|
|00000070| 34 64 30 36 2d 34 34 37 61 2d 38 37 38 66 2d 35 |4d06-447a-878f-5|
|00000080| 35 31 36 33 37 62 63 61 66 38 39 5f 37 22 7d 04 |51637bcaf89_7"}.|
|00000090| 00 5b 00 00 00 7b 22 70 61 72 61 6d 22 3a 22 7b |.[...{"param":"{|
|000000a0| 5c 22 64 65 76 69 63 65 2d 69 64 5c 22 3a 5c 22 |\"device-id\":\"|
|000000b0| 33 41 31 38 30 36 31 31 38 38 30 30 43 36 37 38 |3A1806118800C678|
|000000c0| 5c 22 7d 22 2c 22 70 69 64 22 3a 22 2d 31 30 30 |\"}","pid":"-100|
|000000d0| 32 22 2c 22 63 6c 69 65 6e 74 69 70 22 3a 22 78 |2","clientip":"x|
|000000e0| 78 78 2e 78 78 78 2e 78 78 78 2e 78 78 78 22 7d |xx.xxx.xxx.xxx"}|
+--------+-------------------------------------------------+----------------+
17:16:21.667 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpClient - [id: 0x019c3187, L:/127.0.0.1:60093 - R:localhost/127.0.0.1:60092] READ COMPLETE
17:16:21.667 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.handler.TlvDecoder - TlvDecoder read type: 1
17:16:21.667 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.handler.TlvDecoder - TlvDecoder read length: 137
17:16:21.668 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.handler.TlvDecoder - TlvDecoder read data success: {"param":"{\"device-id\":\"3F1806118800C678\"}","pid":"-1001","clientip":"xxx.xxx.xxx.xxx","sn":"91598059-4d06-447a-878f-551637bcaf89_7"}
17:16:21.668 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.handler.TlvDecoder - TlvDecoder read type: 4
17:16:21.668 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.handler.TlvDecoder - TlvDecoder read length: 91
17:16:21.668 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.handler.TlvDecoder - TlvDecoder read data success: {"param":"{\"device-id\":\"3A1806118800C678\"}","pid":"-1002","clientip":"xxx.xxx.xxx.xxx"}
17:16:21.669 [reactor-tcp-nio-2] INFO [Server received] - onNext(TlvMessage(type=1, length=137, value={"param":"{\"device-id\":\"3F1806118800C678\"}","pid":"-1001","clientip":"xxx.xxx.xxx.xxx","sn":"91598059-4d06-447a-878f-551637bcaf89_7"}))
17:16:21.670 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.TcpServerTests - Server fixed the tlv message: TlvMessage(type=1, length=157, value={"param":"{\"device-id\":\"3F1806118800C678\"}","pid":"-1001","clientip":"xxx.xxx.xxx.xxx","sn":"91598059-4d06-447a-878f-551637bcaf89_7","fixed":"by server"})
17:16:21.671 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.TcpServerTests - Server forwards to the target server: TlvMessage(type=1, length=157, value={"param":"{\"device-id\":\"3F1806118800C678\"}","pid":"-1001","clientip":"xxx.xxx.xxx.xxx","sn":"91598059-4d06-447a-878f-551637bcaf89_7","fixed":"by server"})
17:16:21.671 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] WRITE: 163B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 9d 00 00 00 7b 22 70 61 72 61 6d 22 3a 22 |......{"param":"|
|00000010| 7b 5c 22 64 65 76 69 63 65 2d 69 64 5c 22 3a 5c |{\"device-id\":\|
|00000020| 22 33 46 31 38 30 36 31 31 38 38 30 30 43 36 37 |"3F1806118800C67|
|00000030| 38 5c 22 7d 22 2c 22 70 69 64 22 3a 22 2d 31 30 |8\"}","pid":"-10|
|00000040| 30 31 22 2c 22 63 6c 69 65 6e 74 69 70 22 3a 22 |01","clientip":"|
|00000050| 78 78 78 2e 78 78 78 2e 78 78 78 2e 78 78 78 22 |xxx.xxx.xxx.xxx"|
|00000060| 2c 22 73 6e 22 3a 22 39 31 35 39 38 30 35 39 2d |,"sn":"91598059-|
|00000070| 34 64 30 36 2d 34 34 37 61 2d 38 37 38 66 2d 35 |4d06-447a-878f-5|
|00000080| 35 31 36 33 37 62 63 61 66 38 39 5f 37 22 2c 22 |51637bcaf89_7","|
|00000090| 66 69 78 65 64 22 3a 22 62 79 20 73 65 72 76 65 |fixed":"by serve|
|000000a0| 72 22 7d |r"} |
+--------+-------------------------------------------------+----------------+
17:16:21.671 [reactor-tcp-nio-2] INFO [Server received] - onNext(TlvMessage(type=4, length=91, value={"param":"{\"device-id\":\"3A1806118800C678\"}","pid":"-1002","clientip":"xxx.xxx.xxx.xxx"}))
17:16:21.671 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.TcpServerTests - Server fixed the tlv message: TlvMessage(type=4, length=111, value={"param":"{\"device-id\":\"3A1806118800C678\"}","pid":"-1002","clientip":"xxx.xxx.xxx.xxx","fixed":"by server"})
17:16:21.671 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.TcpServerTests - Server forwards to the target server: TlvMessage(type=4, length=111, value={"param":"{\"device-id\":\"3A1806118800C678\"}","pid":"-1002","clientip":"xxx.xxx.xxx.xxx","fixed":"by server"})
17:16:21.671 [reactor-tcp-nio-2] DEBUG com.baidu.iot.devicecloud.devicemanager.TcpServerTests - Server closing client to the target server.
17:16:21.672 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] WRITE: 117B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 00 6f 00 00 00 7b 22 70 61 72 61 6d 22 3a 22 |..o...{"param":"|
|00000010| 7b 5c 22 64 65 76 69 63 65 2d 69 64 5c 22 3a 5c |{\"device-id\":\|
|00000020| 22 33 41 31 38 30 36 31 31 38 38 30 30 43 36 37 |"3A1806118800C67|
|00000030| 38 5c 22 7d 22 2c 22 70 69 64 22 3a 22 2d 31 30 |8\"}","pid":"-10|
|00000040| 30 32 22 2c 22 63 6c 69 65 6e 74 69 70 22 3a 22 |02","clientip":"|
|00000050| 78 78 78 2e 78 78 78 2e 78 78 78 2e 78 78 78 22 |xxx.xxx.xxx.xxx"|
|00000060| 2c 22 66 69 78 65 64 22 3a 22 62 79 20 73 65 72 |,"fixed":"by ser|
|00000070| 76 65 72 22 7d |ver"} |
+--------+-------------------------------------------------+----------------+
17:16:21.672 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpServer - [id: 0x424c0204, L:/127.0.0.1:60092 - R:/127.0.0.1:60093] READ COMPLETE
17:16:24.384 [main] INFO com.baidu.iot.devicecloud.devicemanager.TcpServerTests - Client sends object after 3 seconds.
java.lang.AssertionError: Latch was counted down
Process finished with exit code -1
The TcpClient I created does not work properly and I don't know how to fix it. Can anyone show me the correct way to do this?
I can see two problems in your code. In both places a reactive chain is assembled but never subscribed to, so it never runs.
First, the inbound receive chain inside handle is not subscribed to. Add .subscribe() to it:
.handle((in, out) -> {
        in
            .receiveObject()
            .log("[Client received]")
            .ofType(TlvMessage.class)
            .doOnNext(tlv -> {
                // do some work on each response from the relay server
                if (isLegalType(tlv.getType())) {
                    latch.countDown();
                }
            })
            .subscribe();
        return out
            // simulating requests to the relay server
            .sendObject(Flux.just(
                demo(TYPE_UPSTREAM_INIT, "{\"param\":\"{\\\"device-id\\\":\\\"3F1806118800C678\\\"}\",\"pid\":\"-1001\",\"clientip\":\"xxx.xxx.xxx.xxx\",\"sn\":\"91598059-4d06-447a-878f-551637bcaf89_7\"}"),
                demo(TYPE_UPSTREAM_FINISH, "{\"param\":\"{\\\"device-id\\\":\\\"3A1806118800C678\\\"}\",\"pid\":\"-1002\",\"clientip\":\"xxx.xxx.xxx.xxx\"}")
                )
            )
            .neverComplete();
    }
)
Second, the send issued after connectNow() is not subscribed to either, which is why the log shows no client write after "Client sends object after 3 seconds." Add .then().subscribe():
connectedClient.outbound().sendObject(
        Flux.just(
            demo(TYPE_UPSTREAM_INIT, "{\"param\":\"{\\\"device-id\\\":\\\"3B1806118800C678\\\"}\",\"pid\":\"-1003\",\"clientip\":\"xxx.xxx.xxx.xxx\",\"sn\":\"91598059-dd06-447a-878f-551637bcaf89_7\"}"),
            demo(TYPE_UPSTREAM_FINISH, "{\"param\":\"{\\\"device-id\\\":\\\"3C1806118800C678\\\"}\",\"pid\":\"-1004\",\"clientip\":\"xxx.xxx.xxx.xxx\"}")
        )
    )
    .then()
    .subscribe();
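If it helps to see why the missing subscribe() matters, the snippet below shows the same kind of laziness using only the JDK. It is an analogy, not Reactor: a java.util.stream pipeline does nothing until a terminal operation is called, much as a Flux or Mono does nothing until something subscribes to it. The class name LazyPipelineSketch and the "INIT"/"FINISH" strings are placeholders made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: JDK streams, not Reactor. Names are hypothetical.
public class LazyPipelineSketch {
    // Records every element that actually flows through the pipeline.
    public static final List<String> seen = new ArrayList<>();

    // Assembles a pipeline without running it, comparable to building
    // a sendObject(...) or receiveObject() chain and not subscribing.
    public static Stream<String> assemble() {
        return Stream.of("INIT", "FINISH").peek(seen::add);
    }

    public static void main(String[] args) {
        Stream<String> pipeline = assemble();
        // Nothing has been processed yet: the pipeline is only a description.
        System.out.println("after assembly: " + seen.size());

        // The terminal operation plays the role that subscribe() plays in Reactor.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("after terminal operation: " + seen.size() + " " + result);
    }
}
```

In the original test, the client's receive chain and the second sendObject call were both left in the "after assembly" state, so the latch could not reach zero.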