当前位置: 首页 > 知识库问答 >
问题:

求反应器Mono中map的和

扈俊健
2023-03-14

如何将Mono/Flux的所有结果相加

我有一些反应代码,我返回一个Flux并链接一些其他函数,我会用一些代码来解释它。

return commentService.getAllCommentByIssueId(issueId)
                .map {
                    taggingService.tagging(it.content).map {
                        //Get the count of the returned set and add it up to 
                        //all the other returned map results
                        //So the result should be like
                        //tags in first comment 2, tags in second comment 3
                        //so it should return 5
                    }

所以我返回一个流量,这是一个流量的评论。我想将此查询的所有注释映射到一个函数,在该函数中,我扫描每个注释的内容以查找标记,这是由标记服务(taggingservice)实现的。标记,该函数返回一个单声道

我想做的是统计问题每条评论的所有标签,并将其返回到StatsModel中,StatsModel由StatsModel(issueId,numberOfTagsInComments)组成

我现在将向您展示标记功能:

(我测试了功能,它正在工作)

fun tagging(text:String) : Mono<MutableSet<UUID>> {
        val words = text.split( " ")
                .filter { it.startsWith("@")}
        val matches : MutableSet<UUID> = mutableSetOf()

        return userRepository.findAll().collectList()
                .map { userList ->
                    for(word in words){
                        userList.map {user ->
                            if(user.username == word.substring(1)) {
                                matches.add(user.id!!)
                            }
                        }
                    }
                    matches
                }
    }

还有我的CommentRepository,我正在使用这个repo的第一个功能。

@Repository
interface CommentRepository: ReactiveCrudRepository<CommentModel, UUID> {
    fun findAllByIssueId(issueId: UUID): Flux<CommentModel>
    fun findAllByUserId(userId: UUID): Flux<CommentModel>
}

共有2个答案

微生慈
2023-03-14

让我们尝试清楚地说明最终目标:您想要获取StatsModel(seceId, numberOfUniqueTagsIn注解)。然后我们可以将其分解为子问题(为了清楚起见,我会稍微更改命名):

  • 获取问题的所有评论:fun getAllComments(issueId):Flux

把所有的放在一起:

getAllComments(issueId)            // Flux<Comment>
  .flatMap { uniqueTags(it) }      // Flux<Set<UUID>>
  .collectList()                   // Mono<List<Set<UUID>>
  .map { uuidSets ->
    uuidSets
      .reduce { acc, set -> acc + set } // Set<UUID>
      .size()                           // Int
  }
  .map { StatsModel(issueId, it) }

孟谭三
2023-03-14

你的问题不清楚,但正如我所说,你似乎想减少。

您的问题不清楚,因为您正在从标记函数返回一个集,但集只允许唯一的值,所以如果同一个标记在注释中出现两次会怎么样?

无论如何,假设您想要每个注释中唯一标记的总数,您可以考虑:

我制作了标签和UUID的DB(映射)

Map<String, UUID> tags;
tags = Arrays.asList(new String[] {"tag1", "tag2"}).stream().collect(Collectors.toMap(Function.identity(), s->UUID.randomUUID() ));

标记功能的我的版本:

Mono<Set<UUID>> tagging(String string) {
    return Flux.fromArray(text.split(" ")).filter(str->str.startsWith("@")).map(str->str.substring(1)).map(tags::get).collect(Collectors.toSet());
}

获取通量的函数

Flux<String> getAllComments() {
    return Flux.fromArray(new String[] {"this is @tag1", "and @tag2", "finally @tag1 and @tag2", "unclear @tag1 and @tag1"});
}

以及一些简单的逻辑来计算每个集合的大小:

getAllComments()
    .flatMap(this::tagging)
    .map(Set::size)
    .reduce(Integer::sum)
    .map(StatsModel::new)
    .subscribe(System.out::println);

假设龙目岛是你的朋友

@ToString
@RequiredArgsConstructor
class StatsModel{
    UUID issueId = UUID.randomUUID();
    @NonNull
    Integer count;
}

给我:

StatsModel(issueId=a3cf1243-8a89-406c-b0e8-c13a6bdc832c, count=5)

生成的UUID是免费的。

 类似资料:
  • 我一直在使用Spring Boot 2.0.1及其Webflux库开发一个示例reactive web api。我一直在看网上的例子,试图建立它,但我被两件事难倒了。下面是我的两个问题。 1)如何返回响应实体流,当我尝试时,我得到一个错误,说只能返回单个响应实体。下面是我当前的代码。 2)当我更新一个对象时,我使用一个flatMap来更新保存在Mongo中的对象,然后使用一个Map来将其转换为响应

  • 我使用了来自org的openapi生成器maven插件。在我的Spring Boot项目中启用了被动配置的openapitools。我的一个endpoint返回一个列表体响应,该响应自动生成为Mono 如何使用WebTestClient在联调中测试endpoint控制器的主体? 如果我尝试这样做,它不会起作用,因为我接收到的是通量,而不是预期的dto对象。

  • projectReactor文档说明是异步的,如下所示。 所以,我可以这样写我的所有方法来返回Mono发布者。 并与一起使用,如下所示: 公共最终单声道平面映射(函数<?super T,?extends Mono<?extends r>>transformer) 异步转换此单声道发出的项,返回另一个单声道发出的值(可能更改值类型)。

  • 我正在使用Junit 5和mockito进行一些单元测试。 要被模拟的方法调用如下。它返回一个Mono并接受两个String参数。 我嘲笑它如下 这会产生一个空指针,如下所示

  • 例如,如果我同时调度一个包含3个异步调用的列表,那么以下面的方式应用map操作会阻塞吗? 在上面的片段中,每个map操作都要阻塞吗?假设第一个呼叫需要5毫秒才能返回,其他每一个呼叫需要2毫秒才能返回,我们是否要等待3ms+2ms+2m=7ms来执行enitre操作?或者只有3ms,因为一旦第一个调用得到解析,那么2ms的调用就已经解析了。

  • 假设我有以下流程 只是为了理解 因此,我有 但我需要 继续皱眉 这个问题有什么好的解决办法吗?