当前位置: 首页 > 知识库问答 >
问题:

Netty从消息长度开始处理ASCII消息的最佳方法?

郑燕七
2023-03-14

我有一个Netty4.x应用程序,它需要发送和接收ASCII消息,从一个固定长度(10位数,零填充)字段开始,包含以#字符为单位的消息大小。消息如下:

0000000059{message_info={message_type=login}|login_id=abc|password=}
0000000114{message_info={message_type=pricefeed_toggle}|instrument_id={feedcode=1234|market=xyz}|toggle=true|best_only=true}

共有1个答案

陶烨赫
2023-03-14

也有同样的问题,我的解决方案是扩展LengthFieldBasedFrameDecoder并重写getUnadjustedFrameLength方法。这是我的课:

public class StringLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {
    private Charset charset;

    public StringLengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength) {
        this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
    }

    public StringLengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip) {
        this(
                maxFrameLength,
                lengthFieldOffset, lengthFieldLength, lengthAdjustment,
                initialBytesToStrip, true);
    }

    public StringLengthFieldBasedFrameDecoder(
            int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        this(
                ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,
                lengthAdjustment, initialBytesToStrip, failFast, Charset.forName("US-ASCII"));
    }

    public StringLengthFieldBasedFrameDecoder(
            ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast, Charset charset) {
        super(byteOrder, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
        this.charset = charset;
    }

    /**
     * Decodes the specified region of the buffer into an unadjusted frame length.  This implementation will
     * read a String of length bytes from the ByteBuf at the given offset. This string will then be parsed into a
     * long using the charset specified on initialization (default "US-ASCII"). Note that this method must not
     * modify the state of the specified buffer (e.g. {@code readerIndex}, {@code writerIndex}, and the content of
     * the buffer.)
     *
     * @throws io.netty.handler.codec.DecoderException if failed to decode the specified region
     */
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        try {
            return Long.parseLong(buf.toString(offset, length, charset));
        } catch (NumberFormatException nfe) {
            throw new DecoderException(nfe);
        }
    }
}

和一些测试代码:

public class StringLengthFieldBasedFrameDecoderTest {
    @DataProvider
    private static final Object[][] getTestData() throws UnsupportedEncodingException {
        try {
            String message = "Hello World!";
            ByteBuf asciiLength = Unpooled.buffer();
            asciiLength.writeBytes(String.format("%08d", message.length()).getBytes("US-ASCII"));
            asciiLength.writeBytes(message.getBytes("US-ASCII"));

            ByteBuf utf16LELength = Unpooled.buffer();
            utf16LELength.writeBytes(String.format("%08d", message.length()).getBytes("UTF-16LE"));
            utf16LELength.writeBytes(message.getBytes("US-ASCII"));

            return new Object[][]{
                    {new StringLengthFieldBasedFrameDecoder(1024, 0, 8, 0, 8), asciiLength, message},
                    {new StringLengthFieldBasedFrameDecoder(ByteOrder.nativeOrder(), 1024, 0, 16, 0, 16, true, Charset.forName("UTF-16LE")), utf16LELength, message}
            };
        } catch (UnsupportedEncodingException uee) {
            System.out.println(uee.getMessage());
            throw uee;
        }
    }

    @Test(dataProvider = "getTestData")
    public void testReturnsCorrectMessage(StringLengthFieldBasedFrameDecoder decoder, ByteBuf buffer, String expectedMessage) {   
        EmbeddedChannel channel = new EmbeddedChannel(decoder);
        channel.writeInbound(buffer);

        Assert.assertTrue(channel.finish());
        ByteBuf input = (ByteBuf) channel.readInbound();
        assertEquals(input.toString(0, input.readableBytes(), Charset.forName("US-ASCII")), expectedMessage);
    }
}
 类似资料:
  • 我是Netty的新手,需要以自定义方式处理消息。我有以下接口: 现在我从客户端收到一些数据,并希望对其进行处理。我正在实现

  • 我正在使用netty构建一个应用程序。在应用程序中,我需要处理传入和传出的消息。要求是应用程序将发送的任何消息都应由特定的处理程序处理,进入应用程序的任何消息都应由另一个特定的处理程序处理。但是,我希望在两个处理程序之间交换消息,以便能够跟踪发送的消息响应,因为请求消息也将发送到应用程序。 请任何想法hwo实施这样的要求。这个问题听起来可能不相关,但这就是我得到的,我还不是一个网络极客。我读到的关

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 通常,我希望将消息发送到另一个路由来处理它,但我不希望为后续步骤修改该消息。做这件事最好的方法是什么? 我发现的另一个选择是使用异步sedaendpoint,它将原始消息返回给生产者并处理副本,但这会引入异步行为,而异步行为可能并不总是可取的。 看来一定有更好的办法?

  • 问题内容: 我正在将node.js mosca MQTT代理用于某些物联网(iot)应用程序。 https://github.com/mcollina/mosca mosca代理可以接收的最大消息长度是多少?限制消息长度的因素有哪些? 如果要增加消息长度,是否可以修改配置参数,或者可以更改代码的哪一部分? 问题答案: 您在这里问的内容还不是很清楚,所以我会回答两种可能性。 实际主题字符串的长度最多

  • 我有一个来捕获可能发生的: 这里是有问题的对象: 如果在和中发送以下JSON正文: 如何处理在websocket消息上引发的?