当前位置: 首页 > 知识库问答 >
问题:

在RxJava中处理背压而不丢弃项目或序列化

金皓君
2023-03-14

简言之,有没有解决方案可以解决RxJava中的背压问题,而不需要删除项、序列化操作或无限缓冲?

把下面的任务当作一个有用的例子。

  1. 从磁盘读取数据到内存
  2. 压缩数据
  3. 通过网络传输压缩数据

简单的方法是在单个后台线程上顺序执行所有任务,如:

observeBlocksOfFileContents(file).
    .subscribeOn(backgroundScheduler)
    .map(compressBlock)
    .subscribe(transmitBlock);

虽然这样做没有问题,但从性能角度来看,这是次优的,因为运行时是所有三个操作的总和,而不是并行运行时的最大值:

observeBlocksOfFileContents(file).
    .subscribeOn(diskScheduler)
    .observeOn(cpuScheduler)
    .map(compressBlock)
    .observeOn(networkScheduler)
    .subscribe(transmitBlock);

然而,如果数据从磁盘读取的速度快于压缩和传输的速度,这可能会由于背压而失败。通常的背压解决方案是不可取的,原因如下:

  1. 丢弃项目:文件必须完整传输,没有丢失的片段
  2. 在单线程上串行:流水线的性能提升丢失
  3. 调用堆栈阻塞:RxJava不支持
  4. 增加观察缓冲区:内存消耗可能会变成文件大小的几倍
  5. 在不丢失Backpress的情况下重新实现观察异常:大量的工作和破坏流畅的API

还有其他解决方案吗?或者这根本不符合ReactiveX可观察模型?

共有1个答案

董高逸
2023-03-14

6) 实现observeBlocksOfFileContents,以支持背压。

文件系统已经是基于拉的(InputStream.read()发生在您想要的时候,而不是向您抛出),所以请考虑一个合理的块大小,并在每个请求中读取:

Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat")
    (in, out) -> {
        byte[] buf = new byte[4096];
        int r = in.read(buf);
        if (r < 0) {
            out.onCompleted();
        } else {
            if (r == buf.length) {
                out.onNext(buf);
            } else {
                byte[] buf2 = new byte[r];
                System.arraycopy(buf, 0, buf2, 0, r);
                out.onNext(buf2);
            }
        }

    }, 
    in -> in.close()
));

(为简洁起见,请尝试省略。)

 类似资料:
  • 我使用RxJava观察点击几个按钮。 这些订阅将在一个对象上调用不同的函数,这需要几毫秒的时间。这些功能是同步的。 问题是,当按下太多按钮时,会出现背压异常。对我来说,有效的方法是删除几个输入(最好是旧的输入)。RxJava有可能做到这一点吗?

  • 背压有问题。使用发布主题获取发射时的传感器事件,并需要在事务中订阅主题时将数据保存到数据库。 我一直在尝试使用。窗口(100)操作符,这样每当我连续收到100个传感器事件时,我就可以批量插入,但一次只能收到一个项目。订阅 不希望使用缓冲区运算符删除事件。正确的处理方法是什么?

  • 我是RxJava的初学者,我对“背压”的含义很好奇。 这是否意味着生产者在消费者背后施加压力? 还是意味着消费者正在向生产者施加压力?(相反方向的压力)

  • 最近开发了一个数据流消费者,它从PubSub订阅中读取数据,并将分组在同一窗口中的所有对象的组合输出到拼花文件。 当我在没有巨大负载的情况下进行测试时,一切似乎都很好。 然而,在执行了一些繁重的测试后,我可以看到,从发送到PubSub队列的1.000.000个事件中,只有1000个事件成功到达Parket! 根据不同阶段的多个墙时间,在应用窗口之前解析事件的墙时间似乎持续58分钟。写入拼花文件的最

  • 我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink

  • 我读了一些RxJava中的背压文档,但我找不到详细的解释,比如它是如何在库中内部发生的,每个人都只是总结说“生产者”太快,“消费者”太慢。 例如,如下面的代码: 我已经看过了RxJava源代码,所以我的理解是,在主线程中,我们将每毫秒发出一次事件,一旦发出,我们就将值传递给系统。出来println(i)方法,并将其扔进newhead调度器的线程池,然后在可运行程序中运行该方法。 所以我的问题是,异