我继续玩Retor,现在我看到compose
运算符的行为完全类似于平面图
,我想知道是否有任何我不明白的区别。
@Test
public void compose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.compose(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
@Test
public void flatMapVsCompose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.flatMap(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
这两个示例的行为和返回的结果相同。
问候。
Dan Lew的精彩解释:
不同之处在于,compose()
是一个更高级别的抽象:它对整个流进行操作,而不是单独发出的项。更具体地说:
>
相反,如果在flatMap()
中放入subscribeOn()
/observeOn()
,它只会影响在flatMap()
中创建的可观察的
,而不会影响流的其余部分。
compose()
在创建Observable
流时立即执行,就好像您已经内联编写了运算符一样flatMap()
在每次调用其onNext()
时执行。换句话说,flatMap()
转换每个项目,而compose()
转换整个流。
flatMap()
的效率必然较低,因为每次调用onNext()
时,它都必须创建一个新的Observable
compose()
按原样对流进行操作。如果要用可重用代码替换某些运算符,请使用compose()
flatMap()
有很多用途,但这不是其中之一。
来自@Andrew的解释很好。只是想增加一个更好理解的例子。
Flux.just("1", "2")
.compose( stringFlux -> {
System.out.println("In compose"); // It takes whe whole Flux as input
return stringFlux.collectList();
}).subscribe(System.out::println);
Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
System.out.println("In flatMap");
return Flux.just(Integer.parseInt(s));
}).subscribe(System.out::println);
这就产生了输出
In compose
[1, 2]
In flatMap
1
In flatMap
2
这表明compose
适用于整个流,但平面图
适用于流中的单个项目
我已经找到了很多关于RxJava的答案,但我想了解它在Reactor中是如何工作的。 我目前的理解是非常模糊的,我倾向于认为map是同步的,flatMap是异步的,但我不能真正理解它。 以下是一个例子: 我有文件(a
我试图将一个完整的链接到我的Rx链中,当我这样做时,链永远不会在onError或onComplete中完成。 当我单步执行代码时,我的可完成代码就会被执行。我甚至可以添加日志记录并看到它登录到它自己的doOn完成() 下面将记录“我已完成”,但不会进入错误或完成回调。 如果我改为使用flatMap并使用andThen返回布尔可观测值,它将起作用 我尝试在flatMapCompletable版本中添
通量上的平面图总是连续的吗?我知道在flatMap返回通量中使用then函数时不是顺序的。项目Reactor平面图 但是如果flatMap中使用的函数返回mono,它是否总是顺序的? 假设我有一个函数,它接受一个对象,只返回Mono。 然后呢 总是返回2,3,4,5?
我从留档中读到: 将该流量发出的元素异步转换为发布者,然后通过合并将这些内部发布者平坦化为单个流量,从而允许它们交错。 那个: 将此流量发出的元素异步转换为发布者,然后将这些内部发布者平坦化为单个流量,但按其源元素的顺序合并它们。 并且: 将此流量发出的元素异步转换为发布者,然后将这些内部发布者展平为单个流量,顺序并使用串联保持顺序。该运算符有三个维度可以与flatMap和flatMapSeque
因此,将此运算符放在链中的任何位置也会影响onnext/onerror/oncomplete信号的执行*上下文,从链的开始直到下一次出现{@link publishOn(Scheduler)publishOn} 这让我有点困惑,因为当处理链中没有指定任何时,线程名称的打印值为: 从线程single-scheduler-1中保存person-如预期 从线程线程-13查找人员 从线程线程-6查找人员
本文向大家介绍请你来说一下reactor模型组成 ?相关面试题,主要包含被问及请你来说一下reactor模型组成 ?时的应答技巧和注意事项,需要的朋友参考一下 reactor模型要求主线程只负责监听文件描述上是否有事件发生,有的话就立即将该事件通知工作线程,除此之外,主线程不做任何其他实质性的工作,读写数据、接受新的连接以及处理客户请求均在工作线程中完成。其模型组成如下: 1)Handle:即操作