当前位置: 首页 > 知识库问答 >
问题:

从Mono列表中创建通量的正确方法

晏华奥
2023-03-14

假设我有一个使用CustomObject列表的API操作。对于这些对象中的每一个,它都会调用一个创建Mono的服务方法。如何以一种惯用的无阻塞方式从这些单一对象创建流量?

我现在想到的是这个。我更改了方法名称,以更好地反映它们的预期目的。

fun myApiMethod(@RequestBody customObjs: List<CustomObject>): Flux<CustomObject> {

    return Flux.create { sink ->
        customObjs.forEach {

            service.persistAndReturnMonoOfCustomObject(it).map {
                sink.next(it)
            }
        }
        sink.complete()
    }
}

此外,我需要订阅通量才能真正让它返回一些东西吗?

共有1个答案

严欣怡
2023-03-14

我相信您可以使用concat()来代替:

/**
 * Concatenate all sources provided as a vararg, forwarding elements emitted by the
 * sources downstream.
 * <p>
 * Concatenation is achieved by sequentially subscribing to the first source then
 * waiting for it to complete before subscribing to the next, and so on until the
 * last source completes. Any error interrupts the sequence immediately and is
 * forwarded downstream.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/concat.png" alt="">
 * <p>
 * @param sources The {@link Publisher} of {@link Publisher} to concat
 * @param <T> The type of values in both source and output sequences
 *
 * @return a new {@link Flux} concatenating all source sequences
 */
@SafeVarargs
public static <T> Flux<T> concat(Publisher<? extends T>... sources) {

merge()

/**
 * Merge data from {@link Publisher} sequences contained in an array / vararg
 * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat},
 * sources are subscribed to eagerly.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/merge.png" alt="">
 * <p>
 * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
 * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
 * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
 * another source.
 *
 * @param sources the array of {@link Publisher} sources to merge
 * @param <I> The source type of the data sequence
 *
 * @return a merged {@link Flux}
 */
@SafeVarargs
public static <I> Flux<I> merge(Publisher<? extends I>... sources) {
 类似资料:
  • 如何正确处理由期货构建的Monos? 我试着让我的头脑围绕着Spring Reactive(和Spring 5),观看所有的视频,阅读所有我能找到的博客,但他们似乎都没有做一些事情,而不仅仅是查询数据库或其他琐碎的事情。 我正在使用新的AWS 2.0开发工具包,它使用的用于大多数事情。使用服务创建新实例,我的方法如下所示 我在这里的理解是,我几乎立即返回类型的,而将随时执行它的操作。 我从我的路由

  • 在GFM中,在列表中创建连续列表的正确方法是什么? 目标: 一个 我已经看了关于持续号码列表和相关问题的常见帖子,但还没有看到有人问这个问题。

  • 读取CSV文件。 基于记录子集(10-100行?),迭代地检查每行的每列,以自动确定CSV中数据的正确列类型。因此,如果第1行A列的值为12345(int),但第2行A列的值为ABC(varchar),系统将根据在前两次传递中找到的数据的组合自动确定它应该是varchar(5)格式。只要用户认为有必要确定列的可能类型和大小,这个过程就可以进行多少次。 按照CSV的列检查定义构建CREATE TAB

  • 使用构建类型为不同环境构建APK 我正在用这篇文章来构建我的apk,而不是android studio。除了在gradle中使用环境变量外,其他一切都正常: 在构建之前,我设置了环境变量 然后在我的build.gradle: 当我执行:gradle assemble时,我得到一个错误: 我尝试了几种组合,但得到了相同的错误: BuildConfig无法正确创建(Gradle Android) Bu

  • 我尝试在Spring Boot应用程序中配置apache kafka。我阅读了这篇文档,并按照以下步骤操作: 1)我将以下行添加到: 2)我创建新主题:

  • 我想知道在静态编程语言中创建全局常量的最佳方法是什么。Java,我们将使用一个包含常量的类,我们只需要导入这个类就可以访问所有常量。但是在静态编程语言中,有两种主要的方法: > 您可以创建一个包含所有常量的对象: 对象常量{const valCONST_1="foo"const valCONST_2="bar"const valCONST_3="toto"} 但这不是推荐的方式,正如一位语言开发者