当前位置: 首页 > 知识库问答 >
问题:

使用reactive MongoDB在Webflux中的Mono上执行n次操作

姬国安
2023-03-14

对于Webflux应用程序中使用反应MongoDB驱动程序和Spring的数据加载器,我有以下场景:

  1. 创建X个类型为B的对象
  2. 创建Y个A类型的对象:对象A包含一个数组类型的字段和对B类型对象的引用。对B的引用是从第一步随机选择的
  3. 将N个条目添加到先前创建的对象的数组

我面临的问题似乎是Mono/Flux的并行执行,从我的理解来看,这不应该发生。根据文档,除非另有规定,否则总是按顺序执行。

谁能给我一个提示,我做错了什么?

下面是一个示例代码段。物体A是一个厕所。对象B是用户。“数组”字段是“注释”字段:

kotlin prettyprint-override">Flux.range(0, 10)
            // create 10 objects of type user
            .flatMap {
                LOG.debug("Creating user $it")
                userRepository.save(
                    User(
                        id = ObjectId(),
                        name = userNames.random(),
                        email = "${userNames.random()}@mail.com"
                    )
                )
            }
            .collectList()
            // create 2 objects of type toilet
            .flatMapMany { userList ->
                Flux.range(0, 2).zipWith(Flux.range(0, 2).map { userList })
            }
            .flatMap {
                LOG.debug("Creating toilet ${it.t1}")
                val userList = it.t2

                toiletRepository.save(
                    Toilet(
                        id = ObjectId(),
                        title = userList.random().name
                    )
                )
            }
            // add 5 entries to array of toilet
            .flatMap { toilet ->
                Flux.range(0, 5).zipWith(Flux.range(0, 5).map { toilet })
            }
            .flatMap { tuple ->
                val toilet = tuple.t2
                LOG.debug("Creating comment ${tuple.t1} for toilet $toilet")

                // get current values from toilet
                toiletRepository.findById(toilet.id).map {
                    // and push a new element to the comments array
                    LOG.debug("Comment size ${it.commentRefs.size}")
                    toiletRepository.save(it.apply { commentRefs.add(ObjectId()) })
                }
            }
            .subscribe {
                GlobalScope.launch {
                    exitProcess(SpringApplication.exit(context))
                }
            }

执行此代码将生成以下日志:

2020-11-15 19:42:54.197 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 0
2020-11-15 19:42:54.293 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 1
2020-11-15 19:42:54.295 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 2
2020-11-15 19:42:54.296 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 3
2020-11-15 19:42:54.300 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 4
2020-11-15 19:42:54.301 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 5
2020-11-15 19:42:54.304 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 6
2020-11-15 19:42:54.310 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 7
2020-11-15 19:42:54.316 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 8
2020-11-15 19:42:54.318 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 9
2020-11-15 19:42:54.348 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Creating toilet 0
2020-11-15 19:42:54.380 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Creating toilet 1
2020-11-15 19:42:54.386 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.405 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.406 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.407 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.409 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.410 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.412 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.413 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.414 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.415 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.425 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-6] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-3] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-7] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.464 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Comment size 0
    null

共有1个答案

巢烨
2023-03-14

我不确定这是否是最好的方法,但我可以通过将函数中的操作拆分以使其更合乎逻辑地分组来实现我想要的。

以下是以下操作的片段:

  1. 创建用户
  2. 创建注释
  3. 创建分级
private fun createUsers() = Flux.range(0, userNames.size + 1)
        .flatMap {
            if (it < userNames.size) {
                LOG.debug("Creating user $it")
                userRepository.save(
                    User(
                        id = ObjectId(),
                        name = userNames[it],
                        email = "${userNames[it]}@mail.com"
                    )
                )
            } else {
                LOG.debug("Creating dev-user")
                userRepository.save(
                    User(
                        id = ObjectId("000000000000012343456789"),
                        name = "devuser",
                        email = "devuser@mail.com"
                    )
                )
            }
        }
        .collectList()
private fun createComments(users: List<User>) = Flux.range(0, numComments)
        .flatMap {
            LOG.debug("Creating comment $it")
            commentRepository.save(
                Comment(
                    id = ObjectId(),
                    text = commentTexts.random(),
                    userRef = users.random().id
                )
            )
        }
        .collectList()
private fun createRatings(users: List<User>) = Flux.range(0, numRatings)
        .flatMap {
            LOG.debug("Creating rating $it")
            ratingRepository.save(
                Rating(
                    id = ObjectId(),
                    userRef = users.random().id,
                    value = Random.nextInt(0, 5)
                )
            )
        }
        .collectList()
