当前位置: 首页 > 知识库问答 >
问题:

Kafka-偏移提交

索梓
2023-03-14

我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。

因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗?

客户端版本-kafka客户端-2.4.0

谢谢!!

共有1个答案

景哲
2023-03-14

当您提交时(自动或手动几乎没有区别),您将在代理端存储消费者在分区中到达的记录。此已提交的偏移量仅在重新平衡的情况下使用,因此当消费者被分配到该分区时,他们可以从已知已处理所有先前消息的位置接收。这提供了一种保证,只要消费者编码正确,当消息被顺序处理时,如果组成员发生变化,消息不会在消费时丢失。

当组成员身份稳定时,committed offset不会执行任何操作。每个使用者都有自己的内存偏移量,它维护并在每次从代理获取一批记录时使用。默认情况下,此偏移量按顺序增加。seek方法仅更改内存中的偏移量,以便下次轮询将从指定的任意偏移量中提取,除非它不存在,否则将引发异常。

如果您在外部存储提交偏移量,则在重新平衡后可以使用seek来检索外部存储的偏移量并从中提取,但在这种情况下,您必须在重新平衡侦听器中调用seek-如果在轮询之前调用seek,则不会有任何影响,因为使用者在轮询方法期间只会发现重新平衡和新分区分配,因此,在轮询期间,如果不进行干预,它将使用最后提交的偏移量。

当你暂停消费者时,这种稍微不直观的情况也会出现,这是我在https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html?m=1

 类似资料:
  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。 有可能得到哪个是给定生产者生产的最后一个偏移吗? 例如: 生产者: 我想找出分别由P1和P2发布的最后一条记录的偏移量。 请注意,我不是在要求全局主题分区偏移量。

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?