当前位置: 首页 > 知识库问答 >
问题:

在处理程序中组装一条复杂的消息

竺展
2023-03-14

我正在为我的项目制作Netty原型。我试图在Netty之上实现一个简单的面向文本/字符串的协议。在我的管道中,我使用了以下内容:

public class TextProtocolPipelineFactory implements ChannelPipelineFactory
{
@Override
public ChannelPipeline getPipeline() throws Exception 
{
    // Create a default pipeline implementation.
    ChannelPipeline pipeline = pipeline();

    // Add the text line codec combination first,
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2000000, Delimiters.lineDelimiter()));
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());

    // and then business logic.
    pipeline.addLast("handler", new TextProtocolHandler());

    return pipeline;
}
}

我有一个DelimiterBaseFrameDecoder、一个字符串解码器和一个字符串编码器在管道中。

由于此设置,我的传入消息被拆分为多个字符串。这导致多次调用我的处理程序的“messageReceived”方法。这很好。但是,这需要我在内存中累积这些消息,并在收到消息的最后一个字符串包时重新构造消息。

我的问题是,什么是“积累字符串”然后“将它们重新构造成最终消息”的最具内存效率的方法。到目前为止,我有3个选择。它们是:

>

  • 使用StringBuilder累积和toString构造。(这给出了最差的内存性能。事实上,对于具有大量并发用户的大型有效负载,这给出了不可接受的性能)

    通过ByteArrayOutputStream累加到ByteArray中,然后使用字节数组构造(这比选项1的性能要好得多,但它仍然占用大量内存)

    累积到动态通道缓冲区中,并使用toString(字符集)构造。我还没有分析过这个设置,但我很好奇这与上面两个选项相比如何。是否有人使用动态通道缓冲区解决了此问题?

    我是Netty的新手,我可能在架构上做错了什么。您的意见将不胜感激。

    提前谢谢Sohil

    添加自定义帧解码器的实现供Norman查看

    public final class TextProtocolFrameDecoder extends FrameDecoder 
    {
    public static ChannelBuffer messageDelimiter() 
    {
          return ChannelBuffers.wrappedBuffer(new byte[] {'E','O','F'});
        }
    
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buffer) 
    throws Exception 
    {
        int eofIndex = find(buffer, messageDelimiter());
    
        if(eofIndex != -1)
        {
            ChannelBuffer frame = buffer.readBytes(buffer.readableBytes());
            return frame;
        }
    
        return null;
    }
    
    private static int find(ChannelBuffer haystack, ChannelBuffer needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                    break;
                } else {
                    haystackIndex ++;
                    if (haystackIndex == haystack.writerIndex() &&
                        needleIndex != needle.capacity() - 1) {
                        return -1;
                    }
                }
            }
    
            if (needleIndex == needle.capacity()) {
                // Found the needle from the haystack!
                return i - haystack.readerIndex();
            }
        }
        return -1;
       }
      }
    
  • 共有1个答案

    督飞羽
    2023-03-14

    我认为如果您实现自己的帧解码器,您将获得最佳性能。这将允许您缓冲所有数据,直到您真正需要将其分派给链中的下一个处理程序为止。请参考框架文件

    如果您不想自己处理CRLF的检测,也可以保留DelimiterBasedFrameDecoder,只需在其后面添加一个自定义FrameDecoder来组装表示一行文本的ChannelBuffers。

    在这两种情况下,FrameDecoder都会尽可能地减少内存拷贝,尽量只“包装”缓冲区,而不是每次都复制它们。

    也就是说,如果你想有最好的性能去与第一种方法,如果你想它容易去与第二;)

     类似资料:
    • 我有一个复杂的endpoint,需要在spring boot中为REST服务实现。它看起来如下所示: 到此endpoint的POST将创建一个新的采购订单。段必须始终在段之前,但段可以独立存在。 采购订单

    • 我正在使用netty构建一个应用程序。在应用程序中,我需要处理传入和传出的消息。要求是应用程序将发送的任何消息都应由特定的处理程序处理,进入应用程序的任何消息都应由另一个特定的处理程序处理。但是,我希望在两个处理程序之间交换消息,以便能够跟踪发送的消息响应,因为请求消息也将发送到应用程序。 请任何想法hwo实施这样的要求。这个问题听起来可能不相关,但这就是我得到的,我还不是一个网络极客。我读到的关

    • 我有一个离散事件流进入我的系统,我需要根据每个事件的内容应用规则。另外,我想对这些流事件应用复杂的事件处理。 约束1.这些规则是用户提供的,并将动态更改。2.每当应用规则时,我不想重新启动我的系统。3.HA 4.只有成熟的开源解决方案 可能的方式...1.在Storm螺栓内运行Esper CEP 2。让口水流到Storm螺栓里 > 这会处理单事件规则和复杂事件吗?规则更改是否需要我的Storm重新

    • 问题内容: 我想在后台线程中运行一些Runnable。我想使用Handler,因为它便于延迟。我的意思是 凡 可运行 应当运行 后台 线程。是否可以创建这样的处理程序?是否在某个地方有“背景” Looper,或者该如何创建? PS我知道如何使用自定义类扩展Thread,但是比处理程序方式需要更多的编码工作。因此,请不要发布其他解决方案或类似的内容 如果Handler能以“干净”的方式做到这一点,我

    • 我想在后台线程中运行一些Runnable。我想使用Handler,因为它方便延迟。我的意思是 runnable应该在后台线程中运行。有可能创造这样的处理器吗?某个地方有没有“背景”Looper或者我怎么才能创建它? 附言:我知道如何使用自定义类扩展Thread来做到这一点,但它需要更多的编码工作,而不是以处理程序的方式进行。因此,请不要发布其他解决方案或类似内容 我只是想知道汉德勒是否能以“干净”

    • 在使用AsyncRabbitTemplate时,我在GZip/GUnzip消息处理方面遇到了问题。 使用同步模板设置时一切正常,如下所示: 但是,当我设置像这样的异步模板时: 回复消息没有正确解压缩,我得到了这个错误消息 我已尝试为AsyncRabbitTemplate提供一个配置好的DirectReplyToMessageListenerContainer,但没有帮助 这只会导致以下错误: [错