如何将Mono/Flux的所有结果相加
我有一些反应代码,我返回一个Flux并链接一些其他函数,我会用一些代码来解释它。
return commentService.getAllCommentByIssueId(issueId)
.map {
taggingService.tagging(it.content).map {
//Get the count of the returned set and add it up to
//all the other returned map results
//So the result should be like
//tags in first comment 2, tags in second comment 3
//so it should return 5
}
所以我返回一个流量,这是一个流量的评论。我想将此查询的所有注释映射到一个函数,在该函数中,我扫描每个注释的内容以查找标记,这是由标记服务(taggingservice)实现的。标记,该函数返回一个单声道
我想做的是统计问题每条评论的所有标签,并将其返回到StatsModel中,StatsModel由StatsModel(issueId,numberOfTagsInComments)组成
我现在将向您展示标记功能:
(我测试了功能,它正在工作)
fun tagging(text:String) : Mono<MutableSet<UUID>> {
val words = text.split( " ")
.filter { it.startsWith("@")}
val matches : MutableSet<UUID> = mutableSetOf()
return userRepository.findAll().collectList()
.map { userList ->
for(word in words){
userList.map {user ->
if(user.username == word.substring(1)) {
matches.add(user.id!!)
}
}
}
matches
}
}
还有我的CommentRepository,我正在使用这个repo的第一个功能。
@Repository
interface CommentRepository: ReactiveCrudRepository<CommentModel, UUID> {
fun findAllByIssueId(issueId: UUID): Flux<CommentModel>
fun findAllByUserId(userId: UUID): Flux<CommentModel>
}
让我们尝试清楚地说明最终目标:您想要获取StatsModel(seceId, numberOfUniqueTagsIn注解)
。然后我们可以将其分解为子问题(为了清楚起见,我会稍微更改命名):
fun getAllComments(issueId):Flux
把所有的放在一起:
getAllComments(issueId) // Flux<Comment>
.flatMap { uniqueTags(it) } // Flux<Set<UUID>>
.collectList() // Mono<List<Set<UUID>>
.map { uuidSets ->
uuidSets
.reduce { acc, set -> acc + set } // Set<UUID>
.size() // Int
}
.map { StatsModel(issueId, it) }
你的问题不清楚,但正如我所说,你似乎想减少。
您的问题不清楚,因为您正在从标记函数返回一个集,但集只允许唯一的值,所以如果同一个标记在注释中出现两次会怎么样?
无论如何,假设您想要每个注释中唯一标记的总数,您可以考虑:
我制作了标签和UUID的DB(映射)
Map<String, UUID> tags;
tags = Arrays.asList(new String[] {"tag1", "tag2"}).stream().collect(Collectors.toMap(Function.identity(), s->UUID.randomUUID() ));
标记功能的我的版本:
Mono<Set<UUID>> tagging(String string) {
return Flux.fromArray(text.split(" ")).filter(str->str.startsWith("@")).map(str->str.substring(1)).map(tags::get).collect(Collectors.toSet());
}
获取通量的函数
Flux<String> getAllComments() {
return Flux.fromArray(new String[] {"this is @tag1", "and @tag2", "finally @tag1 and @tag2", "unclear @tag1 and @tag1"});
}
以及一些简单的逻辑来计算每个集合的大小:
getAllComments()
.flatMap(this::tagging)
.map(Set::size)
.reduce(Integer::sum)
.map(StatsModel::new)
.subscribe(System.out::println);
假设龙目岛是你的朋友
@ToString
@RequiredArgsConstructor
class StatsModel{
UUID issueId = UUID.randomUUID();
@NonNull
Integer count;
}
给我:
StatsModel(issueId=a3cf1243-8a89-406c-b0e8-c13a6bdc832c, count=5)
生成的UUID是免费的。
我一直在使用Spring Boot 2.0.1及其Webflux库开发一个示例reactive web api。我一直在看网上的例子,试图建立它,但我被两件事难倒了。下面是我的两个问题。 1)如何返回响应实体流,当我尝试时,我得到一个错误,说只能返回单个响应实体。下面是我当前的代码。 2)当我更新一个对象时,我使用一个flatMap来更新保存在Mongo中的对象,然后使用一个Map来将其转换为响应
我使用了来自org的openapi生成器maven插件。在我的Spring Boot项目中启用了被动配置的openapitools。我的一个endpoint返回一个列表体响应,该响应自动生成为Mono 如何使用WebTestClient在联调中测试endpoint控制器的主体? 如果我尝试这样做,它不会起作用,因为我接收到的是通量,而不是预期的dto对象。
projectReactor文档说明是异步的,如下所示。 所以,我可以这样写我的所有方法来返回Mono发布者。 并与一起使用,如下所示: 公共最终单声道平面映射(函数<?super T,?extends Mono<?extends r>>transformer) 异步转换此单声道发出的项,返回另一个单声道发出的值(可能更改值类型)。
我正在使用Junit 5和mockito进行一些单元测试。 要被模拟的方法调用如下。它返回一个Mono并接受两个String参数。 我嘲笑它如下 这会产生一个空指针,如下所示
例如,如果我同时调度一个包含3个异步调用的列表,那么以下面的方式应用map操作会阻塞吗? 在上面的片段中,每个map操作都要阻塞吗?假设第一个呼叫需要5毫秒才能返回,其他每一个呼叫需要2毫秒才能返回,我们是否要等待3ms+2ms+2m=7ms来执行enitre操作?或者只有3ms,因为一旦第一个调用得到解析,那么2ms的调用就已经解析了。
假设我有以下流程 只是为了理解 因此,我有 但我需要 继续皱眉 这个问题有什么好的解决办法吗?