我正试图使一个流动的背压。我的想法是,在当前的一个项目完成处理之前,flowable的新项目不会被释放。我正在使用ResourceSubscriber和subscribeWidth()方法来实现这一点。
流的每个元素都在一个单独的线程池上异步处理。(这是我通过使用平面图/订阅实现的)
我希望每秒后的每个元素都将在被调用的订阅者的onNext方法之后发射。然而,当我试图运行此代码时,Flowable无法控制地发射元素。背压不起作用。
下面是重现问题的代码:
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class RxTest2 {
private static final Logger log = LoggerFactory.getLogger(RxTest.class);
static AtomicInteger integer = new AtomicInteger();
public static void main(String[] args) {
Flowable.generate(emitter -> {
final int i1 = integer.incrementAndGet();
if (i1 >= 20) {
Thread.sleep(10000);
System.exit(0);
}
emitter.onNext(i1);
})
.doOnNext(i -> log.info("Published: " + i))
.flatMap(i -> Flowable.defer(() -> {
log.info("Starting consuming {}", i);
Thread.sleep(100);
log.info("Finished consuming {}", i);
return Flowable.just(i);
}).subscribeOn(Schedulers.computation()))
.doOnNext(i -> log.info("Consuming finished, result: " + i))
.subscribeWith(new BackpressureSubscriber(2));
}
}
class BackpressureSubscriber extends ResourceSubscriber<Object> {
private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);
private final long initialRequest;
public BackpressureSubscriber(final long initialRequest) {
this.initialRequest = initialRequest;
}
@Override
protected void onStart() {
super.onStart();
log.info("Starting execution with {} initial requests", initialRequest);
request(initialRequest);
}
@Override
public void onNext(final Object message) {
log.info("On next for {}", message);
request(1);
}
@Override
public void onError(final Throwable throwable) {
log.error("Unhandled error: ", throwable);
}
@Override
public void onComplete() {
log.info("On Complete");
}
}
预期输出如下:
[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest - On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2
实际产量:
11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1
在库版本上测试:
2.2.19 2.1.2
就我所了解的ReactiveX文档而言,我认为这是RX错误。不过,我可能错了,如果您能指出,我将不胜感激
flatMap
实际上成批地从上游请求,并将缓冲项目,直到下游html" target="_blank">请求它们。这个事实足以描述你所看到的行为。如果将bufferSize
设置为1,您可能会看到预期的行为。有一个重载允许您设置bufferSize
。
此外,flatMap
还有一个maxConcurrent
参数,如果您意识到flatMap
实际上是一个map
,那么merge
将应用于map
给出的流。merge
一次只能实际订阅有限数量的源,即maxConcurrent
。bufferSize
和maxConcurrent
的默认值为128。
请记住,当合并步骤收到来自下游的请求时,它不知道需要订阅多少流(请记住,我们在这里处理的是一个流)来完成请求!前10个流可能根本不返回任何值。如果第一个流没有返回任何内容,并且在1小时内没有完成,并且我们的maxConcurrent=1,那么在第一个小时内我们将不会收到任何事件,即使流2和流3已经准备好向我们发送内容。出于这些原因,我们必须为bufferSize
和maxConcurrent
选择通用默认值,通常选择的值会在某些基准情况下优化性能,并在许多边缘情况下最小化问题。
问题内容: 我正在使用selenium来抓取一些数据。 我单击的页面上有一个按钮,说“ custom_cols”。此按钮为我打开一个窗口,从中可以选择列。 此新窗口有时需要一些时间才能打开(大约5秒钟)。所以我已经使用了 延迟为20秒。但是有时它无法在新窗口中选择查找元素,即使该元素可见。在其余时间中,这种情况仅发生十次一次。 我在其他地方也使用了相同的功能(WebDriverWait),并且可以
问题内容: 经过测试后,我只能对已经解析过的JSON数据返回一个肯定值。 根据官方文件: isValidJSONObject返回一个布尔值,该布尔值指示是否可以将给定对象转换为JSON数据。 但是,尽管事实是我尝试将其从JSON转换为NSDictionary的对象都可以正常转换,但仍会返回。 这是我的代码: 我的日志包含以下内容: 然后是dict的输出,这是一个巨大的NSMutableDictio
问题内容: 考虑以下可以在任何程序执行之前预加载的库: 问题是,尽管总是调用全局变量的构造函数,但对于某些程序却不调用析构函数,例如: 对于其他一些程序,按预期方式调用析构函数: 您能解释一下为什么在第一种情况下不调用析构函数吗?编辑:上面的问题已得到解答,即程序可能会使用_exit(),abort()退出。 然而: 有没有办法在预加载的程序退出时强制调用给定函数? 问题答案: 具有作为其初始化代
我必须将日期-时间字符串转换为分区日期-时间对象。我使用DateTimeForman读取模式。根据留档,模式中的“Z”可以接受以下格式: /-0000 但是“分区约会”。parse(myDate,formatter)只适用于第一种情况;相反,在第二种情况下,代码生成一个异常。 我用的是8Java 我做错什么了?谢谢!
我编写了自己的AtomicDouble类,还有一个BankAccount类,它执行两个简单的取款和存款操作,它有一个AtomicDouble实例(余额)。我的代码的问题是,当我在deposit()中调用addAndGet方法时,程序会陷入一个无限循环,compareAndSet()永远不会返回真值,但当我调试它时,currentValue和atomic中的值。get()相等,但此方法无法理解。 有
问题内容: 我是python新手,我熟悉循环并尝试了一本书中的示例 但是输出如下 问题答案: 您必须改用(Python 2.x),因为它等效于,因此它会将输入解析并评估为有效的Python表达式。 注意: 不会捕获用户错误(例如,如果用户输入了一些无效的Python表达式)。可以这样做,因为它将输入转换为。有关更多信息,请阅读Python docs 。