当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者轮询间隔

邵伟泽
2023-03-14

我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。

根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。

现在我有两个问题

a) Now will it call poll only after 5 mins or it will call poll() as soon as it finishes processing. 
b) Also, suppose consumer is sitting idle for sometime, then what would be the frequency of polling i.e. at what interval consumer will poll the broker for message? Will it be poll timeout or something else?

共有1个答案

暨曾笑
2023-03-14

我假设您所指的5分钟设置是<code>max.poll.interval。ms</code>,而不是轮询超时。

还有,我假设你是从爪哇打电话给Kafka;如果你使用不同的语言,答案可能会不同。

轮询超时是传递给 Kafka 消费者轮询 () 方法的值。这是 poll() 方法在您调用它之后将阻止的最长时间。

最大轮询间隔为5分钟,这意味着您必须在轮询()的最后一次调用返回后的5分钟结束之前再次调用轮询()。如果不这样做,您的消费者将断开连接。

所以你的问题:

a)现在它只会在5分钟后调用轮询,或者它会在完成处理后立即调用轮询()。

这完全取决于你。你是那个在你自己的代码中做召唤的人。你应该有一个循环来调用 poll()。

b)此外,假设消费者闲置了一段时间,那么轮询的频率是多少,即消费者以什么间隔轮询代理的消息?会不会是轮询超时或者别的什么?

使用者(即您自己的应用程序代码)不应该处于空闲状态,而应该处于循环中。在此循环中,调用 poll(),然后处理事件(1 分钟),然后再次调用 poll()。

 类似资料:
  • 我是Apache Camel的新手,我试图在一个简单的项目中理解和使用轮询消费者EIP,但我感到有点迷茫…谁能帮我解释一下,甚至用一个小的工作例子。 如有任何帮助,我们将不胜感激

  • 我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 场景: session.timeout.ms:10秒 最大poll.interval.ms:5分钟 处理“Poll()”中使用的消息需要6分钟 C(6秒):发送另一个心跳 D(5分钟):发送了另一个心跳(5*60%3=0),但达到了“max.poll.interval.ms”(5分钟 在点“D”,消费者: 继续每3秒发送一次心跳? 如果是“1”点,则 a.在完成6分钟的处理后,考虑到由于在点“d”

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?