当前位置: 首页 > 知识库问答 >
问题:

为多个主题创建一个Kafka消费者

王辉
2023-03-14

我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示:

private Consumer createConsumer() {
    Properties props = getConsumerProps();
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ArrayList<String> topicMISL = new ArrayList<>();
    for (String s:Connect2Redshift.kafkaTopics) {
        topicMISL.add(systemID + "." + s);
    }
    consumer.subscribe(topicMISL);
    return consumer;
}


private boolean consumeMessages( Duration duration, Consumer<String, byte[]> consumer) {
        try {  Long start = System.currentTimeMillis();
            ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
            }
   }

之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

共有2个答案

白侯林
2023-03-14
ConsumerRecords<String, String> records = consumer.poll(long value);
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            
        }
        
    }

还需要通过查找offset和使用consumer进行commit for offset。委员会同步

阙阳
2023-03-14

一般来说,这取决于您的主题设置。kafka通过使用每个主题的多个分区来缩放。

  • 如果一个主题有3个分区,Kafka可以并行读取
  • 对于多个主题也是如此,阅读可以并行进行

如果您有一个分区接收的消息比其他分区多得多,那么您可能会遇到这个特定分区的使用者延迟的情况。调整批量大小和消费者设置可能会对他们有所帮助,同时也会压缩消息。理想情况下,确保均匀分布负载可以避免这种情况。

看看这篇博客文章,它让我对内在有了很好的理解:https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

 类似资料:
  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性

  • 我刚刚开始玩弄《Spring-Cloud-Stream》中的Kafka活页夹。 我配置了一个简单的消费者: 但当我启动应用程序时,我看到在启动日志中创建了三个独立的消费者配置: 我发现这些配置之间唯一不同的是客户机。id。 除此之外,我不知道为什么只有一个消费者有三种配置。 是因为我也在运行吗? 这是我的:

  • 我试着把这个理论与缩放工人做比较。 但是,使用版本1.2.1时,storm Kafka spout在多个不同的拓扑中的行为并不像我预期的那样。 为单个主题的所有拓扑中的kafka spout使用者设置一个公共client.id和group.id,每个拓扑仍然订阅所有可用的分区和重复的元组,并在重新提交已提交的元组时抛出错误。 如果有人能解释一下 Kafka喷口的这种行为的实现逻辑是什么? 有解决此

  • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。