Question:

Setting up multiple bindings to the same destination and group on the consumer side fails with Spring Cloud Stream

单于智
2023-03-14

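I want to set up multiple bindings for Kafka consumer resiliency. More specifically, the backup listener has the same destination and group as the main listener, and differs only in the broker IPs. The backup listener's auto-startup is turned off at startup, but it will be turned on programmatically when a failover happens.

Startup fails with the following exception: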

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:126) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_181]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at com.example.demo.DemoApplication.main(DemoApplication.java:14) ~[classes/:na]
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'my-kafka-topic-name.my-kafka-consumer-group-name.errors.recoverer' defined in null: Cannot register bean definition [Root bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'my-kafka-topic-name.my-kafka-consumer-group-name.errors.recoverer.errors.recoverer': There is already [Root bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:927) ~[spring-beans-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:323) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.GenericApplicationContext.registerBean(GenericApplicationContext.java:471) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:681) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:644) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:148) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    ... 24 common frames omitted

Configuration:

spring:
  cloud:
    function:
      definition: mainListener;backupListener;
    stream:
      default-binder: my-main-binder
      function:
        bindings:
          mainListener-in-0: event-in
          backupListener-in-0: backup-event-in
      bindings:
        event-in:                                 # main listener
          binder: my-main-binder
          consumer:
            qulifer: local
            use-native-decoding: true
          destination: my-kafka-topic-name
          group: my-kafka-consumer-group-name
        backup-event-in:                          # backup listener with autoStartup turned off
          binder: my-backup-binder
          consumer:
            autoStartup: false
            use-native-decoding: true
          destination: my-kafka-topic-name        # same destination and group as the main listener
          group: my-kafka-consumer-group-name
      binders:
        my-main-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip_1; ip_2;
        my-backup-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: backup_ip_1; backup_ip_2;

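Related question: Exception thrown while starting consumer -(Not able to assign same group name to different channels in a microservice)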

https://github.com/garyrussell/spring-cloud-stream/commit/5b87b8cae494ae9568d924f64adc436374a67ea7

Update #1: After setting allow-bean-definition-overriding to true as suggested, I see a different exception, shown below:

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:126) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_181]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at com.example.demo.DemoApplication.main(DemoApplication.java:14) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Only one LastSubscriberMessageHandler is allowed
    at org.springframework.cloud.stream.binder.BinderErrorChannel.subscribe(BinderErrorChannel.java:44) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:712) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:633) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:644) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:148) ~[spring-cloud-stream-binder-kafka-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:407) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
    ... 24 common frames omitted

1 answer

扈阳辉
2023-03-14

That configuration is not currently supported by default; the error infrastructure bean names are based on the group and destination.
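
To illustrate why the two bindings clash, here is a minimal sketch in plain Java. It is not the binder's actual code: the class `ErrorBeanNameCollision` and its methods are hypothetical, and the only thing taken from the question is the name pattern visible in the exception message (`<destination>.<group>.errors.recoverer`). Because neither the binding name nor the binder name is part of that key, both bindings end up asking for the same bean name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the naming clash; not Spring Cloud Stream code.
public class ErrorBeanNameCollision {

    // Builds a name following the pattern seen in the exception message:
    // <destination>.<group>.errors.recoverer
    static String recovererBeanName(String destination, String group) {
        return destination + "." + group + ".errors.recoverer";
    }

    // Behaves like a registry with overriding disabled:
    // a second registration under the same name is rejected.
    static void register(Map<String, String> registry, String beanName, String binding) {
        if (registry.containsKey(beanName)) {
            throw new IllegalStateException("Bean '" + beanName
                    + "' is already registered by binding '" + registry.get(beanName) + "'");
        }
        registry.put(beanName, binding);
    }

    public static void main(String[] args) {
        Map<String, String> registry = new LinkedHashMap<>();
        String destination = "my-kafka-topic-name";
        String group = "my-kafka-consumer-group-name";

        // The main binding registers its error infrastructure first.
        register(registry, recovererBeanName(destination, group), "event-in");

        // The backup binding uses the same destination and group, so the name is identical.
        try {
            register(registry, recovererBeanName(destination, group), "backup-event-in");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, giving the backup binding a different group would produce a different name and avoid the clash, but that would also change the consumer group semantics the question relies on.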

If you don't mind using the same error infrastructure for both consumers (which sounds fine in this case), you can set

spring.main.allow-bean-definition-overriding=true