当前位置: 首页 > 知识库问答 >
问题:

在Spring reactor/webflux中,如何基于一个公共属性连接两个发布者,并从中构建一个发布者?

司允晨
2023-03-14

假设我有两个通量Flux

用例是基于公共属性“id”将两个通量连接起来,并构造一个通量

-对于属性id,两个通量之间始终存在1对1的匹配。

-通量包含的对象不会超过100个。

-焊剂不按id排序。

如何在项目Reactor/Spring网通量中实现这一点?


共有2个答案

巫马盛
2023-03-14

我认为这应该在以下限制下工作:

>

  • 第二个流量需要向所有订阅者发送相同的元素,因为它会被一次又一次地订阅
  • 这基本上相当于嵌套循环联接,对于大流量来说效率非常低
  • 第一个通量的每个元素在第二个通量中都有一个匹配元素

    flux1.flatMap(
        f1 -> flux2.filter(f2 -> f2.id.equals(f1.id)).take(1)) // take the first with matching id
              .map(f2 -> Tuple.of(f1,f2))) // convert to tuple.
    

    在没有IDE的情况下写入。考虑伪代码。

  • 施慈
    2023-03-14

    假设:

    • 这两个集合都不是很大(您可以将它们保存在内存中,而不会出现OOM问题)
    • 它们不按id排序
    • 集合中的每个元素在另一个元素中都有对应的元素

    首先,您应该使那些类1、类2实现可比较,或者至少准备一个比较器实现,您可以使用它按id对它们进行排序。

    然后,您可以使用zip运算符

    Flux<Class1> flux1 = ...
    Flux<Class2> flux2 = ...
    Flux<Tuple2<Class1,Class2>> zipped = Flux.zip(flux1.sort(comparator1), flux2.sort(comparator2));
    

    Tuple2是一个Reactor core类,允许您像这样访问Tuple的每个元素

    Tuple2<Class1,Class2> tuple = ...
    Class1 klass1 = tuple.getT1();
    Class2 klass2 = tuple.getT2();
    

    在这种情况下,排序将缓冲所有元素,如果集合很大,这可能会导致内存/延迟问题。根据这些集合中的排序方式(假设不保证排序,但这些是批量插入的),您还可以缓冲其中的一些(使用窗口)并对每个窗口进行排序(使用排序)。

    当然,在理想情况下,能够同时获取已排序的数据将避免缓冲数据,并将改善应用程序中的背压支持。

     类似资料:
    • 我正在尝试建立一个APK,我可以上传到播放商店。 (在确认了Android Studio的警告消息后,a.S.弹出了一个Generate Signed APK向导,我通过了这个向导,传递了我的密钥的详细信息。生成的APK被Play Store拒绝了,因为密钥的有效期太短了)。 我也试着打开Android Studio终端窗口并运行“gradle”,正如上面的消息所指示的,但是没有找到这个命令。顺便

    • In order to share your package with other developers around the world through Yarn, you’ll first need to publish it. When you publish a package with Yarn it goes onto the npm registry which is used to

    • 本文向大家介绍system.reactive 共享一个订阅(发布),包括了system.reactive 共享一个订阅(发布)的使用技巧和注意事项,需要的朋友参考一下 示例 给定一个IObservable<Offer>的offers从商家购买或以固定价格出售某些类型的项目,我们可以按照如下匹配对买家和卖家的: 问题在于,每个订阅trades将订阅offers两次。我们可以sellers和buyer

    • 假设我有如下两个通量: 现在我想把这些通量组合成一个通量或者两个通量的元组,在一个通量中包含两个通量元素。 我使用zipwith方法尝试了以下操作: 但是这给出了一个编译时错误: 我怎样才能做到这一点?请建议。

    • 我如何用Gradle在发布模式下构建测试APK?

    • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。