当前位置: 首页 > 知识库问答 >
问题:

RxJava中嵌套的Single.FlatMap与Single.zip,是否相同?

赏阳嘉
2023-03-14
val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())

数据类MyObj(Field1:String、Field2:Int、Field3:Int、Field4:String......Field10:Int)

我有一个service10.execute(s1:String s2:Int s3:Int s4:String)

如果我这样做:

s1.flatMap { str -> 
    s2.flatMap { int1 ->
        s3.flatMap { int2 ->
            s4.flatMap { str2 ->
                ...
                s10.flatmap { int10
                  service10.execute(myObj(str, int1, int2, str2..., int10))
                }
            }
        }
    }
}
Single.zip(
            listOf(
                s1,
                s2,
                s3,
                s4
              ...,
              ...,
              s10
            )
        ) { array ->
            val str = array[0] as String
            val int1 = array[1] as Int
            val int2 = array[2] as Int
            val str2 = array[3] as String
            ...
            val str10 = array[9] as Int
        }

1)flatMap在那里是并行执行还是顺序执行?2)如果嵌套的flatMap是顺序的,有没有办法让它们像zip一样并行?

共有1个答案

司马作人
2023-03-14

不,嵌套的FlatMap不能使并行运行,下面的测试证明了这一点:

    // so we can be sure service1 and service2 are active
    val bothSubscribed = CountDownLatch(2)
    // so we can simulate a blocking, long running operation on both services
    val subscribeThreadsStillRunning = CountDownLatch(1)

    val service5 = { str: String, str2: String ->
        Observable.just("service5: $str, $str2").singleOrError()
    }

    val scheduler = Schedulers.io()

    val createSingle = { value: String ->
        Observable
            .create<String> { emitter ->
                println("subscribe $value on ${Thread.currentThread().name}")
                bothSubscribed.countDown()
                subscribeThreadsStillRunning.await(10, SECONDS)
                emitter.onNext(value)
            }
            .singleOrError()
            .subscribeOn(scheduler)
    }

    val s1 = createSingle("outer")
    val s4 = createSingle("inner")

    s1.flatMap { outer ->
        s4.flatMap { inner ->
            service5(outer, inner)
        }
    }.subscribe()

    assert(bothSubscribed.await(5, SECONDS))
    subscribeThreadsStillRunning.countDown()

原因可以通过记住lambda中的代码在lambda执行之前是不运行的来理解的(似乎是这样说的,但我花了一点时间才得到它)。S4.FlatMap触发订阅S4,但是在outer可用之前,即在S1已经发出并因此完成之前,此代码不会执行。

Zip似乎是这方面的完美解决方案,我不知道为什么要使用平面映射。我想不出做这件事的办法。它还有一个类型安全的API,因此您不必在示例中使用基于数组的API。

Singles
        .zip(s1, s4) { outer, inner -> service5(outer, inner) }
        .flatMap { it }
        .subscribe()
 类似资料:
  • 问题内容: MySQL是否允许使用嵌套事务? 问题答案: 支持。 您可以执行以下操作:

  • 我陷入了需要检查嵌套JSON对象中是否存在键的情况。通过嵌套JSON对象,我在父JSON对象中有一个JSON对象作为其键之一的值。所以我需要检查这个键是否存在于整个JSON对象中。我将以下数据作为对象获取。我知道我可以解析这个对象以获取JSON对象。 我使用了方法来检查主JSON对象中是否存在密钥,它可以正常工作。但是要检查任何内部JSON对象,比如“info”,我需要再次将解析为JSON对象,然

  • 假设我有一个返回列表的博客帖子api 从列表创建可观察 将每个可观察拆分为

  • 问题内容: 在我的Java应用程序中,我使用第三方库。 但是,我发现有些奇怪,有一些嵌套的程序包,有些类的名称可能与程序包的名称相同。 恐怕我不清楚。这是一个例子: 包 在“ com.xx.a”内部有一个名为“ a”的类。 因此,如果我想将此类称为“ a” … 我写: 然后,IDE将认为我的意思是软件包“ com.xx.a.a”。 那我就不能打电话了。 我想知道为什么? 顺便说一句,图书馆提供者似

  • 我是Elasticsearch的新手,我提出了一个问题,Elasticsearch嵌套查询是否只能为嵌套字段返回匹配的嵌套文档。 对于示例,我有一个名为的类型,其中嵌套字段名为 和嵌套查询 我需要的是搜索有提到足球的评论的博客文章,每个博客文章的评论数与足球相匹配(在例子中它数为1,因为另一个评论刚刚提到篮球)。 然而,Elasticsearch似乎总是返回完整的文档,所以我如何才能实现它,或者我

  • 我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?