当前位置: 首页 > 知识库问答 >
问题:

在netty 4.1中正确释放引用计数的ByteBuf对象

施飞雨
2023-03-14

因此,我们目前正在将基于MQTT的消息后端中的Netty3.x升级到Netty4.1。在我们的应用程序中,我们使用自定义MQTT消息解码器和编码器。

对于我们的解码器,我目前使用的是一个ByteToMessageDecoder,如下所示:

public class MqttMessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 2) {
            return;
        }

        .....
        .....
        .....

        byte[] data = new byte[msglength];
        in.resetReaderIndex();
        in.readBytes(data);
        MessageInputStream mis = new MessageInputStream(
                new ByteArrayInputStream(data));
        Message msg = mis.readMessage();
        out.add(msg);
        ReferenceCountUtil.release(in);
    }
}

其中消息是我们的自定义对象,它被传递到下一个ChannelHandlerChannelRead()。正如您所看到的,当我从传入的Bytebuf对象中创建Message对象时,就完成了中的传入的Bytebuf对象。那么,既然Bytebuf在netty中是引用计数的,那么我需要在这里通过调用referenceCountutil.release(in)来释放in对象是正确的吗?理想情况下,根据医生的说法,这似乎是正确的。然而,当我这样做的时候,我似乎面临着例外:

Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception.
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    ... 7 common frames omitted

这告诉我,当子通道关闭时,管道中的所有处理程序都相继移除。当这个解码器处理程序关闭时,我们显式地释放附加到这个解码器的Bytebuf,当调用下面的方法时,它会导致IllegalReferenceCountException异常。

这是AbstractReferenceCountedBytebuf#版本:

@Override
    public boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

那么,释放bytebuf对象的正确方法是什么,以避免遇到此问题?

new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

作为Ferrybig答案的附加项,ByTetomessageDecoder#ChannelRead自行处理传入的Bytebuf的释放。请参见finally块-

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

如果入站的bytebuf被传输到管道中的下一个通道处理程序,则此bytebuf的引用计数通过bytebuf#retain而增加,因此,如果解码器之后的下一个处理程序是业务处理程序(通常是这样),则需要释放该bytebuf对象,以避免任何内存泄漏。这里的文档中也提到了这一点。

共有1个答案

晏和风
2023-03-14

并不是所有的处理程序都要求销毁bytebuf中传递的内容。ByTetomessageDecoder就是其中之一。

这是因为该处理程序收集多个传入字节,并将它们作为1个连续的字节流公开给您的应用程序,以便于编码,并且不需要自己处理这些块

请记住,您仍然需要通过使用readbytesreadslice手动释放您创建的任何bytebufs,正如Javadoc所述。

 类似资料:
  • 问题内容: 我有一个eclipse插件,它使用Jacob连接到COM组件。但是,在我完全关闭插件后,.exe文件仍然挂在Windows进程中。 我用于初始化,并确保在关闭应用程序之前为我创建的每个COM对象都调用了该对象,并在最后调用了该对象。 我是否遗漏了什么? 问题答案: TD2JIRA转换器也有同样的问题。最终必须修补Jacob文件之一才能释放对象。之后,一切顺利。 我的客户端logout(

  • 我有自定义的ExecutorService,其中包含一个可用于中断提交给ExecutorSerice的任务,如果他们花了太长时间,我把完备类放在这篇文章的末尾。 这工作正常,除了有时中断本身会引起问题,所以我把一个不稳定的布尔取消标志添加到一个新的CanceableWork类中,并把它们子类为this,这样它们就可以检查并停止自己,如果它们的布尔值已经被删除了发送到真实。请注意,它们是提交给执行器

  • 晚上好!我是一名律师,我经常要计算被判刑的人多久才能获得福利,比如假释。 它的工作原理如下: 首先,我需要得到一些主要变量,比如那个人开始服刑的那一天(他被捕的那一天)。这将是第一项福利的基准日期。假设有人在2014年11月12日被捕。 我必须做的第二件事是知道每项罪行的判决是什么(有时这个人被判犯有不止一项罪行,对于每项罪行,都有不同的计算方法。假设这个人被判犯有两项罪行: 对于第一项罪行(这是

  • 问题内容: 如果我这样做: 在那个func里面引用foo是错误的吗? 问题答案: 很好,只有在上下文变化时才需要注意(在局部指针变量的情况下): 还请参见:https : //www.goinggo.net/2014/06/pitfalls-with-closures-in- go.html

  • 我和Netty有一个http服务。对于一组请求,在http正文中只有“{}”的相同答复。我有一个想法,可以避免为每个这样的请求创建新的缓冲区,所以我使用了: 在我的SimpleChannelInboundHandler中。它只适用于第一个查询,之后我开始有 所以看起来缓冲区在第一次回复后会自动释放。有这样的缓冲的正确方法是什么?

  • 问题内容: 考虑简单的Django模型和: 使用参与者总数来注释事件查询很容易: 如何用筛选的参与者计数进行注释? 我需要查询所有事件,而与参与者人数无关,例如,我不需要按带注释的结果进行过滤。如果有参与者,那没关系,我只需要带有0注释的值即可。 文档中的示例在这里不起作用,因为它从查询中排除了对象,而不是使用注释了对象0。 更新。Django 1.8具有新的条件表达式功能,因此我们现在可以像这样