简言之,有没有解决方案可以解决RxJava中的背压问题,而不需要删除项、序列化操作或无限缓冲?
把下面的任务当作一个有用的例子。
简单的方法是在单个后台线程上顺序执行所有任务,如:
observeBlocksOfFileContents(file).
.subscribeOn(backgroundScheduler)
.map(compressBlock)
.subscribe(transmitBlock);
虽然这样做没有问题,但从性能角度来看,这是次优的,因为运行时是所有三个操作的总和,而不是并行运行时的最大值:
observeBlocksOfFileContents(file).
.subscribeOn(diskScheduler)
.observeOn(cpuScheduler)
.map(compressBlock)
.observeOn(networkScheduler)
.subscribe(transmitBlock);
然而,如果数据从磁盘读取的速度快于压缩和传输的速度,这可能会由于背压而失败。通常的背压解决方案是不可取的,原因如下:
还有其他解决方案吗?或者这根本不符合ReactiveX可观察模型?
6) 实现observeBlocksOfFileContents,以支持背压。
文件系统已经是基于拉的(InputStream.read()发生在您想要的时候,而不是向您抛出),所以请考虑一个合理的块大小,并在每个请求中读取:
Observable.create(SyncOnSubscribe.createStateful(
() -> new FileInputStream("file.dat")
(in, out) -> {
byte[] buf = new byte[4096];
int r = in.read(buf);
if (r < 0) {
out.onCompleted();
} else {
if (r == buf.length) {
out.onNext(buf);
} else {
byte[] buf2 = new byte[r];
System.arraycopy(buf, 0, buf2, 0, r);
out.onNext(buf2);
}
}
},
in -> in.close()
));
(为简洁起见,请尝试省略。)
我使用RxJava观察点击几个按钮。 这些订阅将在一个对象上调用不同的函数,这需要几毫秒的时间。这些功能是同步的。 问题是,当按下太多按钮时,会出现背压异常。对我来说,有效的方法是删除几个输入(最好是旧的输入)。RxJava有可能做到这一点吗?
背压有问题。使用发布主题获取发射时的传感器事件,并需要在事务中订阅主题时将数据保存到数据库。 我一直在尝试使用。窗口(100)操作符,这样每当我连续收到100个传感器事件时,我就可以批量插入,但一次只能收到一个项目。订阅 不希望使用缓冲区运算符删除事件。正确的处理方法是什么?
我是RxJava的初学者,我对“背压”的含义很好奇。 这是否意味着生产者在消费者背后施加压力? 还是意味着消费者正在向生产者施加压力?(相反方向的压力)
最近开发了一个数据流消费者,它从PubSub订阅中读取数据,并将分组在同一窗口中的所有对象的组合输出到拼花文件。 当我在没有巨大负载的情况下进行测试时,一切似乎都很好。 然而,在执行了一些繁重的测试后,我可以看到,从发送到PubSub队列的1.000.000个事件中,只有1000个事件成功到达Parket! 根据不同阶段的多个墙时间,在应用窗口之前解析事件的墙时间似乎持续58分钟。写入拼花文件的最
我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink
我读了一些RxJava中的背压文档,但我找不到详细的解释,比如它是如何在库中内部发生的,每个人都只是总结说“生产者”太快,“消费者”太慢。 例如,如下面的代码: 我已经看过了RxJava源代码,所以我的理解是,在主线程中,我们将每毫秒发出一次事件,一旦发出,我们就将值传递给系统。出来println(i)方法,并将其扔进newhead调度器的线程池,然后在可运行程序中运行该方法。 所以我的问题是,异