当前位置: 首页 > 知识库问答 >
问题:

RxJava-flatmap vs concatMap-为什么在订阅时订购相同的产品?

毛声
2023-03-14

根据这个线程,conCatMap和flatmap只在项目的发出顺序上有所不同。所以我做了一个测试,创建了一个简单的整数流,并想看看它们将以什么顺序发出。我做了一个小的可观测值,它可以接收1-5之间的数字,然后乘以2。容易的

这是带有平面图的代码:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from flatMap:"+integer);
        }
    });

和使用concatMap的完全相同的代码:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from concatmap:"+integer);
        }
    });

当我在日志中看到打印出来的时候,两者的顺序都是一样的,为什么?我以为只有concatMap才能维持秩序?

共有1个答案

解高昂
2023-03-14

你所看到的只是巧合。每次您的平面图返回一个值时,它都会在与前一个相同的线程上执行。

我修改了您的示例以利用多线程:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .flatMap(integer -> Observable.just(integer)
                .observeOn(Schedulers.computation())
                .flatMap(i -> {
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                        return Observable.just(2 * i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return Observable.error(e);
                    }
                }))
        .subscribe(System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("onCompleted"));

我将每个值延迟一个随机延迟,以强制执行不同的顺序。此外,我在这之前添加了observeOn(Schedulers.computation()),这样下一个操作符flatMap)就会在计算线程池上运行——这就是多线程的魔力。

这是我的示例(在Android上)的输出:

I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted

如果将之后的平面图替换为concatMap,则会得到正确排序的输出。

托马斯·尼尔德(ThomasNield)有一篇很好的帖子,其中有一个恰当的解释。

 类似资料:
  • 我正在学习RxJava。我订阅了一个长时间运行的回调作为可观察回调,并希望看到执行onNext回调: 我希望打印输出“6”,但该方法在此之前完成。 如果我没有在IO调度程序上订阅,那么调用将被阻止,并且我确实看到打印的输出。 我的问题是,如果在IO调度程序上运行,为什么不执行回调。我知道test()方法已终止,但尚未释放对订阅回调的引用。

  • 我是新的数据流和发布子工具在GCP。 需要将prem过程中的电流迁移到GCP。 当前流程如下: 我们有两种类型的数据馈送 Full Feed–其adhoc作业–完整XML的大小约为100GB(单个XML–非常复杂的一个–完整的数据–ETL作业处理此XML并将其加载到约60个表中) 单独的ETL作业用于处理完整提要。ETL作业过程完全馈送并创建负载就绪文件,所有表将被截断并重新加载 源系统每30分钟

  • 例: 注意:是非Android 运行环境, 使用的是RxJava2.x

  • 在什么情况下Spring webflow会进行订阅?我在任何地方都读到必须有订阅,否则不会发生任何更改。在我使用Spring Webflow的短暂时间里,我从未在控制器或服务中见过。 我的疑问还在于,在使用flatMap时,。。。等订阅在什么时候进行? 我所读的并不能真正解决我的疑虑。 我知道这是一个异步问题,但每个flatMap都同时运行?。。。所以有时候我注意到一些数据是空的。

  • 1.使用者角度 为企业、组织或个人提供一种信息传播方式,用对口的内容达成企业、组织与成员之间的沟通和知识传播。体现在为用户提供内容服务,传达各类资讯,用户订阅后,可在轻推客户端定期接收到内容资讯的推送 2.开发者角度 主要通过会话的形式为用户提供服务,用户在协同界面点击订阅号图标后,可直接进入与订阅号的聊天界面,开发难度低,支持在后台定制菜单,通过菜单引导用户到不同的去处。通知消息会被折叠在订阅消

  • 考虑一个场景,我们有一个发出字符串的流,我们想将字符串保存在文件中。 我正在使用Publish科目,这很好: 但是,这不起作用(只有被交付) 有没有办法让第二个场景也能正常工作? i、 例如,我们是否可以更改PublishSubject以确保它缓冲事件,直到订阅者使用它们? 请注意,BehaviorSubject不是一个选项,因为重新订阅会导致另一个文件保存。它没有“消费事件”的概念。 我找到了U