当前位置: 首页 > 知识库问答 >
问题:

消费者何时提交偏移?

夹谷弘亮
2023-03-14

我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

共有1个答案

管翼
2023-03-14

我们通常建议不要使用auto.commit.enabled=true;让容器使用批处理记录ACKModes进行提交更具有确定性(分别在下一个poll()之前或处理完每个记录之后)。

在版本2.3中,我们默认禁用自动提交(除非在配置中显式设置)。

对于自动提交,我的理解是,如果从上一次轮询开始,间隔已经过去,提交将发生在poll()期间。它将提交自上次提交以来已处理的每个分区的最新偏移量。

 类似资料:
  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • 本文向大家介绍消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?相关面试题,主要包含被问及消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?时的应答技巧和注意事项,需要的朋友参考一下 offset+1