当前位置: 首页 > 知识库问答 >
问题:

Kafka的消费群体是如何运作的?

商运锋
2023-03-14

嗨,我正在使用KafkaCLI,以清楚地了解Kafka的工作原理。我对消费者群体感到困惑。我用三个分区创建了主题。我将创建producer,为主题提供一些数据。第一次我添加了一些数据,如下所示。

kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user1
kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user2
kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user3

现在我的理解是user1、user2、user3会随机到三个不同的分区。

创建消费群时,如下所示。

kafka-console-consumer --bootstrap-server localhost:9092 --topic users  --group user_group

这将给我所有的user1、user2、user3。

现在,在一个消费者组中,我可以有许多消费者。如果消费者组中有三个消费者,则第一个消费者将从partition1读取,第二个消费者将从consumer2读取,然后第三个消费者将从consumer3读取。这是我目前的理解。如果我的理解正确,那么演示上述行为的cli命令是什么?我只知道上面提到的一个命令将返回所有数据?如果我的上述理解是正确的,那么如果所有消费者都需要所有数据,那么如何获取?有人能帮我理解这个概念吗。任何帮助都将不胜感激。谢谢

共有2个答案

孟跃
2023-03-14

您需要同时运行三个控制台使用者和组选项,以观察您期望的行为。

分区随机分配给组成员,分区1不一定会分配给第一个使用者,依此类推。组中的第一个也是唯一一个使用者将始终获得所有分区,当第二个使用者加入时,其中一个使用者正在读取两个分区

易京
2023-03-14


让我们从理解分区与消费者的关系开始

假设我有一个名为T1的主题,有4个分区和1个消费者组。在这种情况下,使用者组1将被分配到所有分区中的使用者-

现在,当我们将另一个消费者添加到同一消费者组时,分区将在他们之间均匀分布-

当添加另一个消费者时,依此类推,直到该主题中的分区数量-

在给定主题中添加超过分区数量的更多使用者将导致空闲使用者-

这基本上意味着您受单个主题中分区数量的限制。

消费者如何加入消费者组?
当消费者想要加入消费者组时,他会向组协调员发送JoinGroup请求。第一个加入组的人成为组长,他负责根据预定义的分配策略将分区的子集分配给每个消费者。
在决定每个消费者的分区分配后,消费者领导者将向组协调员发送分配分区列表,他将将此信息发送给组内的所有消费者。

如何选择分配策略?
Kafka支持少数可以使用partition.assignment.strategy参数html" target="_blank">控制的分配策略。
策略是RangeAssignor、RoundRobinAssignor和StickyAssignor,默认策略是RangeAssignor

你可以在这篇有用的博文中了解更多关于他们的信息。

如何查看它
我建议使用Kafka管理器之类的工具,可以帮助您可视化消费者与主题的关系。

 类似资料:
  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • 我们希望获得Kafka消费群体指标(例如,节流和字节率)。 我们已经使用以下工具完成了此操作: Kafka消费者Java应用程序的JMX Mbean CLI实用程序: bin/kafka-consumer-groups.sh--描述--组group_name--bootstrap-serverlocalhost: port . 问题:这可以通过使用一些Java库以编程方式完成吗? 到目前为止,我们

  • 我有一个问题,来自一个消费群体的许多Kafka客户端没有正确关闭,因此Kafka集群认为它们仍然是连接的。因此,我无法使用新版本的客户端连接到消费者组。会卡在再平衡这一步。 根据文档,应在< code>session.timeout.ms或maximum < code > group . max . session . time out . ms 之后删除它们。起初我试图将< code>sessi

  • 我是Kafka的新手,我将非常感谢关于下一个案件的澄清。 Kafka文档在“消费者立场”一段中说: 问题是,如果只有一个消费者能够拉出特定的信息,那么如何向多个消费者群体广播呢?