假设我有下面的通量和单声道嵌套。我有两个不同的Apache Cassandra表的信息。我想合并细节,并作为通量发送回。
请参阅下面更新的伪代码。
@Autowired FollowersRepository followersRepository;
@Autowired TopicRepository topicRepository;
@GetMapping("/info")
public Flux<FullDetails> getData(){
return Flux.create(emitter ->{
followersRepository.findAll()
.doOnNext(data -> {
List<String> all = data.getTopiclist(); //will get list of topic id
List<Alltopics> processedList = new ArrayList<Alltopics>();
all.forEach(action -> {
topicRepository.findById(action) //will get full detail about topic
.doOnSuccess(topic ->{
processedList.add(topic);
if (processedList.size() >= all.size()) {
FullDetails fulldetails = new FullDetails(action,processedList);
emitter.next(fulldetails);
//emitter.complete();
}
})
.subscribe();
});
})
.doOnComplete(() ->{
System.out.println("All the data are processed !!!");
//emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
})
.subscribe();
});
}
有关更多详细信息,请参阅此处的代码CodeLink。
我已经尝试了 doOnOn 完成和 doOnFin 对于外部 Flux,它不会等待所有内部非阻塞调用完成。
我想在处理完 Flux 内部的所有嵌套 Mono(非阻塞)请求后调用“完成”。
发射器内的Flux实际上没有做任何事情,因为没有订户。发射器通常会对引发的事件做出反应,例如收到消息等。您可以在下面添加subscribe()以使其工作。阅读热与冷订阅者。http://projectreactor.io/docs/core/snapshot/reference/#reactor.hotCold
return Flux.create(emitter -> {
Flux.just(1,2,3,4,5) //list of ids from database
.doOnNext(uuid ->{
this.getData(uuid).doOnSuccess((result) -> {
System.out.println("query data from database "+uuid);
emitter.next("Data from database.");
});
})
.doOnComplete(()->{
System.out.println("Not waiting for all the Nested Mono to complete. ");
})
.subscribe();
});
如果你调用的是数据库,你可能不需要担心通过发射器引发事件
public Flux<String> getAllData2(){
return Flux.just(1, 2, 3, 4, 5)
.flatMap(uuid1 -> getData(uuid1).doOnSuccess(result -> System.out.println("query data from database " + result)))
.doOnComplete(() -> System.out.println("Not waiting for all the Nested Mono to complete. "));
}
如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间
问题内容: OpenSSL库允许使用SSL_read从基础套接字读取并使用SSL_write对其进行写入。这些函数可能会根据其SSL协议需求(例如,在重新协商连接时),以SSL_ERROR_WANT_READ或SSL_ERROR_WANT_WRITE返回。 我不太了解API希望我如何处理这些结果。 对一个接受客户端连接的服务器应用程序进行映像,建立一个新的ssl会话,使基础套接字成为非阻塞状态,然
我有一个聚合器实用程序类,在那里我必须连接多个cassandra表数据。我的生产代码将如下所示,但不完全相同。 有关更多详细信息,请参考此处的代码。 我已经尝试了 doOnOn 完成和 doOnFin 对于外部 Flux,它不会等待所有内部非阻塞调用完成。 在处理完Flux内部所有嵌套的Mono/Flux(非阻塞)请求后,我想调用onComplete。 对于嵌套阻塞磁通量/单晶通量,外部通量 do
问题内容: 我需要一种方法来读取Popen创建的流中所有当前可用的字符,或者找出缓冲区中还剩下多少个字符。 背景:我想用Python远程控制一个交互式应用程序。到目前为止,我已经使用Popen创建了一个新的子进程: (我并不是真正开始使用python,但是实际的交互界面是相似的。)此刻,我读取了1个字节,直到检测到进程已到达命令提示符: 然后,我通过进行冗长的计算。我的问题是,由于无法检查流中的最
我的目标很简单,但不幸的是,我在反应式编程方面完全是新手: 具有<代码>单声道
以下构造是否以任何可能的方式阻塞或锁定?这是使用R2DB的正确方式吗?否则,如何以被动方式处理表中的所有记录? 我关心的是在整个流量消耗之前数据库连接、表和Reactor线程发生了什么。如果我正在阻塞线程,或者我保持表锁定,或者数据库连接被阻塞。 目的是写一个表中所有行的批处理。对于每一行,我都要执行一个活动,该活动包括从外部web服务获取数据,并最终在同一个表中覆盖原始发票。 假设单行处理方法(