当前位置: 首页 > 知识库问答 >
问题:

Netty ByteToMessageDecoder无法接收以不同tcp数据包发送的消息

米浩穰
2023-03-14

自定义ByteToMessageEncoder不接收在相同tcp连接中发送的字节,而是接收在不同tcp消息中发送的字节。

我被指派解决一个问题,一个几年前的系统开始出现不良行为。在我看来,有一个tcp服务器是由其他一些开发人员用netty编写的,它接收带有静态长度头和可变长度主体的二进制消息。正文长度由a头字段定义,该字段告诉消息类型。我们维护一个消息类型及其长度的地图。

所面临的问题是,在正确解码头部并知道主体长度之后,相同的解码器希望主体出现在相同的ByteBuf中(即来自byteChannel的一个fireChannelRead事件)。

然而,有时缓冲区中没有足够的东西,解码器就放弃了。但是下次调用decode-method时,正文字节会出现,并被错误地解释为头部,从而使解码器不同步。

使用netty来组装消息的正确方法是什么,这些消息的字节可能会以较小的块插入?

以下是当前解码器的基本知识。

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Message message = decode(ctx, in); 
        if (message != null) {
            out.add(message);
        }
    }

    protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < MessageHeader.SIZE) {
            return null;
        }
        ByteBuf headerBytes = in.readBytes(MessageHeader.SIZE);
        MessageHeader header = MessageHeader.decode(headerBytes, new ProcessingTracker(MessagePart.HEADER));
        if (header == null) {
            ctx.disconnect().sync();
            logger.debug("Disconnected from channel");
            return null;
        }
        int bodySize = header.getMessageType().getMessageBodySize();
        if (!waitingForBytes(in, bodySize, READ_TRY_TIMES)) {
            ctx.disconnect().sync();
            logger.debug("Disconnected from channel");
            return null;
        }
        ByteBuf messageBytes = in.readBytes(bodySize);
        messageBytes.resetReaderIndex();
        Message message = Message.decode(header, messageBytes, 0);
        return message;
    }


    public boolean waitingForBytes(ByteBuf in, int bodySize, int counter) {
        if (counter == 0) {
            logger.warn("Didn't get enough bytes of message body in MessagDecoder. Giving up and disconnecting from remote peer.");
            return false;
        }
        logger.debug(String.format("Readable bytes in buffer %d, expected %d", in.readableBytes(), bodySize));

        if (in.readableBytes() < bodySize) {

            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return waitingForBytes(in, bodySize, counter - 1);
        } else {
            return true;
        }
    }

共有1个答案

姬浩渺
2023-03-14

你的代码有多个问题...

首先,不允许在代码中调用sync(),因为这样将使EventLoop“死锁”。

其次,您不能在这里使用WaitingForBytes,因为它基本上会使EventLoop上的所有其他IO过时,这意味着您将永远不会继续执行任何IO。在像Netty这样的a框架中,千万不要阻塞EventLoop线程,因为这基本上会导致一切都过时,没有任何进展。

 类似资料:
  • 我不知道是网络配置还是我的软件出了问题。 这是监听代码: 奇怪的是,在wireshark上,我可以看到:数据包已从发送到,并且设备已对此数据包作出响应--来自的数据包已发送到。使用bind(0.0.0.0,端口)似乎不能涵盖。我迷路了,一点主意都没有。 ifconfig为:

  • 我一直在开发一个简单的python套接字聊天室,客户端和服务器可以在其中相互发送消息。我遇到的问题是服务器和客户端一次只能发送一条消息。我希望它能像任何其他聊天室一样工作,在那里我可以在发送消息时收到消息,任何帮助都会有很大帮助

  • 我试着做一个简单的服务器-客户机套接字通信,但服务器似乎无法正常工作。无论何时发送或接收,我都会收到此错误: 非插座上的插座操作:非插座上的插座操作 奇怪的是,这只会在服务器发送时出现。客户似乎还可以:

  • 我的问题是为什么接收端没有得到发送的数据包? 注意:我的目标是在建立的连接上发送带有错误校验和的TCP数据包,并由不scapy的TCP服务器接收,提前谢谢!!

  • 我在JAVA中通过TCP接收字节数据包时遇到了一些问题。我的TCPServer类发送207字节的数据包。当我发送一个数据包时,控制台中的程序显示“读取207字节的数据包”然后停下来。在下一个数据包继续执行时,显示“多重测量”和“读取1868767867字节数据包”。之后,接收将永远停止。我不知道它为什么接收1868767867字节。我在wireshark中检查它,服务器总是发送207字节。 这是我