当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者离开消费群体

滕鸿畴
2023-03-14

我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。

  KAFKA_DEFAULT_REPLICATION_FACTOR: 3
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  KAFKA_MIN_INSYNC_REPLICAS: 2
  KAFKA_NUM_PARTITIONS: 8
  KAFKA_REPLICA_SOCKET_TIMEOUT_MS: 30000
  KAFKA_REQUEST_TIMEOUT_MS: 30000
  KAFKA_COMPRESSION_TYPE: "gzip"
  KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
  KAFKA_HEAP_OPTS: "-Xmx768m -Xms768m -XX:MetaspaceSize=96m"

我的情况:

    < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群集) < li >消费者不再使用消息,因为它由于重新平衡而离开了消费者群体

是否存在任何机制来告诉消费者在重新平衡后加入群组?

日志:

 INFO 1 --- [ | loggingGroup] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-consumer-0, groupId=loggingGroup] Attempt to heartbeat failed since group is rebalancing
 WARN 1 --- [ | loggingGroup] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-consumer-0, groupId=loggingGroup] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

共有1个答案

慕光霁
2023-03-14

@rostyslav每当我们打电话让消费者阅读消息时,它都会打两个主要电话。

  1. 投票
  2. 提交

轮询基本上是从kafka主题中获取记录,并提交告诉kafka将其保存为已读消息,这样就不会再次读取。虽然轮询几个参数起着主要作用:

    < li >最大轮询记录数 < li >最大轮询间隔毫秒数

仅供参考:变量名称是每个python api的。

因此,每当我们尝试读取来自消费者的消息时,它每max_poll_interval_ms进行一次轮询调用,并且仅在处理获取的记录(如max_poll_records中定义)后才会进行相同的调用。因此,无论何时,max_poll_records未在max_poll_inetrval_ms中处理,我们都会收到错误。

为了克服这个问题,我们需要改变这两个变量中的一个。更改max_poll_interval_ms可能会很忙,因为有时处理某些记录可能需要更长的时间,有时则需要更少的记录。我总是建议使用max_poll_records来解决这个问题。这对我有用。

 类似资料:
  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka