我试图创建一个可流动的
,它会发出关于反压力的事件,以避免内存问题,同时并行运行转换的每个阶段以提高效率。我创建了一个简单的测试程序,来解释我的程序的不同步骤的行为,以及何时发出事件,何时等待不同的阶段。
我的程序如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.count()
.blockingGet();
System.out.println("count: " + count);
}
当我运行这个程序时,我得到了与预期背压相关的输出,其中一批事件被发送到缓冲区中的大小,然后它们被平面映射,最后采取一些操作,一个接一个地打印它们:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
但是,如果我试图通过向
添加一些调用来并行化不同的操作阶段。observeOn(Schedulers.computation())
然后我的程序似乎不再考虑背压。我的代码现在看起来像:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.observeOn(Schedulers.computation())
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.observeOn(Schedulers.computation())
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.observeOn(Schedulers.computation())
.count()
.blockingGet();
System.out.println("count: " + count);
}
我的输出如下,我所有的事件都是预先发出的,而不是按照执行的各个阶段指定的背压和缓冲:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25
假设我的批处理阶段正在调用外部服务,但是由于延迟,我希望它们并行运行。我还希望在给定的时间控制内存中的项目数量,因为最初发出的项目数量可能非常可变,并且批处理阶段的运行速度比事件的初始发射慢得多。
我怎么能让我的
Flowable
在Scheduler
上尊重背压呢?为什么当我在调用观察
时,它似乎只不尊重背压?
我怎么能在调度程序上有我的流动尊重背压
实际上,应用onBackpressureBuffer
会使上面的源代码与下游应用的任何反压力断开连接,因为它是一个无界的运算符。您不需要它,因为Flowable.fromIterable
(顺便说一句,RxJava有一个范围
运算符)支持并尊重反压力。
为什么它似乎只有不尊重背压时,我洒在呼吁观察?
在第一个例子中,出现了一种自然的背压,称为调用堆栈阻塞。RxJava在默认情况下是同步的,大多数操作符不引入异步,就像第一个示例中没有引入异步一样。
观察
引入了异步边界,因此理论上,阶段可以并行运行。它有一个默认的128元素预取缓冲区,可以通过它的重载之一进行调整。然而,在您的情况下,缓冲区(10)实际上会将预取量放大到1280,这仍然可能导致您的1000元素长源代码一次性消耗完毕。
我不知道为什么背压在这个流动链中没有得到尊重。 我将其简化为以下三个阶段:,和。前两个阶段的速度比最后一个阶段快得多,因此它们的输出会让内存等待第3步,从而导致内存不足错误/内存使用量不断增加。 当我观看调试器时,我注意到步骤1比步骤3更频繁地发生,尽管我已经将缓冲区大小(observeOn的第三个参数)设置为一个较低的数字(例如1)。 我已经阅读了我正在使用的每个运算符的留档,看起来他们都有一些
我有一个将launchMode设置为singleTask的活动: 我有一个持续的通知,其中包含启动该活动的PendingIntent: 当我与现有的MyActivity交互时,我点击Home并通过启动器重新启动MyActivity,MyActivity的按预期调用。 问题是,当我与现有的MyActivity交互时,我点击正在进行的通知,通过创建一个新的MyActivity,通过销毁现有的MyAct
在vertx web backpressure示例中,假设我作为标准verticle启动服务器verticle,那么observeOn(RxHelper.scheduler(vertx.getDelegate())会做什么。 我在一台8核机器上部署了8个事件循环线程和8个Server verticle实例,我没有在路由的处理程序中阻止IO调用
我正在创建一个约会网站来学习网络开发,我不知道如何制作背景色,它只是在特定的div上不起作用。我应该改变什么?我尝试了很多东西,但什么都没有发生。 添加lorem ipsum所以我可以问这个问题...跳过这个 Lorem ipsum dolor sit amet,concetetur adipiscing elit。Morbi ut felis magna。聪明的人,聪明的人,聪明的人,聪明的人。
我无法理解为什么以下操作不起作用? 我一直收到错误: 我尝试使用通用参数: 我好像想不出来。 我完全错过了什么? 我正在使用OpenJDK 11。
从TensorArray读取: 使用: 问题: 回溯(最近一次调用last):RLU培训中第130行的文件“\main.py”。train()文件“C:\Users\user\Documents\Projects\rl toolkit\rl_training.py”,第129行,在train self中_rpm,赛尔夫。批量大小,自行确定。梯度步数,记录步数b=self。在call result=