当前位置: 首页 > 知识库问答 >
问题:

如何在Webflux应用中使Spring云流消费者?

白光耀
2023-03-14

我有一个基于Webflux的微服务,它有一个简单的反应存储库:

    public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
    }
    @Bean
    public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
        return input ->
                input.foreach((key, value) -> {
                    LOG.info("Received event, Key: {}, value: {}", key, value);
                    repository.save(initNotification(value)).subscribe();
                });
    }
Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.

而且在这个演示视频中,他们提到他们使用project Reactor有反应性编程支持。所以我想有一种方法我只是不知道。你能教我怎么做对吗?

如果这一切听起来太愚蠢,我很抱歉,但我对Spring、Cloud、Stream和reactive编程非常陌生,还没有找到很多描述这方面的文章。

共有1个答案

洪高阳
2023-03-14

只需将Flux用作已消耗类型,如下所示:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
    return input ->
            input
             .map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
             .concatMap((value) -> repository.save(initNotification(value)))
             .subscribe();
}

如果使用返回类型为空的函数(函数 >、mono > )而不是使用者,那么框架可以自动订阅。对于consumer,您必须手动订阅,因为框架没有对流的引用。但是在consumer情况下,您订阅的不是存储库,而是整个流,这是可以的。

 类似资料:
  • Spring的云流是否也支持Kafka式的动觉再平衡?最近有人promise要解决这个问题https://github.com/spring-projects/spring-integration-aws/issues/99 谢谢

  • 在Spring Web中,我使用@PreAuthorize with SpEl来检查当前用户的权限。类似这样的东西: 在RestController中: 现在我尝试使用WebFlux。写入了reactiveCheckPermission。 但是它抛出IllegalStateException("block()/block First()/block Final()是阻塞的,线程并行不支持 将boo

  • 我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC

  • 问题内容: 我正在使用spring数据(mongoDb),并且有我的存储库: 然后我有一个控制器: 一切正常,但是我无法使用RestTemplate getForEntity方法消耗端点: 我应该提供什么课程来成功反序列化实体页面? 问题答案: new TypeReference >() {} 该语句的问题在于,Jackson无法实例化抽象类型。您应该为Jackson提供有关如何实例化具体类型的信

  • 所以基本上,我希望数组中的所有对象都是单个流的一部分,因为这就是我将要使用的客户端应用程序的设计方式。目前总共只有3个。所以在每个流中都将包含一个数组的所有对象。我知道这不是WebFlux的最佳用途。如果可以做到,这怎么能做到呢?