当前位置: 首页 > 知识库问答 >
问题:

小组中的Kafka消费者跳过分区

司寇星海
2023-03-14

我有一个单一的消费者在消费一个话题。主题有6个分区。分配给该组的单个使用者。我会像下面的consumer.poll(10000)那样进行轮询,当没有记录返回时,我会退出使用者提取。

从文档来看,我相信当没有记录要消耗时,轮询返回空,并且持续时间10000足以重新平衡和获取记录。大多数情况下,poll从所有分区中消耗记录,但有些情况下,poll从3个分区中提取记录并返回空记录而不消耗其他3个分区。

顺便说一句,我使用的是2.0.1的Kafka客户端,而Kafka服务器版本是2.11-2.2.0。

共有1个答案

燕琛
2023-03-14

max.poll.records参数默认为500。因此有时可能无法通过一个poll()从主题中的所有分区获取所有消息

max.poll.records:对poll()的单个调用中返回的最大记录数。

顺便说一下,在组中只有一个使用者并不适合使用带有分区的主题。在最佳实践中,使用者组中使用者的数量应该等于主题中订阅的分区的数量。(默认情况下,Kafka将分区平均分配给使用者)否则,您无法水平地缩放负载,在这种情况下,拥有分区就没有那么大的意义了。

但是在您的例子中,因为退出了消费者,所以需要一些时间(session.timeout.ms)来认为这个消费者按照Kafka的方式已经死亡。如果您在不等待session.timeout.ms通过的情况下再次启动使用者,那么Kafka会意识到consumer组中有两个活动使用者,并将分区平均分配给这两个使用者。(比如:分区0,1,2分配给consumer-1,分区3,4,5分配给consumer-2)但是在Kafka意识到其中一个consumer已经死亡之后,在consumer组中开始重新平衡,所有分区都分配给consumer组中的一个活动的consumer。

session.timeout.ms:当使用Kafka的组管理工具时,用于检测客户端故障的超时。客户端向代理发送周期性心跳以指示其活跃度。如果在此会话超时到期之前,代理没有接收到心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在由group.min.session.timeout.ms和group.max.session.timeout.ms在代理配置中配置的允许范围内

您可以在代理端使用以下cli命令检查使用者组的当前分区分配:

./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group yourConsumerGroup
 类似资料:
  • 我们有一个非常简单的Kafka Consumer(v 2.6.2)。它是使用者组中唯一的使用者,并且该组是唯一一个阅读主题的组(有6个分区,其中有大约300万个事件)。Broker也是2.6.x版本 由于我们需要实现一个“只有一次”的场景,我们深入研究了一下,如果我们真的只使用一次写入主题的每个事件。不幸的是,我们发现:消费者有时会跳过一个偏移量,有时甚至会跳过一组分区的偏移量。 消费者除了记录之

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

  • 我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?