当前位置: 首页 > 知识库问答 >
问题:

使用spring cloud stream对类似侦听器的行为进行排队

祁远
2023-03-14

我正在尝试使用spring cloud stream供应商和消费者实现上述场景。

  1. 此应用程序是一个包含生产者和消费者的单一spring boot应用程序

下面是java类

 @Component
public class MultipleFunctionsApplication {
    
    @Bean
    public Consumer<String> sink1() {
        return message -> {         
            System.out.println(new Date() + "----------->>> sink1 - Received message " + message);
        };
    }

    @Bean
    public Consumer<String> sink2() {
        return message -> {         
            System.out.println(new Date() + "----------->>> sink2 - Received message " + message);
        };
    }

}

我正在尝试使用消费者群体功能来实现这一点,如下所示。

spring:
  cloud:
    stream:
      bindings:
        requester1:
          destination: rss-exchange
          group: requester
        requester2:
          destination: rss-exchange
          group: requester
      function:
        bindings:
          sink1-in-0: requester1
          sink2-in-0: requester2          
        definition: sink1;sink2
  application:
    name: rss

当我启动应用程序时,我得到以下错误。

Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'rss-exchange.requester.errors.recoverer' defined in null: Cannot register bean definition [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'rss-exchange.requester.errors.recoverer': There is already [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:995) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:330) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.beans.factory.support.BeanDefinitionReaderUtils.registerBeanDefinition(BeanDefinitionReaderUtils.java:164) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.doRegisterBean(AnnotatedBeanDefinitionReader.java:285) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.registerBean(AnnotatedBeanDefinitionReader.java:233) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotationConfigApplicationContext.registerBean(AnnotationConfigApplicationContext.java:198) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:687) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:525) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:136) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
... 24 common frames omitted

从日志中可以清楚地看出,它正试图再次创建rss-exchange.requester.errors.recoverer。在这种情况下,只有sink1从以下消息开始。

Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}

当我添加“允许-豆类-定义-覆盖:真实”,然后一切正常,如下面的日志所示。

Fri Aug 27 15:03:57 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}

我不确定这样做是否正确,因为我发现bean的错误已经存在,尽管我正在尝试的用例正在使用覆盖属性。

注意-自从我开始探索流云流只有几天了,所以如果我问了一些愚蠢的问题,请认为我很天真。

共有1个答案

牛越
2023-03-14

因此,该问题已通过您的配置进行了修复和测试,并已合并,可在当前快照(3.2.0-SNAPSHOT)中使用。

 类似资料:
  • 我在我的一个工作流应用程序中使用了Camunda BPMN2.0。在我的一个服务任务中,我在start事件中创建了一个执行侦听器,在create事件中创建了一个任务侦听器。我不确定在开始时同时分配这些是否合适。如果是正确的,它们中的哪一个将首先执行--执行监听器或任务监听器,分别在start或create事件中执行?

  • 问题内容: 我刚刚开始学习如何使用动作侦听器。据我了解,它的工作方式如下: 默认情况下,有些类包含“ addActionListener”方法(例如,按钮类)。 使用此方法,我们向对象添加了动作侦听器。例如:。 当执行带有“ listenedObject”的动作时,将调用“ listeningObject”的“ actionPerformed”方法。因此,这意味着当我们为侦听对象编写类时,我们需要

  • 我有一个银行账户程序,实现了一个扩展到支票账户和储蓄账户子类的银行账户超类。 每个账户有四个属性:名字、姓氏、社会保障和余额。 我有一个BankDatabase类,它创建了一个新的ArrayList来存放这些对象。 我想排序这个ArrayList使用比较接口和comareTo()方法。 我实现了与我的超类类似的功能: 我写了下面的comareTo()方法: 我在BankAccount超类中创建了一

  • 问题内容: 在我可以排序的使用此声明: 我无法使用 Swift 重现相同的语句。我发现的只是使用。 问题答案: 您可以使用Swift的内置排序函数,也可以使用Swift数组,因为Swift数组是桥接的,因此可以直接从swift 调用。 使用Swift的功能: 或者,使用的:

  • 嗨,我有这个问题与我的代码每次我使一个对象在监听器类的监听器将不工作例如。 我在我的主类(CSmain.java)中有这个公共变量: 我在类中还有一个名为getSpawn()的方法,它返回spawn: 我已经在我的onEnable()中初始化了这个变量,但是当我试图从我的另一个类中获取它时,这个类(JoinListener.java)将无法工作。 我测试了它没有创建对象(CSmain main C

  • 如果我向JavaFx属性添加onChange侦听器, 监听器是按顺序调用的吗?如果我有一个字符串属性,然后我把字符串转到“爱丽丝”,然后转到“鲍勃”,我保证在看到“鲍勃”之前看到“爱丽丝”吗?事实上,我不在乎我是否看到“爱丽丝”,只要我看不到“鲍勃”之后 监听器是按顺序调用的吗?我的监听器有可能同时被“爱丽丝”和“鲍勃”通知调用吗?