当前位置: 首页 > 编程笔记 >

rx-java onBackpressureXXX运算子

单于轶
2023-03-14
本文向大家介绍rx-java onBackpressureXXX运算子,包括了rx-java onBackpressureXXX运算子的使用技巧和注意事项,需要的朋友参考一下

示例

大多数开发人员在应用程序失败时会遇到背压,MissingBackpressureException并且异常通常指向observeOn运算符。实际原因通常是对的非背压使用PublishSubject,timer()或者interval()是通过创建的自定义运算符create()。

有几种处理此类情况的方法。

增加缓冲区大小

有时,此类溢出是由于突发来源而发生的。突然,用户点击屏幕的速度过快observeOn,Android的默认16元素内部缓冲区溢出。

现在,RxJava最新版本中的大多数对背压敏感的运算符都允许程序员指定其内部缓冲区的大小。相关参数通常被称为bufferSize,prefetch或capacityHint。给定介绍中的溢出示例,我们可以增加的缓冲区大小observeOn以为所有值留出足够的空间。

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

但是请注意,通常这可能只是临时解决方案,因为如果源过度生成预测的缓冲区大小,则仍然会发生溢出。在这种情况下,可以使用以下运算符之一。

使用标准运算符批量/跳过值

如果可以分批更有效地处理源数据,则可以MissingBackpressureException使用标准批处理运算符之一(按大小和/或时间)来减少源数据的可能性。

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

如果某些值可以忽略,一个可以使用的采样(以时间或其他可观察)和节流运营商(throttleFirst,throttleLast,throttleWithTimeout)。

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

请注意,这些运算符仅会降低下游的价值接收率,因此它们可能仍会导致MissingBackpressureException。

onBackpressureBuffer()

这个无参数形式的运算符在上游源和下游运算符之间重新引入了一个无界缓冲区。不受限制意味着只要JVM不会耗尽内存,它就可以处理几乎所有来自突发性源的数据。

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

在这个例子中,observeOn去与一个非常低的缓冲区大小尚没有MissingBackpressureException因为onBackpressureBuffer吸收了全国各地的小它批100万个价值观和手observeOn。

但是请注意,onBackpressureBuffer它以无限制的方式消耗其源,即不对其施加任何背压。其结果是,甚至range将完全实现诸如背压支撑源。

还有4个额外的重载 onBackpressureBuffer

onBackpressureBuffer(int capacity)

这是一个有界版本,BufferOverflowError在其缓冲区达到给定容量的情况下发出信号。

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

随着越来越多的运算符现在允许设置其缓冲区大小,该运算符的相关性正在降低。对于其余的部分,这提供了一个机会,即通过使用onBackpressureBuffer大于默认值的数字来“扩展其内部缓冲区” 。

onBackpressureBuffer(int容量,Action0 onOverflow)

如果发生溢出,此重载将调用(共享)操作。它的用处非常有限,因为除了当前调用堆栈以外,没有提供有关溢出的其他信息。

onBackpressureBuffer(int容量,Action0 onOverflow,BackpressureOverflow.Strategy策略)

这种过载实际上更有用,因为它让我们定义了在达到容量后该怎么做。该BackpressureOverflow.Strategy实际上是一个接口,但类BackpressureOverflow报价4个代表典型的行动是实现静态字段:

  • ON_OVERFLOW_ERROR:这是前两个重载的默认行为,表示 BufferOverflowException

  • ON_OVERFLOW_DEFAULT:目前与 ON_OVERFLOW_ERROR

  • ON_OVERFLOW_DROP_LATEST :如果发生溢出,则将仅忽略当前值,并且在下游请求后仅传递旧值。

  • ON_OVERFLOW_DROP_OLDEST :将最早的元素拖放到缓冲区中,然后将当前值添加到缓冲区中。

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

请注意,最后两个策略在丢失元素时会导致流中的不连续。此外,它们不会发出信号BufferOverflowException。

onBackpressureDrop()

