当前位置: 首页 > 知识库问答 >
问题:

Reactor组成与平面图

苏伟志
2023-03-14

我继续玩Retor,现在我看到compose运算符的行为完全类似于平面图,我想知道是否有任何我不明白的区别。

    @Test
public void compose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .compose(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

@Test
public void flatMapVsCompose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .flatMap(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

这两个示例的行为和返回的结果相同。

问候。

共有2个答案

南宫凯康
2023-03-14

Dan Lew的精彩解释:

不同之处在于,compose()是一个更高级别的抽象:它对整个流进行操作,而不是单独发出的项。更具体地说:

>

相反,如果在flatMap()中放入subscribeOn()/observeOn(),它只会影响在flatMap()中创建的可观察的,而不会影响流的其余部分。

compose()在创建Observable流时立即执行,就好像您已经内联编写了运算符一样flatMap()在每次调用其onNext()时执行。换句话说,flatMap()转换每个项目,而compose()转换整个流。

flatMap()的效率必然较低,因为每次调用onNext()时,它都必须创建一个新的Observablecompose()按原样对流进行操作。如果要用可重用代码替换某些运算符,请使用compose()flatMap()有很多用途,但这不是其中之一。

王君墨
2023-03-14

来自@Andrew的解释很好。只是想增加一个更好理解的例子。

Flux.just("1", "2")
        .compose( stringFlux -> {
            System.out.println("In compose"); // It takes whe whole Flux as input
           return stringFlux.collectList();
        }).subscribe(System.out::println);


Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
            System.out.println("In flatMap");
            return Flux.just(Integer.parseInt(s));
        }).subscribe(System.out::println);

这就产生了输出

In compose
[1, 2]
In flatMap
1
In flatMap
2

这表明compose适用于整个流,但平面图适用于流中的单个项目

 类似资料:
  • 我已经找到了很多关于RxJava的答案,但我想了解它在Reactor中是如何工作的。 我目前的理解是非常模糊的,我倾向于认为map是同步的,flatMap是异步的,但我不能真正理解它。 以下是一个例子: 我有文件(a

  • 我试图将一个完整的链接到我的Rx链中,当我这样做时,链永远不会在onError或onComplete中完成。 当我单步执行代码时,我的可完成代码就会被执行。我甚至可以添加日志记录并看到它登录到它自己的doOn完成() 下面将记录“我已完成”,但不会进入错误或完成回调。 如果我改为使用flatMap并使用andThen返回布尔可观测值,它将起作用 我尝试在flatMapCompletable版本中添

  • 通量上的平面图总是连续的吗?我知道在flatMap返回通量中使用then函数时不是顺序的。项目Reactor平面图 但是如果flatMap中使用的函数返回mono,它是否总是顺序的? 假设我有一个函数,它接受一个对象,只返回Mono。 然后呢 总是返回2,3,4,5?

  • 我从留档中读到: 将该流量发出的元素异步转换为发布者,然后通过合并将这些内部发布者平坦化为单个流量,从而允许它们交错。 那个: 将此流量发出的元素异步转换为发布者,然后将这些内部发布者平坦化为单个流量,但按其源元素的顺序合并它们。 并且: 将此流量发出的元素异步转换为发布者,然后将这些内部发布者展平为单个流量,顺序并使用串联保持顺序。该运算符有三个维度可以与flatMap和flatMapSeque

  • 因此,将此运算符放在链中的任何位置也会影响onnext/onerror/oncomplete信号的执行*上下文,从链的开始直到下一次出现{@link publishOn(Scheduler)publishOn} 这让我有点困惑,因为当处理链中没有指定任何时,线程名称的打印值为: 从线程single-scheduler-1中保存person-如预期 从线程线程-13查找人员 从线程线程-6查找人员

  • 本文向大家介绍请你来说一下reactor模型组成 ?相关面试题,主要包含被问及请你来说一下reactor模型组成 ?时的应答技巧和注意事项,需要的朋友参考一下 reactor模型要求主线程只负责监听文件描述上是否有事件发生,有的话就立即将该事件通知工作线程,除此之外,主线程不做任何其他实质性的工作,读写数据、接受新的连接以及处理客户请求均在工作线程中完成。其模型组成如下: 1)Handle:即操作