当前位置: 首页 > 知识库问答 >
问题:

集团财产不工作在春流云?

吴正祥
2023-03-14

我使用Spring Stream云来消费Kafka上的消息。当Kafka上的信息产生时,所有的消费者都受到了冲击。

但Kafka的文献表明,通过使用群体,只有一个消费者消费信息。

这是我的消费代码

@EnableBinding(Sink.class)

public class Consumer2 {


    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        System.out.println("33333");

    }

    @StreamListener(target = Sink.INPUT)
    public void consume1(String message) {
        System.out.println("444444");

    }


}
}

这是我的配置,但我的两个方法都调用了:(

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: abbas
          content-type: text/plain
          group: input-group-1

        output:
          binder: kafka
          destination: abbas
          group: output-group-1
          content-type: text/plain

共有2个答案

奚晟
2023-03-14

在一个实例中,组没有影响。

当运行应用程序的多个实例时,每次输入通道中有新消息时,都会通知所有订阅者。

大多数时候,我们只需要处理一次消息。Spring Cloud Stream通过消费者组实现了这种行为。

要启用此行为,每个使用者绑定都可以使用spring。云流动绑定。。要指定组名称的组属性:

https://www.baeldung.com/spring-cloud-stream第5.4节

傅树
2023-03-14

使用该配置,您只有1个消费者(SINK.INPUT),而不是2个消费者(StreamListener不是消费者,它是处理入站消息的模型)

这就是为什么Spring将入站消息路由到您的两个@StreamListener使用相同的接收器进行注释的原因。

 类似资料:
  • 现在我正在尝试用kafka创建消息服务功能以使用< code > spring-cloud-stream-bind-Kafka ,但效果不太好。 Spring罩1.4.2 当我使用此错误日志启动项目时失败 我在怀疑我的春靴版本。这么低配的版本。< br >我认为< code > spring-cloud-stream-binder-Kafka 在spring boot 2.0版本下无法使用或者其他

  • 985文科硕 两段产运实习,其中一段是阅文集团。 5月21号下午2点15场, 到3点30面完了hr面 速通1面2面hr面 推进太快有点惊讶,连面三场也是真的累 非技术岗位嘛,也没有问一些特别的问题,主要就是过往经历的挖掘,对AI方向的兴趣等,所以也没有特别多的面经可以分享。 另外就是问了一下offer情况,到岗时间,实习时长等。 hr说预计5月底会出结果 坐等吧

  • Centralized Workflow。项目的所有协作者把对项目的修改推送到统一的远程仓库,这就是集中式工作流。其它的 Git 工作流基本都是基于这种工作流程做了一些扩展。 项目的发起者在自己电脑上创建了一个本地仓库,他又为项目在远程创建了一个仓库,这个远程仓库就是所有协作者要把提交推送到的地方。这个远程仓库在谁家那创建都无所谓,可以用 Github,Coding.net,阿里云 Code,也可

  • 问题内容: 我正在使用Spring 3和PropertyPlaceholderConfigurator。 我的属性代码如下所示: 如果我的.properties文件中没有prop1,则spring无法初始化其上下文。 问题是如何定义该属性不是必需的?一些注释,配置? 问题答案: 您可以使用默认值: 如果未定义该属性,spring将注入一个空字符串。语法为。

  • 问题内容: 在python中,我可以使用装饰器向类添加方法。是否有类似的装饰器将属性添加到类?我可以更好地表明我在说什么。 我上面使用的语法是否可能还是需要更多的语法? 我想要类属性的原因是可以延迟加载类属性,这似乎很合理。 问题答案: 这是我的处理方式: 在我们打电话时,设置员没有工作 ,因为我们正在打电话 ,而不是。 添加元类定义可以解决此问题: 现在一切都会好起来的。

  • 我试图改变生产者和消费者配置的顺序,但没有帮助。 编辑:我已经添加了完整的application.yml。当我第一次引导服务时,这个主题在Kafka中是不存在的。 它感觉在生产者和消费者配置之间有冲突,我认为它说有3个分区的原因是消费者中的并发性是3,所以它首先创建有3个分区的主题,然后当它移动到生产者配置时,它不调整分区计数。