当前位置: 首页 > 知识库问答 >
问题:

ReactiveX背压无法按预期工作

红鸿运
2023-03-14

我正试图使一个流动的背压。我的想法是,在当前的一个项目完成处理之前,flowable的新项目不会被释放。我正在使用ResourceSubscriber和subscribeWidth()方法来实现这一点。

流的每个元素都在一个单独的线程池上异步处理。(这是我通过使用平面图/订阅实现的)

我希望每秒后的每个元素都将在被调用的订阅者的onNext方法之后发射。然而,当我试图运行此代码时,Flowable无法控制地发射元素。背压不起作用。

下面是重现问题的代码:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RxTest2 {

    private static final Logger log = LoggerFactory.getLogger(RxTest.class);

    static AtomicInteger integer = new AtomicInteger();

    public static void main(String[] args) {
        Flowable.generate(emitter -> {
            final int i1 = integer.incrementAndGet();
            if (i1 >= 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
            emitter.onNext(i1);
        })
                .doOnNext(i -> log.info("Published: " + i))
                .flatMap(i -> Flowable.defer(() -> {
                    log.info("Starting consuming {}", i);
                    Thread.sleep(100);
                    log.info("Finished consuming {}", i);
                    return Flowable.just(i);
                }).subscribeOn(Schedulers.computation()))
                .doOnNext(i -> log.info("Consuming finished, result: " + i))
                .subscribeWith(new BackpressureSubscriber(2));
    }
}

class BackpressureSubscriber extends ResourceSubscriber<Object> {

    private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);

    private final long initialRequest;

    public BackpressureSubscriber(final long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    protected void onStart() {
        super.onStart();
        log.info("Starting execution with {} initial requests", initialRequest);
        request(initialRequest);
    }

    @Override
    public void onNext(final Object message) {
        log.info("On next for {}", message);
        request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log.error("Unhandled error: ", throwable);
    }

    @Override
    public void onComplete() {
        log.info("On Complete");
    }
}

预期输出如下:

[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest -  On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2

实际产量:

11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1

在库版本上测试:

2.2.19 2.1.2

就我所了解的ReactiveX文档而言,我认为这是RX错误。不过,我可能错了,如果您能指出,我将不胜感激

共有1个答案

段晨
2023-03-14

flatMap实际上成批地从上游请求,并将缓冲项目,直到下游html" target="_blank">请求它们。这个事实足以描述你所看到的行为。如果将bufferSize设置为1,您可能会看到预期的行为。有一个重载允许您设置bufferSize

此外,flatMap还有一个maxConcurrent参数,如果您意识到flatMap实际上是一个map,那么merge将应用于map给出的流。merge一次只能实际订阅有限数量的源,即maxConcurrentbufferSizemaxConcurrent的默认值为128。

请记住,当合并步骤收到来自下游的请求时,它不知道需要订阅多少流(请记住,我们在这里处理的是一个流)来完成请求!前10个流可能根本不返回任何值。如果第一个流没有返回任何内容,并且在1小时内没有完成,并且我们的maxConcurrent=1,那么在第一个小时内我们将不会收到任何事件,即使流2和流3已经准备好向我们发送内容。出于这些原因,我们必须为bufferSizemaxConcurrent选择通用默认值,通常选择的值会在某些基准情况下优化性能,并在许多边缘情况下最小化问题。

 类似资料:
  • 问题内容: 我正在使用selenium来抓取一些数据。 我单击的页面上有一个按钮,说“ custom_cols”。此按钮为我打开一个窗口,从中可以选择列。 此新窗口有时需要一些时间才能打开(大约5秒钟)。所以我已经使用了 延迟为20秒。但是有时它无法在新窗口中选择查找元素,即使该元素可见。在其余时间中,这种情况仅发生十次一次。 我在其他地方也使用了相同的功能(WebDriverWait),并且可以

  • 问题内容: 经过测试后,我只能对已经解析过的JSON数据返回一个肯定值。 根据官方文件: isValidJSONObject返回一个布尔值,该布尔值指示是否可以将给定对象转换为JSON数据。 但是,尽管事实是我尝试将其从JSON转换为NSDictionary的对象都可以正常转换,但仍会返回。 这是我的代码: 我的日志包含以下内容: 然后是dict的输出,这是一个巨大的NSMutableDictio

  • 问题内容: 考虑以下可以在任何程序执行之前预加载的库: 问题是,尽管总是调用全局变量的构造函数,但对于某些程序却不调用析构函数,例如: 对于其他一些程序,按预期方式调用析构函数: 您能解释一下为什么在第一种情况下不调用析构函数吗?编辑:上面的问题已得到解答,即程序可能会使用_exit(),abort()退出。 然而: 有没有办法在预加载的程序退出时强制调用给定函数? 问题答案: 具有作为其初始化代

  • 我必须将日期-时间字符串转换为分区日期-时间对象。我使用DateTimeForman读取模式。根据留档,模式中的“Z”可以接受以下格式: /-0000 但是“分区约会”。parse(myDate,formatter)只适用于第一种情况;相反,在第二种情况下,代码生成一个异常。 我用的是8Java 我做错什么了?谢谢!

  • 我编写了自己的AtomicDouble类,还有一个BankAccount类,它执行两个简单的取款和存款操作,它有一个AtomicDouble实例(余额)。我的代码的问题是,当我在deposit()中调用addAndGet方法时,程序会陷入一个无限循环,compareAndSet()永远不会返回真值,但当我调试它时,currentValue和atomic中的值。get()相等,但此方法无法理解。 有

  • 问题内容: 我是python新手,我熟悉循环并尝试了一本书中的示例 但是输出如下 问题答案: 您必须改用(Python 2.x),因为它等效于,因此它会将输入解析并评估为有效的Python表达式。 注意: 不会捕获用户错误(例如,如果用户输入了一些无效的Python表达式)。可以这样做,因为它将输入转换为。有关更多信息,请阅读Python docs 。