当前位置: 首页 > 知识库问答 >
问题:

订阅者如何通过反应性回拉压力控制发布者?

吉和同
2023-03-14

我有一个发布者,它的发布速度可能比订阅者处理数据的速度更快。为了解决这个问题,我开始背压。因为我不想丢弃任何数据,所以我使用反应性回拉压力。我的理解是,订阅者能够告诉发布者何时发布更多数据,如本文及以下段落所述。

出版者是一个可流动的系统,它以同步的方式并行工作,然后合并成一个连续的可流动系统。数据应该被缓冲最多10个元素,当缓冲区满时,Flowable不应该发布更多数据,并等待下一个请求。

订阅者是一个一次性订阅者,在开始时请求10项。每个消耗的项目都需要一些计算,然后将请求一个新项目。

我的MWE看起来像这样:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

我希望这段代码能做到以下几点:订阅者请求前10项。出版商出版了前10个项目。然后订阅者在onNext中进行计算,并请求发布者发布的更多项目。

实际发生的情况:起初,发布者似乎无限制地发布项目。在某个时候,例如,在发布了14个项目后,订阅者处理了它的第一个项目。在这种情况下,发布者继续发布项目。在大约30个发布项目后,抛出一个io.reactivex.exceptions.MisSingBackpressureExctive: Buffer已满,流结束。

我的问题是:我做错了什么?我怎么能让订阅者控制发布者是否以及何时发布数据?显然,我做错了一些可怕的事情。否则,预期不会与现实有如此大的差异。

上述MWE的输出示例:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

共有1个答案

司空劲
2023-03-14

我不是Rx方面的专家,但让我试试看<代码>观测(…)有自己的默认缓冲区大小128。所以,从一开始,它就会从上游请求比缓冲区能容纳的更多的数据。

observeOn(…)接受可选的缓冲区大小覆盖,但即使您提供了它,ParallelFlowable也将调用您的flatMap(…) 方法的频率比您想要的要高。我不是100%确定为什么,也许它有自己的内部缓冲,它在将rails合并回sequential时执行。

我认为你可以通过使用flatMap(…)更接近你想要的行为 而不是parralel(…) ,提供maxConcurrency参数。

要记住的另一件事是,您不想调用subscribeOn(…) -这意味着影响整个上游的可流动性。所以如果你已经在调用parallel(…)。runOn(…) ,它没有效果,否则效果将是意外的。

有了以上的武装,我认为这会让你更接近你所寻找的:

    List<Integer> src = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
        src.add(i);
    }
    Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
    Flowable.fromIterable(src)
            .flatMap(
                    i -> Flowable.just( i )
                            .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                            .map( __ -> {
                                System.out.println("publisher: " + i);
                                Thread.sleep(200);
                                return i;
                            } ),
                    10) // max concurrency
            .observeOn(Schedulers.newThread(), false, 10) // override buffer size
            .doOnError(Throwable::printStackTrace)
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                protected void onStart() {
                    request(10);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("subscriber: " + integer);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                }
            });
    try {
        Thread.sleep(1000000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

 类似资料:
  • 目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。

  • 我最初有测试帐户,用于测试Android中的应用内购买,但后来我删除了测试帐户,并将应用从测试版转移到生产版。根据留档,两个测试账户都试图通过谷歌游戏应用程序取消他们的测试订阅。尽管谷歌Play应用程序表示订阅已经取消,但它仍然活跃。即使试图卸载应用程序。如何删除测试订阅? 我已经从Play Store中删除了beta测试apk并禁用了测试。我已经从授权页面以及授权beta测试人员列表中删除了所有

  • 我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有主意了。 有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序对每个对象进行一次且仅处理一次。发布者和每个订阅者在自己的线程中运行。 在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。这可以正常工作,但如果任何订阅者的队列已满,发布者将被阻塞。这会导致性能下降,因为每个订阅者处理对

  • 简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息

  • 我有一个问题非常困扰我。Redis发布/订阅功能的实际用途是什么?我只能想到通过TCP(本地或分布式)进行进程间通信,但其他的就不多了。 有人能证明我错了吗。

  • 在我们的代码中,回答服务间HTTP请求的典型句柄函数如下所示: 如果我没有理解错的话,这意味着每次调用时,都需要实例化整个管道(通常具有巨大的stacktraces)(并因此在以后收集)。 我的问题是:我能不能以某种方式“准备”一次整个流,以后再重用它? 进一步思考我可以做的是: 无论如何,我怀疑这不是的用意。