我有一个Netty4.x应用程序,它需要发送和接收ASCII消息,从一个固定长度(10位数,零填充)字段开始,包含以#字符为单位的消息大小。消息如下:
0000000059{message_info={message_type=login}|login_id=abc|password=}
0000000114{message_info={message_type=pricefeed_toggle}|instrument_id={feedcode=1234|market=xyz}|toggle=true|best_only=true}
也有同样的问题,我的解决方案是扩展LengthFieldBasedFrameDecoder并重写getUnadjustedFrameLength方法。这是我的课:
public class StringLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {
private Charset charset;
public StringLengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength) {
this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
}
public StringLengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
this(
maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment,
initialBytesToStrip, true);
}
public StringLengthFieldBasedFrameDecoder(
int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
this(
ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,
lengthAdjustment, initialBytesToStrip, failFast, Charset.forName("US-ASCII"));
}
public StringLengthFieldBasedFrameDecoder(
ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip, boolean failFast, Charset charset) {
super(byteOrder, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
this.charset = charset;
}
/**
* Decodes the specified region of the buffer into an unadjusted frame length. This implementation will
* read a String of length bytes from the ByteBuf at the given offset. This string will then be parsed into a
* long using the charset specified on initialization (default "US-ASCII"). Note that this method must not
* modify the state of the specified buffer (e.g. {@code readerIndex}, {@code writerIndex}, and the content of
* the buffer.)
*
* @throws io.netty.handler.codec.DecoderException if failed to decode the specified region
*/
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
try {
return Long.parseLong(buf.toString(offset, length, charset));
} catch (NumberFormatException nfe) {
throw new DecoderException(nfe);
}
}
}
和一些测试代码:
public class StringLengthFieldBasedFrameDecoderTest {
@DataProvider
private static final Object[][] getTestData() throws UnsupportedEncodingException {
try {
String message = "Hello World!";
ByteBuf asciiLength = Unpooled.buffer();
asciiLength.writeBytes(String.format("%08d", message.length()).getBytes("US-ASCII"));
asciiLength.writeBytes(message.getBytes("US-ASCII"));
ByteBuf utf16LELength = Unpooled.buffer();
utf16LELength.writeBytes(String.format("%08d", message.length()).getBytes("UTF-16LE"));
utf16LELength.writeBytes(message.getBytes("US-ASCII"));
return new Object[][]{
{new StringLengthFieldBasedFrameDecoder(1024, 0, 8, 0, 8), asciiLength, message},
{new StringLengthFieldBasedFrameDecoder(ByteOrder.nativeOrder(), 1024, 0, 16, 0, 16, true, Charset.forName("UTF-16LE")), utf16LELength, message}
};
} catch (UnsupportedEncodingException uee) {
System.out.println(uee.getMessage());
throw uee;
}
}
@Test(dataProvider = "getTestData")
public void testReturnsCorrectMessage(StringLengthFieldBasedFrameDecoder decoder, ByteBuf buffer, String expectedMessage) {
EmbeddedChannel channel = new EmbeddedChannel(decoder);
channel.writeInbound(buffer);
Assert.assertTrue(channel.finish());
ByteBuf input = (ByteBuf) channel.readInbound();
assertEquals(input.toString(0, input.readableBytes(), Charset.forName("US-ASCII")), expectedMessage);
}
}
我是Netty的新手,需要以自定义方式处理消息。我有以下接口: 现在我从客户端收到一些数据,并希望对其进行处理。我正在实现
我正在使用netty构建一个应用程序。在应用程序中,我需要处理传入和传出的消息。要求是应用程序将发送的任何消息都应由特定的处理程序处理,进入应用程序的任何消息都应由另一个特定的处理程序处理。但是,我希望在两个处理程序之间交换消息,以便能够跟踪发送的消息响应,因为请求消息也将发送到应用程序。 请任何想法hwo实施这样的要求。这个问题听起来可能不相关,但这就是我得到的,我还不是一个网络极客。我读到的关
虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?
通常,我希望将消息发送到另一个路由来处理它,但我不希望为后续步骤修改该消息。做这件事最好的方法是什么? 我发现的另一个选择是使用异步sedaendpoint,它将原始消息返回给生产者并处理副本,但这会引入异步行为,而异步行为可能并不总是可取的。 看来一定有更好的办法?
问题内容: 我正在将node.js mosca MQTT代理用于某些物联网(iot)应用程序。 https://github.com/mcollina/mosca mosca代理可以接收的最大消息长度是多少?限制消息长度的因素有哪些? 如果要增加消息长度,是否可以修改配置参数,或者可以更改代码的哪一部分? 问题答案: 您在这里问的内容还不是很清楚,所以我会回答两种可能性。 实际主题字符串的长度最多
我有一个来捕获可能发生的: 这里是有问题的对象: 如果在和中发送以下JSON正文: 如何处理在websocket消息上引发的?