当前位置: 首页 > 知识库问答 >
问题:

回收累积字节缓冲区发布器

章哲彦
2023-03-14

我们正在迁移到Spring WebFlux(带有Reactornetty)。应用程序使用HTTP协议和Spring控制器。目前,我们有一种过渡解决方案,它将入站IO缓冲区累积到合成字节buf中,而不进行复制(然后将其作为输入流进行处理)。Reactornetty为我们提供直接字节缓冲区。因此,为这些缓冲区调用release()是至关重要的。最初我们有代码:

public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
  return Flux.from(data).reduce(
      EMPTY,
      (CompositeByteBuf acc, DataBuffer buffer) -> {
        ByteBuf byteBuf = toByteBuf(buffer);
        CompositeByteBuf composite = (acc == EMPTY) ? byteBuf.alloc().compositeBuffer(256) : acc;
        composite.addComponent(true, byteBuf);
        return composite;
      }
  ).map(composite -> composite != EMPTY ? composite : createEmptyComposite());
}

在处理得到的复合缓冲液后进行释放。

但当上游发布器发出错误信号时,这种方法会导致泄漏。因此,在下一次尝试中,我们尝试用这样的方式处理错误并释放缓冲区(省略一些角落案例处理):

public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
  // such code is not suitable for multiple subscribers
  class CompositeHolder {
    CompositeByteBuf composite;

    void addComponent(ByteBuf component) {
      if (composite == null) {
        composite = component.alloc().compositeBuffer(256);
      }
      composite.addComponent(true, component);
    }
  }
  CompositeHolder holder = new CompositeHolder();
  return Flux.from(data)
      .doOnNext(buffer -> holder.addComponent(toByteBuf(buffer)))
      .doOnError(e -> holder.composite.release())
      .then(Mono.fromSupplier(() -> holder.composite));
}

但在那之后,我们意识到有必要在订阅取消时回收缓冲区(当底层连接关闭时发生)。首先想到的是使用doOn取消运算符,但实际上不能保证我们不能为相同的请求调用doOnErrordoOn取消回调。因此,简单的解决方案要求我们明确检查缓冲区之前是否已释放。

现在我被卡住了。我不知道如何处理这个案子,避免额外的复杂性。

共有1个答案

太叔涵亮
2023-03-14

doFinally是一个doOn操作符,每当源代码出错、完成或取消订阅时都会调用它。此外,它保证回调只执行一次(在错误取消的情况下)。

您提供消费者

 类似资料:
  • 问题内容: 有没有一种方法可以使用BufferedReader读取ByteBuffer而不必先将其转换为String?我想读取相当大的ByteBuffer作为文本行,并且出于性能方面的考虑,我想避免将其写入磁盘。在ByteBuffer上调用toString不起作用,因为生成的String太大(它抛出java.lang.OutOfMemoryError:Java堆空间)。我本来以为API中会有一些东

  • 问题内容: 从管道获取可能会返回StreamSpliterators.WrappingSpliterator的实例。例如,获取以下内容: 鉴于以上所述,当我们通过的方法(在本例中为StreamSpliterators.WrappingSpliterator的一个实例)单独遍历元素时,它将首先将项目累积到内部缓冲区中,然后再使用这些项目,如我们在StreamSpliterators.java中所看到

  • 有没有一种方法可以在请求处理程序中获得反应堆网络中使用的字节缓冲区分配器?类似于如何在纯Netty中执行?

  • 我目前正在处理一些小的endian二进制数据,我已经到了一个尴尬的地步,需要将奇数字节转换成整数值。 现在使用类,我能够很好地使用函数读取int和long,它们分别读取4和8个字节。 然而,在本例中,我需要读取三个字节,并将它们转换为int。我尝试过使用(2字节1字节),但我认为这不是正确的方法。 我猜我需要对字节进行位移位才能得到正确的int值,但我总是对位移位感到困惑。 此外,我还以为字节缓冲

  • 我正在尝试更新Android BluetoothChat示例的代码,以使用Protobuf进行更结构化的数据交换。我还需要byte[]数组字段来发送任意数据,如图像字节数组,但在尝试编译时。proto文件,我得到以下错误。 协议文件/蓝牙消息。proto:8:18:应为字段名。 下面是我的. proto文件。 stackoverflow上的其他几个帖子提到byte[]可以用作文件,下面的页面也说了

  • 动态添加对象的属性 Vue中,动态新增对象的属性时,不能直接添加。正确的做法是:Vue.set(obj,key,value)。参考链接:# 判断一个checkbox是否被选中 <!-- v-model里的内容是变量,变量里的值可能是 true 后者 false --> <input type="checkbox" v-model="isSelected"> <!-- 选中时,值为 true。未选