我有一个基于Webflux的微服务,它有一个简单的反应存储库:
public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
}
@Bean
public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
return input ->
input.foreach((key, value) -> {
LOG.info("Received event, Key: {}, value: {}", key, value);
repository.save(initNotification(value)).subscribe();
});
}
Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.
而且在这个演示视频中,他们提到他们使用project Reactor有反应性编程支持。所以我想有一种方法我只是不知道。你能教我怎么做对吗?
如果这一切听起来太愚蠢,我很抱歉,但我对Spring、Cloud、Stream和reactive编程非常陌生,还没有找到很多描述这方面的文章。
只需将Flux用作已消耗类型,如下所示:
@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
return input ->
input
.map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
.concatMap((value) -> repository.save(initNotification(value)))
.subscribe();
}
如果使用返回类型为空的函数
(函数
)而不是使用者,那么框架可以自动订阅。对于consumer
,您必须手动订阅,因为框架没有对流的引用。但是在consumer
情况下,您订阅的不是存储库,而是整个流,这是可以的。
Spring的云流是否也支持Kafka式的动觉再平衡?最近有人promise要解决这个问题https://github.com/spring-projects/spring-integration-aws/issues/99 谢谢
在Spring Web中,我使用@PreAuthorize with SpEl来检查当前用户的权限。类似这样的东西: 在RestController中: 现在我尝试使用WebFlux。写入了reactiveCheckPermission。 但是它抛出IllegalStateException("block()/block First()/block Final()是阻塞的,线程并行不支持 将boo
我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC
问题内容: 我正在使用spring数据(mongoDb),并且有我的存储库: 然后我有一个控制器: 一切正常,但是我无法使用RestTemplate getForEntity方法消耗端点: 我应该提供什么课程来成功反序列化实体页面? 问题答案: new TypeReference >() {} 该语句的问题在于,Jackson无法实例化抽象类型。您应该为Jackson提供有关如何实例化具体类型的信
所以基本上,我希望数组中的所有对象都是单个流的一部分,因为这就是我将要使用的客户端应用程序的设计方式。目前总共只有3个。所以在每个流中都将包含一个数组的所有对象。我知道这不是WebFlux的最佳用途。如果可以做到,这怎么能做到呢?