当前位置: 首页 > 知识库问答 >
问题:

在同一消费阶层中消费多个Kafka主题

缪嘉志
2023-03-14

我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。

我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。

一个是要有两个这样的Kafka听众:

 @KafkaListener(topics = "topic1")
public void consumeTopic1(String message) throws Exception {  
   //do something
  }

 @KafkaListener(topics = "topic2")
public void consumeTopic2(String message) throws Exception {  
   //do something
  }

另一种方法是在同一个kafkaListener中有两个主题,如下所示

 @KafkaListener(topics = {"topic1", "topic2"})
public void consumeTopics(String message) throws Exception {  
   //do something
  }

===================edit===============application.yml中的Kafka属性如下所示:

 kafka:
    properties:
      topics:
        topic1: topic1
        topic2: topic2
    bootstrap-servers: server1,server2
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 4
    consumer:
      group-id: mygroupid
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

共有1个答案

拓拔高畅
2023-03-14

>

  • 创建多个主题的目的是在多个
    内核/进程/线程之间传播数据,并
    随后传播数据的处理。

    即使这样,您也可以从多个主题的单个使用者开始,直到在队列中处理事件时观察到延迟。然后你就可以把它们分开。

  •  类似资料:
    • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

    • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

    • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?

    • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

    • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

    • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者