当前位置: 首页 > 知识库问答 >
问题:

PublishProcessor是如何实现的。offer()知道下游没有消耗之前的排放量吗?

梁丘扬
2023-03-14

我使用PublishProcessor。提供()从上游排放。我读到(*),如果订阅者没有准备好接收下一个事件,它可能会返回false,因为PublishProcessor不协调背压。我想知道它是如何工作的。PublishProcessor如何知道订阅者还没有准备好?

我了解到上游可能知道下游通过反应性拉动处理排放的潜力:

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});

https://github.com/ReactiveX/RxJava/wiki/Backpressure

但我的订阅看起来是这样的:

publishProcessor.subscribe(new Consumer<T>() {
                @Override
                public void accept(T t) throws Exception {
                    // do IO
                }
            }, Log::submitCrash);

是否在内部调用请求(1);accept()完成后,还是不执行反应性拉取?我试着读代码,但似乎不是这样。该消费者被传递给LambdaSubscriber。还有兰博达。onNext()不调用请求(n)。

此订阅html" target="_blank">方法的javadoc:

 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) 

说明:操作符以无限制的方式使用源{@code Publisher}(即,没有对其施加反压力)。

所以它没有提到反应性拉力。以无限制的方式使用源是否意味着对该订阅应用了无限制的背压缓冲区,或者根本没有背压缓冲区?

PublishProcessor是否还有其他机制可以知道其订阅者是否已经完成了对已发出值的消耗?

(*)

PublishProcessor既是一个可流动的处理器,也是一个可流动的处理器,但它不协调不同订户之间以及上游源和订户之间的背压。如果通过onNext(对象)接收上游项目,如果订阅者尚未准备好接收项目,则该订阅者将通过MissingBackpressureException终止。为了避免这种情况,请使用offer(Object),如果返回false,请稍后重试。

http://reactivex.io/RxJava/javadoc/io/reactivex/processors/PublishProcessor.html

共有1个答案

范稳
2023-03-14

每个订户都会收到一个特殊的订阅,该订阅跟踪消费者发出的请求金额,如果PublishProcessor可以在订户上调用onNext,则会减少该金额。

如果跟踪的金额为零,并在报价中检查,则认为消费者尚未准备好。

如果您同步使用PublishProcessor,并从onNext中发出请求,跟踪金额将始终大于零,因此报价可以通过。

但是,如果您使用异步,例如,通过应用观察,现在处理器和用户之间有一个有界缓冲区,该缓冲区可能会填满,跟踪的请求量可能最终为零,从而阻止offer发出更多信号。

无限消费意味着消费者发出请求(Long.MAX_VALUE),这被解释为消费者准备好接收任意数量的商品PublishProcessor本身会这样做,以防您将其订阅给另一家Publisher

一般来说,unbounded并不意味着无限缓冲,因为特定消费者可能会在onNext调用线程上同步丢弃、批处理、采样或处理项目,因此不会发生溢出。

 类似资料:
  • 本文向大家介绍你知道 Kafka 是如何做到消息的有序性?相关面试题,主要包含被问及你知道 Kafka 是如何做到消息的有序性?时的应答技巧和注意事项,需要的朋友参考一下 kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是分区之间的消息是不保证有序的。

  • 有没有大神知道这样的分段器效果如何实现? 有没有大神知道这样的分段器效果如何实现? 点击左侧按钮时右侧边框变为45度曲线,点击右侧反之,这个咋整?

  • 需求参考 不需要第三方软件,直接点击客服就能进入页面,直接通讯 这是怎么实现的呢?

  • 我正在尝试使用Cooja模拟器模拟无线传感器网络。我想观察RPL协议在Contiki-OS中消耗的内存量。我使用sky-mote进行模拟,并创建了一个包含许多它们的网络。有没有什么方法可以观察RPL协议为保留邻居和路由表而消耗的内存量?

  • 我正在寻找代码(任何语言)的一个基本图形列表,可以通过拖放重新排序。所以这个功能确实是http://jqueryui.com/sortable/,但是直接写在帧缓冲区/Canvas上,而不需要任何框架(或者最多是低级别的“put pixel”库),可能不是在HTML/JS中(除非它是只包含CSS的Canvas)。 越简单越好,因为我将在汇编程序中使用它,如果不需要,我不想重新发明轮子。

  • 游戏进行外消耗道具 用法:传入需要消耗的道具列表已经对应的数量,则可以使用道具 消耗成功后,后台回吐消耗成功、失败的道具列表。如果消耗成功,则顺带返回一个流水号seq,用于标识此次消耗,此序列号可以用来进行回滚操作 var itemlist = [ { "id":1, //道具id "num":1, //数量 },