private fun createToilets(comments: List<Comment>, ratings: List<Rating>) = Flux.range(0, numToilets)
        .flatMap {
            val toilet = Toilet(
                id = ObjectId(),
                title = titles.random(),
                location = GeoJsonPoint(Random.nextDouble(10.0, 20.0), Random.nextDouble(45.0, 55.0)),
                description = descriptions.random()
            )

            // add comments
            val commentsToAdd = Random.nextInt(0, comments.size)
            for (i in 0 until commentsToAdd) {
                toilet.commentRefs.add(comments[i].id)
            }

            // add average rating and rating references
            val ratingsToAdd = Random.nextInt(0, ratings.size)
            for (i in 0 until ratingsToAdd) {
                toilet.ratingRefs.add(ratings[i].id)
                toilet.averageRating += ratings[i].value
            }
            if (toilet.ratingRefs.isNotEmpty()) {
                toilet.averageRating /= toilet.ratingRefs.size
            }

            LOG.debug("Creating toilet $it with $commentsToAdd comments and $ratingsToAdd ratings")

            toiletRepository.save(toilet)
        }
        // upload preview image
        .flatMap { toilet ->
            val imageName = "toilet${Random.nextInt(1, 10)}.jpg"
            imageService.store(
                Callable {
                    DataLoaderRunner::class.java.getResourceAsStream("/sample-images/$imageName")
                },
                "${toilet.title}-preview"
            ).zipWith(Mono.just(toilet))
        }
        // set preview image
        .flatMap {
            val imageId = it.t1
            val toilet = it.t2

            toiletRepository.save(toilet.copy(previewID = imageId))
        }
        .collectList()
createUsers()
            .flatMap { users ->
                createComments(users).map { comments ->
                    Tuples.of(users, comments)
                }
            }
            .flatMap {
                val users = it.t1
                val comments = it.t2

                createRatings(users).map { ratings ->
                    Tuples.of(comments, ratings)
                }
            }
            .flatMap {
                val comments = it.t1
                val ratings = it.t2

                createToilets(comments, ratings)
            }
            // close application when all toilets are processed
            .subscribe {
                GlobalScope.launch {
                    exitProcess(SpringApplication.exit(context))
                }
            }
 类似资料:
  • 问题内容: 我想做的是在有webclient的Webflux中的Mono上有条件地重复。情况如下: 我们有一些商务休息服务服务,可返回生成的文档。此文档的生成是由在此之前调用的另一服务触发的。但是,回到正题:文档生成服务需要10到30秒。我们想要做的是:10秒钟后检查是否生成文档(单声道)。如果是这样,一切都很好。如果不是,请在5秒钟后重复(或重试)并检查是否生成了文档。依此类推,直到(最坏的情况

  • 我要做的是用WebClient对Webflux中的单声道进行有条件的重复。情况如下: 我们有一些返回生成文档的业务rest服务。此文档的生成是从在此文档之前调用的另一个服务触发的。但是,回到业务:文档生成服务需要10-30秒。我们要做的是:10秒后检查是否生成了文档(单)。如果是这样,一切都很好。如果没有,请在5秒后重复(或重试)并检查是否生成了文档。以此类推,直到(最坏的情况)30秒后超时。这可

  • 我尝试使处理程序和路由器类的Spring引导webflow。模型类是用户类。代码是 下面是webflux项目的处理程序类。在register方法中,我编写了id复制测试代码。但这是完全错误的。 我想提取的用户名或id字符串从Mono的Spring webflow.将需要任何评论。我被这部分卡住了。

  • 我写了一个@Aspect来拦截以Mono/Flux返回值的被动方法。使用@AfterReturning advice,我试图通过调用webservice发出APNS通知。 不幸的是,processNotification Mono服务在没有执行调用链的情况下立即返回onComplete信号。下面是我的示例程序。 我们如何在不等待侦听的情况下异步触发此调用。。目前,processNotificati

  • 问题内容: 我正在尝试使用Java 8中的Streams API从Collection中检索n个唯一的随机元素,以进行进一步处理,但是运气不佳。 更确切地说,我想要这样的东西: 我想尽可能有效地做到这一点。 能做到吗? 编辑:我的第二次尝试-尽管不完全是我的目标: 编辑:第三次尝试,如果coll.size()很大而n很小时,它将消除很多随机播放的开销: 问题答案: 如fge在评论中和ZouZou在

  • 我正在尝试使用 Java 8 中的 Streams API 从集合中检索 n 个唯一的随机元素以进行进一步处理,但是,没有太多运气。 更准确地说,我想要这样的东西: 我想尽可能高效地做这件事。 这可以做到吗? 编辑:我的第二次尝试——虽然不是我的目标: 编辑:第三次尝试(受Holger启发),如果coll.size()很大而n很小的话会去掉很多shuffle的开销: