当前位置: 首页 > 知识库问答 >
问题:

如何使用reactor netty TcpClient链接多个发送和接收操作

慕嘉茂
2023-03-14

我需要在由顺序发送组成的TCP连接中执行自定义握手-

这是我所拥有的:

    DisposableServer server = TcpServer.create()
            .host("localhost")
            .port(4059)
            .wiretap(true)
            .handle((nettyInbound, nettyOutbound) ->
                    nettyInbound.receive().asByteArray().flatMap(bytes -> {
                        log.info("Server inbound: {}", bytes);
                        if (Arrays.equals(bytes, new byte[]{1, 2, 3})) {
                            nettyOutbound.sendByteArray(Mono.just(new byte[]{7, 6, 5})).then().subscribe();
                        } else if (Arrays.equals(bytes, new byte[]{5, 6, 7})) {
                            nettyOutbound.sendByteArray(Mono.just(new byte[]{9, 8, 7})).then().subscribe();
                        }
                        return Mono.empty();
                    }))
            .bindNow();

    TcpClient.create()
            .host("localhost")
            .port(4059)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .wiretap(true)
            .connect()
            .flatMap(connection ->
                    connection.outbound().sendByteArray(Mono.just(new byte[]{1, 2, 3}))
                            .then(connection.inbound().receive().asByteArray().next().flatMap(bytes -> {
                                log.info("bytes {}", bytes);
                                return Mono.empty();
                            })).sendByteArray(Mono.just(new byte[]{5, 6, 7}))
                            .then(connection.inbound().receive().asByteArray().next().flatMap(bytes -> {
                                log.info("bytes {}", bytes);
                                return Mono.empty();
                            }))
                            .then()
            )
            .subscribe();

    server.onDispose().block();

最大的问题是TcpClient的第二个receiveflatMap未被执行,并且在日志中wireltap正确显示数据已读取,但未发送到该flatMap

日志还显示客户端发送了多条相同的消息:

14:25:04.394 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] WRITE: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.480 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] FLUSH
14:25:04.509 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
14:25:04.525 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.527 [reactor-tcp-nio-3] INFO com.example.TcpTest - Server inbound: [1, 2, 3]
14:25:04.528 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] WRITE: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 07 06 05                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.528 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] FLUSH
14:25:04.529 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] READ COMPLETE
14:25:04.529 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 07 06 05                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.530 [reactor-tcp-nio-2] INFO com.example.TcpTest - bytes [7, 6, 5]
14:25:04.531 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] WRITE: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.536 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] FLUSH
14:25:04.536 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] WRITE: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 06 07                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.536 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.536 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] FLUSH
14:25:04.536 [reactor-tcp-nio-3] INFO com.example.TcpTest - Server inbound: [1, 2, 3]
14:25:04.536 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] WRITE: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 07 06 05                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.537 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] Subscribing inbound receiver [pending: 0, cancelled:true, inboundDone: false]
14:25:04.537 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] READ COMPLETE
14:25:04.537 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] FLUSH
14:25:04.537 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] READ COMPLETE
14:25:04.537 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 07 06 05                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.537 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 06 07                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.538 [reactor-tcp-nio-3] INFO com.example.TcpTest - Server inbound: [5, 6, 7]
14:25:04.538 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] WRITE: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 09 08 07                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.538 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] FLUSH
14:25:04.538 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] Dropping frame PooledUnsafeDirectByteBuf(ridx: 0, widx: 3, cap: 1024), 0 in buffer
14:25:04.539 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] READ COMPLETE
14:25:04.539 [reactor-tcp-nio-3] DEBUG reactor.netty.tcp.TcpServer - [id: 0xf6dd2f6d, L:/127.0.0.1:4059 - R:/127.0.0.1:52044] READ COMPLETE
14:25:04.539 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 09 08 07                                        |...             |
+--------+-------------------------------------------------+----------------+
14:25:04.539 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] Dropping frame PooledUnsafeDirectByteBuf(ridx: 0, widx: 3, cap: 512), 0 in buffer
14:25:04.539 [reactor-tcp-nio-2] DEBUG reactor.netty.tcp.TcpClient - [id: 0x3d2ab8be, L:/127.0.0.1:52044 - R:localhost/127.0.0.1:4059] READ COMPLETE

有人能给我指出正确的方向吗如何正确地链TcpClient发送-

共有1个答案

秦承允
2023-03-14

我明白了。问题中的解决方案不起作用的原因是您无法调用连接。入站()。receive()两次,但仅一次,整个通信流必须在该调用中处理。

我写了一篇文章,给出了正确的解决方案:如何通过使用反应器网络通过TCP实现自定义握手协议

 类似资料:
  • 问题内容: 我正在使用套接字连接我的Android应用程序(客户端)和Java后端服务器。每次与服务器通信时,我都希望从客户端发送两个数据变量。 1)某种消息(由用户通过界面定义) 2)消息的语言(由用户通过界面定义) 我该如何发送这些消息,以便服务器将每个消息解释为一个单独的实体? 在读取了服务器端的数据并做出了适当的结论之后,我想向客户端返回一条消息。(我想我会没事的) 因此,我的两个问题是如

  • 我想使用Qt UDP(不是TCP)套接字传输文件。所以我这样写代码: 看来要传输的Qt UDP数据包的最小大小是8192字节。接收方总是得到第一个数据包,但不能接收其他数据包。 我对Qt和网络编程的经验很少,所以我不知道我的猜想对不对。你能告诉我如何改变这些代码来支持在第一个数据包之后接收数据包,这样我就可以传输大数据了吗?

  • 问题内容: 有人可以帮助我使用Python中的AT命令发送和接收SMS吗? 万一重要,我正在使用Fedora 8。 哪款手机搭配Linux(诺基亚,索尼爱立信,三星等)会更好?所有电话都支持使用AT命令发送和接收SMS吗? 问题答案: 以下是一些示例代码,可以帮助您入门(在Python 3000中): 您还需要做两件事: 用适当的格式编码消息(主要是GSM 03.38,unicode.org上有一

  • 我希望我的Java应用程序在不使用任何额外硬件设备的情况下发送和接收短信,而且它必须是免费的。 我进行了搜索,但我只找到了标题,我找到了一些类似SMSLib的东西,但另一方面,我没有找到学习这些的教程或书籍。 我还发现了SMSLib代码,但不明白: 发送消息/短信代码 阅读信息/短信代码

  • http://Socket.io允许你触发或响应自定义的事件,除了connect,message,disconnect这些事件的名字不能使用之外,你可以触发任何自定义的事件名称。 服务器端 // 注意,io(<端口号>) 将为你创建一个http服务。 var io = require('socket.io')(80); io.on('connection', function (socket)

  • 我想做以下事情: 感谢任何回应。 我的单元测试如下: 演示方法用于组装tlv(我们的自定义协议)对象。 关键日志如下: 创建的TcpClient工作不好,我不知道如何更正,有人能告诉我一种正确的方法吗。