当前位置: 首页 > 知识库问答 >
问题:

春云流-消费群体绑定

蔚承天
2023-03-14

我的使用者绑定到匿名使用者组,而不是我指定的使用者组。

spring:
  cloud:
    stream:
      html" target="_blank">kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181
        bindings:
          inEvent:
            group: eventin
            destination: event
          outEvent:
            group: eventout
            destination: processevent

我的春靴应用

@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @StreamListener(value = "inEvent")
    public void getEvent(Event event){
        System.out.println(event.name);
    }
}

我的输入输出通道接口

public interface EventStream {
    @Input("inEvent")
    SubscribableChannel inEvent();
    @Output("outEvent")
    MessageChannel outEvent();
}

我的控制台日志--

:在3.233秒内启动ConsumerApplication(JVM运行于4.004):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]发现组协调器Singh:9092(ID:2147483647 RACK:null):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]撤消以前分配的分区[]:撤消的分区:[]:[使用者Clientid=Consumer-3,Groupid=Anonymous.[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]成功加入具有第1代的组:[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]设置新分配的分区[inEvent-0]:[使用者Client=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]将分区inEvent-0的偏移量重置为:分配的分区:[inEvent-0]

共有1个答案

伯丁雷
2023-03-14

group属性不能在Kafka树中。必须是这样的:

我的使用者绑定到匿名使用者组,而不是我指定的使用者组。

spring:
  cloud:
    stream:
       bindings:
          inEvent:
            group: eventin
            destination: event

请参阅文档中的更多信息:http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.1.release/single/spring-cloud-stream.html#consumer-groups

是一个公共属性,因此它与绑定器实现无关。Kafka是针对Apache Kafka特定的属性,在绑定器实现级别上公开。

 类似资料:
  • 我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。

  • 我试图改变生产者和消费者配置的顺序,但没有帮助。 编辑:我已经添加了完整的application.yml。当我第一次引导服务时,这个主题在Kafka中是不存在的。 它感觉在生产者和消费者配置之间有冲突,我认为它说有3个分区的原因是消费者中的并发性是3,所以它首先创建有3个分区的主题,然后当它移动到生产者配置时,它不调整分区计数。

  • 当我只打开一次处理时,我会得到以下错误。注意:我们的应用程序非常安全,我们只允许Kafka用户和消费者访问他们明确需要的资源。 只有一次处理kafka流是否在所有流任务中使用每个流任务的消费者组而不是消费者组?

  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • 我们希望获得Kafka消费群体指标(例如,节流和字节率)。 我们已经使用以下工具完成了此操作: Kafka消费者Java应用程序的JMX Mbean CLI实用程序: bin/kafka-consumer-groups.sh--描述--组group_name--bootstrap-serverlocalhost: port . 问题:这可以通过使用一些Java库以编程方式完成吗? 到目前为止,我们