当前位置: 首页 > 知识库问答 >
问题:

session.timeout.ms和max.poll.interval.ms之间的差异(对于Kafka>=0.10.1)

段干跃
2023-03-14

我不清楚为什么我们同时需要session.timeout.msmax.poll.interval.ms以及我们何时使用其中之一或两者?这两个设置似乎都指出了协调器在假定消费者已死亡之前等待从消费者获取心跳的时间上限。

另外,它在基于KIP-62的0.10.1.0+版本中的表现如何?

共有1个答案

穆丁雨
2023-03-14

在KIP-62之前,只有session.timeout.ms(即Kafka0.10.0和更早版本)。max.poll.interval.ms通过KIP-62(Kafka0.10.1的一部分)引入。

KIP-62通过后台心跳线程将心跳从调用解耦到poll(),允许比心跳间隔更长的处理时间(即两个连续poll())。

假设处理一条消息需要1分钟。如果heartbeat和poll是耦合的(即在KIP-62之前),则需要将session.timeout.ms设置为大于1分钟,以防止使用者超时。然而,如果消费者死亡,检测失败的消费者也需要超过1分钟的时间。

KIP-62解耦轮询和心跳,允许在两个连续轮询之间发送心跳。现在您有两个线程在运行,心跳线程和处理线程,因此,KIP-62为每个线程引入了超时。session.timeout.ms用于心跳线程,而max.poll.interval.ms用于处理线程。

假设您设置了session.timeout.ms=30000,因此,使用者心跳线程必须在此时间到期之前向代理发送一个心跳。另一方面,如果处理单个消息需要1分钟,则可以将max.poll.interval.ms设置为大于1分钟,以使处理线程有更多时间处理消息。

如果处理线程死亡,则需要max.poll.interval.ms来检测。但是,如果整个使用者死亡(并且一个死亡的处理线程很可能会使包括heartbeat线程在内的整个使用者崩溃),则只需要session.timeout.ms来检测它。

这样做的目的是,即使处理本身需要相当长的时间,也可以快速检测出失败的消费者。

 类似资料:
  • 我当前运行的是Kafka0.10.0.1,两个值的对应文档如下: heartbeat.interval.ms-在使用Kafka的组管理设施时,向消费者协调器发送心跳的预期间隔时间。heartbeat用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,但通常不应设置为高于该值的1/3。可以调得更低,以控制正常再平衡的预期时间。

  • 我使用的是一个使用Kafka Streams1.0和Kafka Broker1.0.1的无状态处理器 //将其设置为1/3会话。超时 //使其更大,因为我正在进行密集的计算操作,处理1条kafka消息(NLP操作)可能需要10分钟 尽管有这样的配置和我对kafka超时配置工作原理的理解,但我看到消费者每隔几秒钟就会重新平衡一次。 我已经阅读了下面的文章和其他stackoverflow问题。关于如何

  • 我试图理解以下两种合流的消费者配置的默认值是如何一起工作的。 max.poll.interval.ms-根据汇流文档,默认值为300,000毫秒 session.timeout.ms-根据汇合文档,默认值为10,000 ms 例如,假设消费者每3,000 ms发送一次心跳,我的第一次轮询发生在时间戳t1,然后第二次轮询发生在t1+20,00 ms。那么是否会因为超出“session.timeout

  • 问题内容: 很快就有两个相等运算符:double equals( )和Triple equals( ),两者之间有什么区别? 问题答案: 简而言之: 操作员检查其实例值是否相等, 操作员检查引用是否指向同一实例, 长答案: 类是引用类型,可能有多个常量和变量在幕后引用类的同一单个实例。类引用保留在运行时堆栈(RTS)中,其实例保留在内存的堆区域中。当您控制平等时, 这意味着它们的实例是否彼此相等。

  • AFAIK,max.poll.interval.ms是Kafka 0.10.1中引入的。然而,还不清楚我们什么时候可以同时使用session.timeout.ms和max.poll.interval.ms,考虑使用心跳线程没有响应的酪蛋白,但是我的处理线程由于设置了更高的值,它仍然在处理记录。但是当heartbeat线程关闭时,在越过session.timeout.ms之后,会发生什么情况。因为我

  • 序列化程序的主要链接文档:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/package-frame.html LongSerializer:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/longS