我正在为我的项目制作Netty原型。我试图在Netty之上实现一个简单的面向文本/字符串的协议。在我的管道中,我使用了以下内容:
public class TextProtocolPipelineFactory implements ChannelPipelineFactory
{
@Override
public ChannelPipeline getPipeline() throws Exception
{
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
// Add the text line codec combination first,
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2000000, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// and then business logic.
pipeline.addLast("handler", new TextProtocolHandler());
return pipeline;
}
}
我有一个DelimiterBaseFrameDecoder、一个字符串解码器和一个字符串编码器在管道中。
由于此设置,我的传入消息被拆分为多个字符串。这导致多次调用我的处理程序的“messageReceived”方法。这很好。但是,这需要我在内存中累积这些消息,并在收到消息的最后一个字符串包时重新构造消息。
我的问题是,什么是“积累字符串”然后“将它们重新构造成最终消息”的最具内存效率的方法。到目前为止,我有3个选择。它们是:
>
使用StringBuilder累积和toString构造。(这给出了最差的内存性能。事实上,对于具有大量并发用户的大型有效负载,这给出了不可接受的性能)
通过ByteArrayOutputStream累加到ByteArray中,然后使用字节数组构造(这比选项1的性能要好得多,但它仍然占用大量内存)
累积到动态通道缓冲区中,并使用toString(字符集)构造。我还没有分析过这个设置,但我很好奇这与上面两个选项相比如何。是否有人使用动态通道缓冲区解决了此问题?
我是Netty的新手,我可能在架构上做错了什么。您的意见将不胜感激。
提前谢谢Sohil
添加自定义帧解码器的实现供Norman查看
public final class TextProtocolFrameDecoder extends FrameDecoder
{
public static ChannelBuffer messageDelimiter()
{
return ChannelBuffers.wrappedBuffer(new byte[] {'E','O','F'});
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buffer)
throws Exception
{
int eofIndex = find(buffer, messageDelimiter());
if(eofIndex != -1)
{
ChannelBuffer frame = buffer.readBytes(buffer.readableBytes());
return frame;
}
return null;
}
private static int find(ChannelBuffer haystack, ChannelBuffer needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
break;
} else {
haystackIndex ++;
if (haystackIndex == haystack.writerIndex() &&
needleIndex != needle.capacity() - 1) {
return -1;
}
}
}
if (needleIndex == needle.capacity()) {
// Found the needle from the haystack!
return i - haystack.readerIndex();
}
}
return -1;
}
}
我认为如果您实现自己的帧解码器,您将获得最佳性能。这将允许您缓冲所有数据,直到您真正需要将其分派给链中的下一个处理程序为止。请参考框架文件。
如果您不想自己处理CRLF的检测,也可以保留DelimiterBasedFrameDecoder,只需在其后面添加一个自定义FrameDecoder来组装表示一行文本的ChannelBuffers。
在这两种情况下,FrameDecoder都会尽可能地减少内存拷贝,尽量只“包装”缓冲区,而不是每次都复制它们。
也就是说,如果你想有最好的性能去与第一种方法,如果你想它容易去与第二;)
我有一个复杂的endpoint,需要在spring boot中为REST服务实现。它看起来如下所示: 到此endpoint的POST将创建一个新的采购订单。段必须始终在段之前,但段可以独立存在。 采购订单
我正在使用netty构建一个应用程序。在应用程序中,我需要处理传入和传出的消息。要求是应用程序将发送的任何消息都应由特定的处理程序处理,进入应用程序的任何消息都应由另一个特定的处理程序处理。但是,我希望在两个处理程序之间交换消息,以便能够跟踪发送的消息响应,因为请求消息也将发送到应用程序。 请任何想法hwo实施这样的要求。这个问题听起来可能不相关,但这就是我得到的,我还不是一个网络极客。我读到的关
我有一个离散事件流进入我的系统,我需要根据每个事件的内容应用规则。另外,我想对这些流事件应用复杂的事件处理。 约束1.这些规则是用户提供的,并将动态更改。2.每当应用规则时,我不想重新启动我的系统。3.HA 4.只有成熟的开源解决方案 可能的方式...1.在Storm螺栓内运行Esper CEP 2。让口水流到Storm螺栓里 > 这会处理单事件规则和复杂事件吗?规则更改是否需要我的Storm重新
问题内容: 我想在后台线程中运行一些Runnable。我想使用Handler,因为它便于延迟。我的意思是 凡 可运行 应当运行 后台 线程。是否可以创建这样的处理程序?是否在某个地方有“背景” Looper,或者该如何创建? PS我知道如何使用自定义类扩展Thread,但是比处理程序方式需要更多的编码工作。因此,请不要发布其他解决方案或类似的内容 如果Handler能以“干净”的方式做到这一点,我
我想在后台线程中运行一些Runnable。我想使用Handler,因为它方便延迟。我的意思是 runnable应该在后台线程中运行。有可能创造这样的处理器吗?某个地方有没有“背景”Looper或者我怎么才能创建它? 附言:我知道如何使用自定义类扩展Thread来做到这一点,但它需要更多的编码工作,而不是以处理程序的方式进行。因此,请不要发布其他解决方案或类似内容 我只是想知道汉德勒是否能以“干净”
在使用AsyncRabbitTemplate时,我在GZip/GUnzip消息处理方面遇到了问题。 使用同步模板设置时一切正常,如下所示: 但是,当我设置像这样的异步模板时: 回复消息没有正确解压缩,我得到了这个错误消息 我已尝试为AsyncRabbitTemplate提供一个配置好的DirectReplyToMessageListenerContainer,但没有帮助 这只会导致以下错误: [错