当前位置: 首页 > 知识库问答 >
问题:

Kafka节点多个消费者

朱通
2023-03-14

有一个基本示例,它对1个消费者起作用。它接收消息。但是添加一个额外的消费者将被忽略。

let kafka = require('kafka-node');
let client = new kafka.Client();

let producer = new kafka.Producer(client);

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]);


producer.on('ready', function () {

    producer.send([{topic:'topic1', messages: 'topic 1 msg' ], (err,data)=>{
        console.log(err,'1 sent');
    });
    producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{
        console.log(err, '2 sent');
    });

});
producer.on('error', function (err) {
    console.log('err', err);
})



consumer1.on('message',(message) =>{
    console.log(11, message);
});
consumer2.on('message',(message) =>{
    console.log(22, message);
})

consumer2的“22”事件从未引发问题。如果我使用命令行工具检查该主题,则该主题的数据存在

共有2个答案

麻鸿熙
2023-03-14

如果kafka clusture中只有一个主题分区,则不能使用多个使用者。如果要使用n个使用者,则必须至少有n个分区。这里将对其进行更详细的解释。

乐正浩言
2023-03-14

您忘记在消费者参数中添加消费者组。在您的情况下,两个消费者都属于一个消费者组(默认情况下称为Kafka节点组)。拥有一个消费者组意味着每条消息将在每个消费者组中传递一次。由于您的主题有0个分区,因此只有第一个使用者将处理该消息。

如果您想向两个消费者发送相同的消息,您必须有两个消费者组(每个组一个消费者)。您可以通过以下方式执行:

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'});
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});

如果您希望您的消费者(通过分区)一起处理消息,您需要增加主题中的分区数。

 类似资料:
  • 我正在数据库中为主题外部化kafka消费者元数据,包括消费者组和组中消费者的数量。 Consumer\u info表具有 主题名称,消费者组名称,组中的消费者数量消费者类名称 在app server启动时,我正在读取表并根据表中设置的数字创建使用者(线程)。如果使用者组计数设置为3,我将创建3个使用者线程。这基于给定主题的分区数 现在,如果我需要横向扩展,我如何将属于同一组的消费者分布在多个应用服

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。

  • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

  • 问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#