当前位置: 首页 > 知识库问答 >
问题:

如何查找Flux(父级)中的所有数据都由其内部非阻塞Flux或Mono(子级)处理?

邬友樵
2023-03-14

我有一个聚合器实用程序类,在那里我必须连接多个cassandra表数据。我的生产代码将如下所示,但不完全相同。

@Autowired FollowersRepository followersRepository;
    @Autowired TopicRepository topicRepository;
        @GetMapping("/info")
        public Flux<FullDetails> getData(){
            return Flux.create(emitter ->{
                followersRepository.findAll() 
                .doOnNext(data -> {
                    List<String> all = data.getTopiclist(); //will get list of topic id
                    List<Alltopics> processedList = new ArrayList<Alltopics>();
                    all.forEach(action -> {
                        topicRepository.findById(action) //will get full detail about topic
                        .doOnSuccess(topic ->{
                            processedList.add(topic);
                            if (processedList.size() >= all.size()) {
                                FullDetails fulldetails = new FullDetails(action,processedList);
                                emitter.next(fulldetails);
                                //emitter.complete();
                            }
                        })
                        .subscribe();
                    });
                })
                .doOnComplete(() ->{
                    System.out.println("All the data are processed !!!");
                    //emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
                })
                .subscribe();
            });
        }

有关更多详细信息,请参考此处的代码。

我已经尝试了 doOnOn 完成和 doOnFin 对于外部 Flux,它不会等待所有内部非阻塞调用完成。

在处理完Flux内部所有嵌套的Mono/Flux(非阻塞)请求后,我想调用onComplete。

对于嵌套阻塞磁通量/单晶通量,外部通量 doOn Complete 方法在完成内部通量/单晶后执行。

后记:-

在下面的例子中,我找不到放置emitter.complete()的位置。因为doOnCompl()方法是在完成所有内部Mono之前调用的。

请求正文:-

[{ "content":"Intro to React and operators", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Flux", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Mono", "author":"Josh Long", "name":"Spring WebFlux" }] 

我的Rest控制器:-

@PostMapping("/topics")
    public Flux<?> loadTopic(@RequestBody Flux<Alltopics> data)
    {
        return Flux.create(emitter ->{
            data
            .map(topic -> {
                topic.setTopicid(null ==topic.getTopicid() || topic.getTopicid().isEmpty()?UUID.randomUUID().toString():topic.getTopicid());
                return topic;
            })
            .doOnNext(topic -> {
                topicRepository.save(topic).doOnSuccess(persistedTopic ->{
                    emitter.next(persistedTopic);
                    //emitter.complete();
                }).subscribe();
            })
            .doOnComplete(() -> {
                //emitter.complete();
                System.out.println(" all the data are processed!!!");
            }).subscribe();
        });
    }

共有1个答案

池麒
2023-03-14

以下是编写反应式管道时应该遵循的一些规则:

  1. doOnXYZ运算符永远不应该用于执行大量I/O、涉及延迟的操作或任何反应式操作。这些应该用于“副作用”操作,例如日志记录、指标等。
  2. 您永远不应该从管道或返回反应类型的方法中订阅。这将此操作的处理与主管道分离,这意味着不能保证您会在正确的时间获得预期的结果,也不能保证您的应用程序会知道完整的/错误信号。
  3. 您永远不应该在管道或返回反应式类型的方法中阻止。这将在运行时给您的应用程序带来严重问题。

现在,由于您的代码片段相当复杂,我将只给出使用另一个代码片段的大致方向。

@GetMapping("/info")
public Flux<FullDetails> getData(){
    return followersRepository.findAll()
        .flatMap(follower -> {
            Mono<List<Alltopics>> topics = topicRepository.findAllById(follower.getTopiclist()).collectList();
            return topics.map(topiclist -> new FullDetails(follower.getId(), topiclist));
        });
}
 类似资料:
  • 我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的: 现在我必须收集所有的数据,然后计算所有的总和,并返回为

  • 更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如:

  • 假设我有下面的通量和单声道嵌套。我有两个不同的Apache Cassandra表的信息。我想合并细节,并作为通量发送回。 请参阅下面更新的伪代码。 有关更多详细信息,请参阅此处的代码CodeLink。 我已经尝试了 doOnOn 完成和 doOnFin 对于外部 Flux,它不会等待所有内部非阻塞调用完成。 我想在处理完 Flux 内部的所有嵌套 Mono(非阻塞)请求后调用“完成”。

  • 问题内容: 我要删除,因此要从数据库中删除行。 我 确实 要删除某个实体及其所有行。不过,我 不 希望从它删除任何行。 如何 我能做到这一点? 是,并且是我要删除的实体。 请参阅下面的代码,了解我如何在“狗窝实体”中链接2: 目前,当我删除狗entitie(S),其相关的养犬实体 中 也被删除。 编辑:将狗映射到狗窝: 问题答案: 当前,当我删除狗实体时,其相关的狗窝实体也将被删除。 原因是您已设

  • 我想从Flux/Mono中获取对象。我使用 我会这样做: 我有错误: 为什么?有什么不同的方法来获取对象? 在反应式编程中,如何做到:在RequestBody中,您有UserDto。 如果不创建用户,请检查数据库中是否存在电子邮件。

  • 问题内容: 我需要一种方法来读取Popen创建的流中所有当前可用的字符,或者找出缓冲区中还剩下多少个字符。 背景:我想用Python远程控制一个交互式应用程序。到目前为止,我已经使用Popen创建了一个新的子进程: (我并不是真正开始使用python,但是实际的交互界面是相似的。)此刻,我读取了1个字节,直到检测到进程已到达命令提示符: 然后,我通过进行冗长的计算。我的问题是,由于无法检查流中的最