当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream 3 RabbitMQ使用者不工作

宗政财
2023-03-14

我可以让Spring+Rabbit以非功能性的方式工作(在2.0之前?),但我试图与功能性模式一起使用,因为以前的模式是不推荐的。

我一直在关注这个文档:https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_binding_and_binding_names

队列(使用者)不是用新方法在Rabbit中创建的。我可以看到连接正在创建,但没有任何消费者。

spring.cloud.stream.function.bindings.approved-in-0=approved
spring.cloud.stream.bindings.approved.destination=myTopic.exchange
spring.cloud.stream.bindings.approved.group=myGroup.approved
spring.cloud.stream.bindings.approved.consumer.back-off-initial-interval=2000
spring.cloud.stream.rabbit.bindings.approved.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.approved.consumer.bindingRoutingKey=myRoutingKey

它正在取代:

spring.cloud.stream.bindings.approved.destination=myTopic.exchange
spring.cloud.stream.bindings.approved.group=myGroup.approved
spring.cloud.stream.bindings.approved.consumer.back-off-initial-interval=2000
spring.cloud.stream.rabbit.bindings.approved.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.approved.consumer.bindingRoutingKey=myRoutingKey

和新的班级

@Slf4j
@Service
public class ApprovedReceiver {

    @Bean
    public Consumer<String> approved() {
        // I also saw that it's recommended to not use Consumer, but use Function instead
        // https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_consumer_reactive
        return value -> log.info("value: {}", value);
    }

}

它正在取代

// BindableApprovedChannel.class
@Configuration
public interface BindableApprovedChannel {
    @Input("approved")
    SubscribableChannel getApproved();
}

// ApprovedReceiver.class
@Service
@EnableBinding(BindableApprovedChannel.class)
public class ApprovedReceiver {

    @StreamListener("approved")
    public void handleMessage(String payload) {
        log.info("value: {}", payload);
    }
}

共有1个答案

利稳
2023-03-14

如果您有多个类型为函数供应商消费者的beans(它们可以由第三方库声明),框架不知道要绑定到哪一个。

尝试将spring.cloud.function.definition属性设置为approved

https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#spring_cloud_function

如果您只有java.util.function.[supplier/function/consumer]类型的单个bean,您可以跳过spring.cloud.function.definition属性,因为这样的函数bean将被自动发现。但是,使用这种属性避免任何混淆被认为是最佳做法。有时,这种自动发现可能会成为障碍,因为java.util.function.[suppliver/function/consumer]类型的单个bean可能用于处理消息以外的其他目的,但作为单个bean是自动发现和自动绑定的。对于这些罕见的场景,您可以通过提供将值设置为false的spring.cloud.stream.function.AutoDetect属性来禁用自动发现。

 类似资料:
  • 使用spring cloud stream和RabbitMQ的配置: Exchange改为和队列改为。怎么修?

  • 我试图理解当以azure工作者角色托管消息队列使用者时的最佳实践。我有许多不同类型消息使用者,它们订阅不同的azure服务总线订阅(或者队列,如果您愿意这样称呼的话)。我想知道是应该在一个Worker角色中为每个使用者实例化多个线程,还是应该为每个使用者部署多个Worker角色。

  • 我正在使用Azure函数进行一个ETL项目,在该项目中,我从blob存储中提取数据,在Python和pandas中转换数据,并使用pandas将数据加载到_sql()。我试图通过使用异步IO和语言工作者来提高这个过程的效率。 我有点困惑,因为我的印象是asyncio使用一个线程工作,但Azure Functions文档说如果你改变配置,你可以使用多个语言工作者,甚至一个不使用async关键字的方法

  • 我是一个新的Kafka。我开始做Kafka,我面临以下问题,请帮助我解决这一个,提前谢谢。首先,我正在编写生产者API,它工作良好,但在编写消费者API时,消息不会显示。 我的代码是这样的: 已订阅主题Hello-Kafka records::org.apache.kafka.clients.consumer.consumerRecords@76b0bfab org.apache.kafka.cl

  • 服务工作者可以做哪些web工作者做不到的事情?或者反之亦然? 看来web工作者是服务工作者功能的一个子集。这是正确的吗?

  • 我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队