当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者配置中heartbeat.interval.ms和session.timeout.ms之间的差异

毋胜涝
2023-03-14

我当前运行的是Kafka0.10.0.1,两个值的对应文档如下:

heartbeat.interval.ms-在使用Kafka的组管理设施时,向消费者协调器发送心跳的预期间隔时间。heartbeat用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,但通常不应设置为高于该值的1/3。可以调得更低,以控制正常再平衡的预期时间。

session.timeout.ms-在使用Kafka的组管理工具时用于检测故障的超时。当会话超时内没有接收到使用者的心跳时,代理会将使用者标记为失败并重新平衡组。因为只有在调用poll()时才发送心跳,所以较高的会话超时允许在使用者的poll循环中有更多的时间处理消息,代价是检测硬故障的时间较长。另请参阅max.poll.records以获得控制轮询循环中处理时间的另一个选项。

我不清楚为什么文档建议将heartbeat.interval.ms设置为session.timeout.ms的1/3。由于只有在调用poll()时才发送心跳,因此在完成对当前记录的处理时,使这些值相同是否没有意义?

共有2个答案

阎俊英
2023-03-14

代码做了一个硬限制,您不能将heartbeat.interval.ms设置为不小于request.timeout.ms,否则Kafka会抱怨“heartbeat必须设置为低于会话超时”。

如果您真的将这两个配置设置为相同的值,那么可能出现的情况是网络客户端将不再进行heartbeat,因为会话超时几乎总是在进行heartbeat之前发生。

至于那1/3,我更倾向于认为这是一个启发式的值。

葛桐
2023-03-14

heartbeat.interval.ms指定消费者发送心跳信号的频率。因此,如果这是3000 ms(默认),那么每隔3秒,使用者将向代理发送心跳信号。

session.timeout.ms指定代理需要从使用者获取至少一个心跳信号的时间。否则,它会将消费者标记为死亡。默认值10000毫秒(10秒)可在经纪人将消费者标记为死亡之前,防止丢失三个心跳信号。

在一个重负载的网络设置中,错过很少的心跳信号是正常的。所以建议等错过3个心跳信号后再将消费者标记为死亡。这就是1/3推荐的理由。

 类似资料:
  • 我不清楚为什么我们同时需要和以及我们何时使用其中之一或两者?这两个设置似乎都指出了协调器在假定消费者已死亡之前等待从消费者获取心跳的时间上限。 另外,它在基于KIP-62的0.10.1.0+版本中的表现如何?

  • 我试图理解以下两种合流的消费者配置的默认值是如何一起工作的。 max.poll.interval.ms-根据汇流文档,默认值为300,000毫秒 session.timeout.ms-根据汇合文档,默认值为10,000 ms 例如,假设消费者每3,000 ms发送一次心跳,我的第一次轮询发生在时间戳t1,然后第二次轮询发生在t1+20,00 ms。那么是否会因为超出“session.timeout

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • AFAIK,max.poll.interval.ms是Kafka 0.10.1中引入的。然而,还不清楚我们什么时候可以同时使用session.timeout.ms和max.poll.interval.ms,考虑使用心跳线程没有响应的酪蛋白,但是我的处理线程由于设置了更高的值,它仍然在处理记录。但是当heartbeat线程关闭时,在越过session.timeout.ms之后,会发生什么情况。因为我

  • 有人能解释一下Log4j的以下配置是做什么的吗? 我特别对标签Configuration=WARN和Root level=“info”感到困惑。

  • 序列化程序的主要链接文档:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/package-frame.html LongSerializer:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/longS