当前位置: 首页 > 知识库问答 >
问题:

可观察与可流动 rxJava2

慎望
2023-03-14

我一直在看新的rx java 2,我不太确定我是否理解了< code >背压的概念...

我知道我们有没有背压支持的可观察和有背压支持的可流动

因此,基于示例,假设我有可流动间隔

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

这将在大约128个值之后崩溃,很明显,我的消费速度比获取物品要慢。

但是< code>Observable也是如此

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

这将不会崩溃,即使我把一些消费延迟,它仍然工作。为了使< code >可流动工作,假设我将< code>onBackpressureDrop操作符,崩溃消失了,但是也没有发出所有的值。

所以我目前在脑海中找不到答案的基本问题是,当我可以使用普通的可观察仍然可以在不管理缓冲区的情况下接收所有值时,为什么我要关心反向压力?或者从另一方面来说,反向压力给我带来了哪些有利于管理和处理消耗的优势?

共有3个答案

萧永望
2023-03-14

您的 Flowable 在没有背压处理的情况下在发出 128 个值后崩溃,这一事实并不意味着它总是会在 128 个值之后崩溃:有时它会在 10 个值后崩溃,有时它根本不会崩溃。我相信这就是你用Operaperable尝试这个例子时发生的事情 - 碰巧没有背压,所以你的代码正常工作,下次它可能不会。RxJava 2的不同之处在于,可观察s中不再有背压的概念,也没有办法处理它。如果您正在设计一个可能需要显式背压处理的反应序列 - 那么 Flowable 是您的最佳选择。

熊哲圣
2023-03-14

背压是指当你的可观察对象(发布者)创建的事件多于你的订阅者能够处理的事件时。因此,您可能会让订户错过事件,或者您可能会得到一个巨大的事件队列,最终导致html" target="_blank">内存不足。< code >可流动考虑了背压。< code>Observable不会。就是这样。

这让我想起一个漏斗,当它有太多的液体溢出。可流动有助于避免这种情况发生:

在巨大背压下:

但是使用可流动的,背压要小得多 :

Rxjava2有一些背压策略,您可以根据您的用例使用。通过策略,我的意思是Rxjava2提供了一种方法来处理由于溢出(背压)而无法处理的对象。

以下是策略。我不会一一介绍,但例如,如果你不想担心溢出的物品,你可以使用这样的放置策略:

可观察到的流量(背压策略.DROP)

据我所知,队列中应该有128项的限制,之后可能会有溢出(背压)。即使不是128,也接近这个数字。希望这能帮助到某人。

如果您需要更改128的缓冲区大小,看起来可以这样做(但是要注意内存限制:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

在软件开发中,背压策略通常意味着您告诉发射器放慢一点,因为消费者无法处理您的发射事件的速度。

逄念
2023-03-14

背压在实践中体现的是有界缓冲区,Flowable.observeOn有一个包含128个元素的缓冲区,它会尽可能快地耗尽。您可以单独增加此缓冲区大小以处理突发源,所有背压管理实践仍然适用于1. x。Observable.observeOn有一个无界缓冲区,它会不断收集元素,您的应用程序可能会运行内存溢出。

例如,您可以使用可观察性

  • 处理图形用户界面事件
  • 处理短序列(总共少于 1000 个元素)

例如,您可以使用可流动

  • 冷源和非定时源
  • 生成器如源
  • 网络和数据库访问器
 类似资料:
  • 问题内容: 我一直在寻找新的rx java 2,但我不确定我是否已经明白了这个主意… 我知道我们所拥有的并没有支持。 因此,基于例如,可以说我有有: 在大约128个值之后,这将崩溃,这很明显我消耗的速度比获取项目要慢。 但是,我们有相同的 即使我延迟使用它,它仍然完全不会崩溃。为了工作,可以说我放了一个运算符,崩溃已经消失了,但并不是所有值都被发出。 因此,我目前在脑海中找不到答案的基本问题是,为

  • 问题内容: 学习了Observables之后,我发现它们与Node.js流非常相似。两者都有一种机制,可在新数据到达,发生错误或没有更多数据(EOF)时通知使用者。 我很想了解两者之间的概念/功能差异。谢谢! 问题答案: 无论 观测量 和node.js中的 流 让你解决同样的根本问题:异步处理值的序列。我认为,两者之间的主要区别与激发其外观的环境有关。该上下文反映在术语和API中。 在 Obser

  • 问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首

  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 导览 本小节主要介绍 Apache ShardingSphere 可观察性的相关功能 应用性能监控集成

  • 我已经做javascript有一段时间了。然而,我对Angular 2还比较陌生,所以我对可观测的东西几乎一无所知。当我读到对可观测事物的描述时,它们听起来很像我已经知道的回调。当我问谷歌时,有回调和promise之间的比较,也有promise和可观察之间的比较。然而,我找不到回调和可观察之间的任何比较。 回调和可观察之间有什么区别?