当前位置: 首页 > 知识库问答 >
问题:

一个Kafka消费者如何从多个分区读取数据?

周朗
2023-03-14

我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么?

我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。

这是我以为会发生的:

分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下一次读取。

我试着将max.poll.records设置为相对较低的数字,看看会发生什么。如果我向一个主题发送消息,然后启动一个消费者,那么在继续下一个分区之前,所有消息都会从一个分区中读取,即使该分区中的消息数量高于max.poll.records

然后,我尝试通过不断地向一个分区发送消息(使用JMeter ),看能否将消费者“锁定”在该分区中。但是我做不到:来自其他分区的消息也被读取。

共有2个答案

微生承业
2023-03-14

我已经阅读了评论中链接的问题的答案中提到的KIP,我想我终于理解了消费者是如何工作的。

有两个主要的配置选项会影响数据的使用方式:

> < Li > < p > < code > max . partition . fetch . bytes :服务器将为给定分区返回的最大数据量

< code>max.poll.records:每次消费者投票时返回的最大记录数量

从每个分区获取数据的过程是贪婪的,并且以循环方式进行。贪婪意味着将从每个分区检索尽可能多的记录;如果分区中的所有记录占用小于max.partition.fetch。字节,则将获取所有字节;否则,只有<code>max.partition.fetch。将获取字节。

现在,并非所有提取的记录都将在轮询调用中返回。将只返回< code>max.poll.records。

剩余的记录将被保留用于下一次呼叫轮询。此外,如果保留的记录数小于< code>max.poll.records,轮询方法将在返回之前开始新一轮的获取(预取)。这意味着,通常在获取新记录的同时,使用者也在处理记录。

如果某些分区接收的消息比其他分区多得多,这可能导致不太活跃的分区长时间不被处理。

这种方法的唯一缺点是,当分区各自的消息速率之间存在很大的不平衡时,它可能会导致一些分区长时间不被使用。例如,假设最大消息数设置为1的使用者从分区A和B获取数据,如果返回的获取包括来自A的1000条记录,而没有来自B的记录,则使用者必须处理来自A的所有1000条可用记录,然后才能再次在分区B上获取。

为了防止这种情况,我们可以减少最大分区。.

韩阳云
2023-03-14

消费者正在以贪婪的循环方式轮询来自其分配分区的消息。例如,如果max.poll.records设置为100,并且分配了2个分区A,B。消费者将尝试轮询来自A的100条消息。如果分区A没有100条可用消息,它将轮询来自分区B的100条消息的剩余内容。

虽然这并不理想,但这样就不会有分区挨饿。

这也解释了为什么分区之间不能保证排序。

 类似资料:
  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。

  • 我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。

  • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者

  • 有一个16个分区的Kafka主题 使用给定的消费者组名称,我们目前正在启动单个消费者来阅读该主题。 > 单个消费者是否从该主题的(仅)读取?如果带有消息为空,消费者是否从下一个分区开始读取(...等等)? 我们可以选择启动多个消费者(使用相同的消费者组名称)来读取相同的主题(有16个分区)。为了并行读取多个分区,可以维护多少消费者?

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。