我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据?
比如:
public Mono<MyObject> createMyObject() {
MyObject myObject = new MyObject();
someService.getSomeData().subscribe(myObject::setData);
oneMoreService.getMoreData().subscribe(list -> {
// myObject populate more fields
});
// how can I be sure that someData and moreData is populated, before we reach doSomeBusinessStuff method?
return Mono.just(myObject);
}
public Result doSomeBusinessStuff(Mono<MyObject> myObject) {
// make some other calculations with someData and moreData
}
你可以拉上拉链。
return Mono.zip(someService.getSomeData(), oneMoreService.getMoreData())
.map(t -> {
X data1 = t.getT1();
Y data2 = t.getT2();
MyObject myObject = new MyObject();
//...
return myObject;
});
你可以在留档中找到关于它的信息。https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#zip-reactor.core.publisher.Mono-reactor.core.publisher.Mono-
我本可以使用而不是第二个,因为它不映射任何东西,但我不确定这是否是peek方法的可接受用法。 我也可以在第二个中使用一个有状态映射器来只运行一次,而不是用索引压缩,我想这是可以接受的,因为我已经使用了一个有状态谓词...
我使用的是Reactive Spring Cloud Stream,在创建没有输出的StreamListener时遇到了困难。只要没有收到格式错误的消息,下面的代码就可以工作。当收到格式错误的消息时,流量将关闭。 如果我理解正确的话,最好让框架订阅流量,而不是手动订阅流量。当侦听器有输出时,这不是一个问题,因为我可以简单地返回流量,如下所示: 框架似乎以一种在返回时不会关闭流量的方式处理坏消息。当
spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。
如何将响应从反应型HTTP客户机流式传输到控制器,而不在任何时候将整个响应主体放在应用程序内存中? 几乎所有project reactor客户机的示例都返回。据我所知,反应流是关于流,而不是加载它,然后发送响应。 是否可以返回,以便在不需要使用大量RAM内存来存储中间结果的情况下,将大文件从某个外部服务传输到应用程序客户机?
我的理解是Spring Webflux提供了一个非阻塞/异步并发模型。然而,我得到了一个需要帮助的基本问题。作为一个免责声明,这种反应编程的整个概念对我来说是非常新的,我仍然处于这种范式转变的过程中。 请考虑以下代码: 我从文档中了解到,在调用“subscribe”函数之前,事件处理链不会发生任何事情。 我注意到代码总是按顺序运行,并且使用相同的线程。spring WebFlux的线程模型是什么?