当前位置: 首页 > 知识库问答 >
问题:

如何在一个有嵌套单声道(非阻塞)的流中找到所有被处理的数据?

万俟嘉珍
2023-03-14

假设我有下面的通量和单声道嵌套。我有两个不同的Apache Cassandra表的信息。我想合并细节,并作为通量发送回。

请参阅下面更新的伪代码。

@Autowired FollowersRepository followersRepository;
@Autowired TopicRepository topicRepository;
    @GetMapping("/info")
    public Flux<FullDetails> getData(){
        return Flux.create(emitter ->{
            followersRepository.findAll() 
            .doOnNext(data -> {
                List<String> all = data.getTopiclist(); //will get list of topic id
                List<Alltopics> processedList = new ArrayList<Alltopics>();
                all.forEach(action -> {
                    topicRepository.findById(action) //will get full detail about topic
                    .doOnSuccess(topic ->{
                        processedList.add(topic);
                        if (processedList.size() >= all.size()) {
                            FullDetails fulldetails = new FullDetails(action,processedList);
                            emitter.next(fulldetails);
                            //emitter.complete();
                        }
                    })
                    .subscribe();
                });
            })
            .doOnComplete(() ->{
                System.out.println("All the data are processed !!!");
                //emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
            })
            .subscribe();
        });
    }

有关更多详细信息,请参阅此处的代码CodeLink。

我已经尝试了 doOnOn 完成和 doOnFin 对于外部 Flux,它不会等待所有内部非阻塞调用完成。

我想在处理完 Flux 内部的所有嵌套 Mono(非阻塞)请求后调用“完成”。

共有1个答案

施华奥
2023-03-14

发射器内的Flux实际上没有做任何事情,因为没有订户。发射器通常会对引发的事件做出反应,例如收到消息等。您可以在下面添加subscribe()以使其工作。阅读热与冷订阅者。http://projectreactor.io/docs/core/snapshot/reference/#reactor.hotCold

return Flux.create(emitter -> {
            Flux.just(1,2,3,4,5) //list of ids from database

                    .doOnNext(uuid ->{
                        this.getData(uuid).doOnSuccess((result) -> {
                            System.out.println("query data from database "+uuid);
                            emitter.next("Data from database.");
                        });
                    })
                    .doOnComplete(()->{
                        System.out.println("Not waiting for all the Nested Mono to complete. ");
                    })
            .subscribe();
        });

如果你调用的是数据库,你可能不需要担心通过发射器引发事件

public Flux<String> getAllData2(){
        return Flux.just(1, 2, 3, 4, 5)
                .flatMap(uuid1 -> getData(uuid1).doOnSuccess(result -> System.out.println("query data from database " + result)))
                .doOnComplete(() -> System.out.println("Not waiting for all the Nested Mono to complete. "));
    }
 类似资料:
  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间

  • 问题内容: OpenSSL库允许使用SSL_read从基础套接字读取并使用SSL_write对其进行写入。这些函数可能会根据其SSL协议需求(例如,在重新协商连接时),以SSL_ERROR_WANT_READ或SSL_ERROR_WANT_WRITE返回。 我不太了解API希望我如何处理这些结果。 对一个接受客户端连接的服务器应用程序进行映像,建立一个新的ssl会话,使基础套接字成为非阻塞状态,然

  • 我有一个聚合器实用程序类,在那里我必须连接多个cassandra表数据。我的生产代码将如下所示,但不完全相同。 有关更多详细信息,请参考此处的代码。 我已经尝试了 doOnOn 完成和 doOnFin 对于外部 Flux,它不会等待所有内部非阻塞调用完成。 在处理完Flux内部所有嵌套的Mono/Flux(非阻塞)请求后,我想调用onComplete。 对于嵌套阻塞磁通量/单晶通量,外部通量 do

  • 问题内容: 我需要一种方法来读取Popen创建的流中所有当前可用的字符,或者找出缓冲区中还剩下多少个字符。 背景:我想用Python远程控制一个交互式应用程序。到目前为止,我已经使用Popen创建了一个新的子进程: (我并不是真正开始使用python,但是实际的交互界面是相似的。)此刻,我读取了1个字节,直到检测到进程已到达命令提示符: 然后,我通过进行冗长的计算。我的问题是,由于无法检查流中的最

  • 我的目标很简单,但不幸的是,我在反应式编程方面完全是新手: 具有<代码>单声道

  • 以下构造是否以任何可能的方式阻塞或锁定?这是使用R2DB的正确方式吗?否则,如何以被动方式处理表中的所有记录? 我关心的是在整个流量消耗之前数据库连接、表和Reactor线程发生了什么。如果我正在阻塞线程,或者我保持表锁定,或者数据库连接被阻塞。 目的是写一个表中所有行的批处理。对于每一行,我都要执行一个活动,该活动包括从外部web服务获取数据,并最终在同一个表中覆盖原始发票。 假设单行处理方法(