每当下游未准备好接收值时,此运算符将从序列中删除该elemenet。可以将其视为onBackpressureBuffer具有策略的0容量ON_OVERFLOW_DROP_LATEST。

当可以安全地忽略来源中的值(例如鼠标移动或当前GPS位置信号)时,此运算符非常有用,因为以后还会有更多最新值。

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

与源运算符结合使用可能会很有用interval()。例如,如果要执行一些定期的后台任务,但每次迭代的持续时间可能长于周期,则可以放掉多余的间隔通知,因为稍后会有更多通知:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

此运算符存在一个重载:onBackpressureDrop(Action1<? super T> onDrop)  在调用(共享)操作的情况下,将删除值。此变体允许清除值本身(例如,释放关联的资源)。

onBackpressureLatest()

最终运算符仅保留最新值,并实际上覆盖较早的未交付值。可以将其视为onBackpressureBuffer容量为1且策略为的的变体ON_OVERFLOW_DROP_OLDEST。

onBackpressureDrop与之不同的是,如果下游碰巧落后,总会有可供消费的价值。在某些类似遥测的情况下,这可能很有用,在这种情况下,数据可能会以某种突发html" target="_blank">模式出现,但只有最新的数据才有意义。

例如,如果用户在屏幕上单击很多,我们仍然希望对其最新输入做出反应。

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

采用onBackpressureDrop在这种情况下会导致在最后点击被丢弃和离开用户不知道为什么没有执行业务逻辑的情况。

 类似资料:
  • 给定一个可观测的源S,我如何要求RxJava/Rx生成可观测的D,即: 毫无延迟地从S发出第一个项目 在发射每个项目之后和发射下一个项目L之前等待至少T秒,其中L是S在等待期间发射的最后一个项目 如果S在等待期间T(从点#2开始)没有产生任何项目,则在它在S中应用后立即发出下一个项目 大理石图: 我想使用: 示例运算符,但它不满足#3的要求 Debounce运算符,但它也不满足#3的要求 Thro

  • 本文向大家介绍rx-java 运营商介绍,包括了rx-java 运营商介绍的使用技巧和注意事项,需要的朋友参考一下 示例 可以使用运算符来操纵从Observable到的对象流Subscriber。 输出为: 在map操作者改变了Integer可观察到String可观察到的,由此操作对象的流动。 运算符链 多个运算符可以chained一起使用,以进行更强大的转换和操纵。 可以在Observable和

  • RX

    rx 是一个可扩展的、现代的、极简主义的像素编辑器 ,在 rust 中实现。rx 是免费软件,在 GPLv3 下获得许可。   特性 内置精灵动画支持,带实时预览。 同时处理多个文件。 可扩展的命令系统。 可使用简单的基于文本的语言进行配置。 支持 HiDPI。 用户界面缩放。 撤消/重做任何编辑。 动画 GIF 输出。 多刷/同步编辑。 画笔过滤,又名“像素完美”模式。 像素操作的视觉模式。 安

  • 这是可行的,但当我删除将源代码转换为BlockingObservable的时,程序执行并结束时没有输出。 我通常查看大理石图来正确理解事情:http://reactivex.io/documentation/operators/zip.html 在最后一句中,它说:它只会发射出与可观察源发射出的项数一样多的项,而可观察源发射出的项数最少。 这是否意味着Observable在不到1秒的时间内发出所有

  • RX 文件管理器是一款功能强大的 UWP 文件管理器,其 UI 广泛使用亚克力效果,拥有多彩的个性化系统。 功能 • 内置一些基础的文件查看器 • 内置 Zip、Tar、Gz 格式的压缩和解压功能 • 支持通过蓝牙或 WIFI 共享文件 • 支持多标签页和多进程模式 • 支持有限度替代 Windows Explorer • 支持网络驱动器、USB 设备和 MTP 设备 • 基础的音视频文件转码功能

  • RxAngular RxAngular offers a comprehensive toolset for handling fully reactive Angular applications with the main focus on runtimeperformance and template rendering. RxAngular is divided into differen