当前位置: 首页 > 知识库问答 >
问题:

Kafka多主题消费

仲孙昊焱
2023-03-14
   consumer.subscribe(Pattern.compile(".*"),new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> clctn) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> clctn) {
            }            
        });

如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

共有1个答案

连曜灿
2023-03-14

对于正则表达式,请使用以下签名

KafkaConsumer.subscribe(Pattern pattern, ConsumerRebalanceListener listener)

例如。以下代码片段使消费者能够收听带有前缀my_topics_的所有主题

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> arg0) {
    // Don't need it now.
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> arg0) {
    // Don't need it now.
  }
};

pattern = Pattern.compile("my_topics_.*");
kafkaConsumer.subscribe(pattern, listener);
 类似资料:
  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 然而,当在我的环境中测试此示例时,我得到了一个异常。

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

  • 我正在尝试以avro格式对kafka消息进行解密我使用了以下代码:https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaResearchconsumer/kafka/reviewsconsumerco

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性