当前位置: 首页 > 知识库问答 >
问题:

Confluent Kafka使用者配置-session.timeout.ms和max.poll.interval.ms是如何相关的?

空谦
2023-03-14

我试图理解以下两种合流的消费者配置的默认值是如何一起工作的。

max.poll.interval.ms-根据汇流文档,默认值为300,000毫秒

session.timeout.ms-根据汇合文档,默认值为10,000 ms

例如,假设消费者每3,000 ms发送一次心跳,我的第一次轮询发生在时间戳t1,然后第二次轮询发生在t1+20,00 ms。那么是否会因为超出“session.timeout.ms”而导致重新平衡?或者,如果消费者确实按照预期的时间戳发送了心跳,那么它会很好地工作吗?

共有1个答案

夹谷晋
2023-03-14

在前面的线程中,这里还解释了会话超时和最大轮询超时。也让我解释一下我对此的理解。

ConsumerRecords轮询(final long timeout):用于从主题的分区中按顺序提取数据,从上次消耗的偏移量或手动设置的偏移量开始。如果有可用的记录,这将立即返回,否则它将等待已过的超时。如果超时通过,将返回空记录。poll API不断调用以获取到达的任何新消息,并确保消费者的活力

session.timeout.ms在每次轮询期间,消费者协调器向代理发送心跳,以确保消费者的会话处于活动状态。如果broker在session.timeout.ms之前没有接收到任何心跳,则broker离开使用者并重新平衡

现在让我们来讨论一下它们是如何相互关联的。

使用者在调用轮询其检查心跳时,会话超时轮询在后台超时,如下所示:

>

  • 使用者协调器检查使用者是否未处于再平衡状态,如果仍然处于再平衡状态,则等待协调器加入使用者。等一下再打电话给波尔。请注意,如果max.poll.interval.ms较大,则需要更多时间重新平衡。
  • 轮询和重新平衡完成后,协调器检查会话超时如果会话超时已过期但未看到成功的心跳,旧的协调器将断开连接,因此下一轮询将尝试重新平衡。因此,会话超时直接依赖于时间协调器的活跃度,如果会话超时,使用者协调器本身死了,呼叫轮询将不得不在重新平衡之前分配新的协调器。

    session.timeout.ms:接收心跳的最大时间

    Max.poll.interval.ms:独立处理线程上的最大时间

    因此,如果您将max.poll.interval.ms设置为300,000,那么将有300,000 ms到下一轮投票,这意味着使用者线程最多有300,000 ms完成处理。在heartbeat.interval.ms(即3,000)时,心跳将继续发送heartbeat请求,以指示线程仍处于活动状态,如果在会话之前没有心跳,Timeout.ms(即10,000)时协调器将死亡,并调用poll来重新分配新的协调器和重新平衡

  •  类似资料:
    • 我当前运行的是Kafka0.10.0.1,两个值的对应文档如下: heartbeat.interval.ms-在使用Kafka的组管理设施时,向消费者协调器发送心跳的预期间隔时间。heartbeat用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,但通常不应设置为高于该值的1/3。可以调得更低,以控制正常再平衡的预期时间。

    • 我不清楚为什么我们同时需要和以及我们何时使用其中之一或两者?这两个设置似乎都指出了协调器在假定消费者已死亡之前等待从消费者获取心跳的时间上限。 另外,它在基于KIP-62的0.10.1.0+版本中的表现如何?

    • 我使用的是一个使用Kafka Streams1.0和Kafka Broker1.0.1的无状态处理器 //将其设置为1/3会话。超时 //使其更大,因为我正在进行密集的计算操作,处理1条kafka消息(NLP操作)可能需要10分钟 尽管有这样的配置和我对kafka超时配置工作原理的理解,但我看到消费者每隔几秒钟就会重新平衡一次。 我已经阅读了下面的文章和其他stackoverflow问题。关于如何

    • 另外,JMS使用什么标准来确定一个专属消费者何时死亡或消失?

    • 本文向大家介绍nginx使用IPV6的相关配置项介绍,包括了nginx使用IPV6的相关配置项介绍的使用技巧和注意事项,需要的朋友参考一下 IPV4日益稀缺,ipv6已经慢慢走上日程,待ipv6在国内普及,使用nginx配置ipv6那是肯定的,看看如何让nginx支持ipv6以及配置. 查看nginx是否支持ipv6 没有出现–with-ipv6,说明当前的nginx不支持ipv6,所以我们需要重

    • 例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?