背压是在Observable处理管道中时,某些异步阶段无法足够快地处理值,因此需要一种方法来告诉上游生产者放慢速度。
需要背压的经典情况是生产者是热源时:
PublishSubject<Integer> source = PublishSubject.create(); source .observeOn(Schedulers.computation()) .subscribe(v -> compute(v), Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); } Thread.sleep(10_000);
在此示例中,主线程将向在后台线程上对其进行处理的最终消费者生产100万件商品。该方法可能会compute(int)花费一些时间,但Observable运算符链的开销也可能会增加处理项目的时间。但是,带有for循环的生产线程不知道这一点,并且会继续执行onNext。
在内部,异步运算符具有缓冲区来保存此类元素,直到可以对其进行处理为止。在经典Rx.NET和早期的RxJava中,这些缓冲区是无界的,这意味着它们可能会容纳该示例中几乎所有的100万个元素。例如,当程序中有10亿个元素或同一100万个序列出现1000次时,问题就开始出现,OutOfMemoryError并通常由于过度的GC开销而导致速度变慢。
与错误处理成为一流公民并通过onErrorXXX运算符(通过运算符)来处理错误的方式类似,背压是程序员必须考虑和处理(通过onBackpressureXXX运算符)的数据流的另一个属性。
除PublishSubject上述之外,还有其他一些不支持背压的运算符,主要是由于功能原因。例如,运算符interval会定期发出值,将其反向加压会导致相对于挂钟的时间发生偏移。
在现代RxJava中,大多数异步运算符现在都具有一个有界的内部缓冲区,就像observeOn上面一样,任何尝试溢出该缓冲区的操作都将使用终止整个序列MissingBackpressureException。每个运算符的文档中都有关于其反压行为的描述。
但是,背压在常规冷序中会更微妙地出现(不会也不应屈服MissingBackpressureException)。如果第一个示例被重写:
Observable.range(1, 1_000_000) .observeOn(Schedulers.computation()) .subscribe(v -> compute(v), Throwable::printStackTrace); Thread.sleep(10_000);
没有错误,并且使用少量内存即可使所有操作顺利进行。这样做的原因是,许多源运算符可以按需“生成”值,因此运算符observeOn可以告诉range生成的observeOn缓冲区最多可以一次拥有这么多的值,而不会溢出。
该协商基于协同例程的计算机科学概念(我叫你,你叫我)。运算符通过调用其(inner 's)range以Producer接口实现的形式observeOn向其发送回调。作为回报,带有值的调用将告诉它允许产生(即,它)许多其他元素。然后,有责任在正确的时间以正确的值调用该方法,以保持数据畅通但不会溢出。SubscribersetProducerobserveOnProducer.request(n)rangeonNextobserveOnrequest
很少需要在最终用户中表达背压(因为它们相对于其直接上游是同步的,并且背压自然是由于调用堆栈阻塞而发生的),但可能更容易理解它的工作原理:
Observable.range(1, 1_000_000) .subscribe(new Subscriber<Integer>() { @Override public void onStart() { request(1); } public void onNext(Integer v) { compute(v); request(1); } @Override public void onError(Throwable ex) { ex.printStackTrace(); } @Override public void onCompleted() { System.out.println("Done!"); } });
在此,onStart实现指示range生成其第一个值,然后在中接收该值onNext。一旦compute(int)完成,将另一个值,然后从请求range。在的天真的实现中range,此类调用将递归调用onNext,StackOverflowError这当然是不可取的。
为避免这种情况,运营商使用所谓的蹦床逻辑来防止此类可重入的呼叫。用range的术语,它将记住在request(1)调用时有一个调用onNext(),一旦onNext()返回,它将进行onNext()下一回合并使用下一个整数值进行调用。因此,如果将两者交换,该示例仍然可以正常工作:
@Override public void onNext(Integer v) { request(1); compute(v); }
但是,这不适用于onStart。尽管Observable基础结构保证每次将最多Subscriber调用一次,但是对的调用request(1)可能会立即触发元素的发射。如果在request(1)需要调用之后具有初始化逻辑,则onNext可能会出现以下异常:
Observable.range(1, 1_000_000) .subscribe(new Subscriber<Integer>() { String name; @Override public void onStart() { request(1); name = "RangeExample"; } @Override public void onNext(Integer v) { compute(name.length + v); request(1); } // ...休息是一样的 });
在这种同步情况下,NullPointerException将在仍然执行时立即抛出a onStart。如果对的调用request(1)触发了onNext对其他线程的异步调用,并name在onNext竞赛中读取了将其写入onStart后的情况,则会发生更细微的错误request。
因此,应该在此onStart之前或之前进行所有字段初始化,然后调用request()last。request()在必要时,in运算符的实现可确保适当的事前发生关系(或换句话说,内存释放或完全隔离)。
本文向大家介绍rx-java 运营商介绍,包括了rx-java 运营商介绍的使用技巧和注意事项,需要的朋友参考一下 示例 可以使用运算符来操纵从Observable到的对象流Subscriber。 输出为: 在map操作者改变了Integer可观察到String可观察到的,由此操作对象的流动。 运算符链 多个运算符可以chained一起使用,以进行更强大的转换和操纵。 可以在Observable和
本文向大家介绍rx-java PublishSubject,包括了rx-java PublishSubject的使用技巧和注意事项,需要的朋友参考一下 示例 PublishSubject只向观察者发送那些在订阅时间之后由源Observable发出的对象。 一个简单的PublishSubject例子: 输出: 在上面的示例中,aPublishSubject订阅了一个Observable类似于时钟的,
rx 是一个可扩展的、现代的、极简主义的像素编辑器 ,在 rust 中实现。rx 是免费软件,在 GPLv3 下获得许可。 特性 内置精灵动画支持,带实时预览。 同时处理多个文件。 可扩展的命令系统。 可使用简单的基于文本的语言进行配置。 支持 HiDPI。 用户界面缩放。 撤消/重做任何编辑。 动画 GIF 输出。 多刷/同步编辑。 画笔过滤,又名“像素完美”模式。 像素操作的视觉模式。 安
本文向大家介绍rx-java 基础科目,包括了rx-java 基础科目的使用技巧和注意事项,需要的朋友参考一下 示例 SubjectRxJava中的A是既是Observable和又是的类Observer。这基本上意味着它可以充当Observable和将输入传递给订阅者,以及Observer从另一个Observable获取输入。 上面打印了“你好,世界!” 使用进行控制台Subjects。 说明 代
本文向大家介绍rx-java onBackpressureXXX运算子,包括了rx-java onBackpressureXXX运算子的使用技巧和注意事项,需要的朋友参考一下 示例 大多数开发人员在应用程序失败时会遇到背压,MissingBackpressureException并且异常通常指向observeOn运算符。实际原因通常是对的非背压使用PublishSubject,timer()或者i
本文向大家介绍rx-java 基本范例,包括了rx-java 基本范例的使用技巧和注意事项,需要的朋友参考一下 示例 调度程序是有关处理单元的RxJava抽象。调度程序可以由Executor服务支持,但是您可以实现自己的调度程序实现。 AScheduler应该满足此要求: 应该顺序处理未延迟的任务(FIFO顺序) 任务可以延迟 Scheduler可以在某些运算符(例如:)中将A用作参数delay,