当前位置: 首页 > 知识库问答 >
问题:

如何从read_committed Kafka消费者获取上次提交的偏移量

魏刚豪
2023-03-14

我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如,在我中止最后5次尝试插入20_000条消息后,最后100_000条记录不应该被使用者读取。但是在seekToEnd过程中,它会移动到主题的末尾(包括100000条消息)。但poll()不返回它们。

我正在寻找一种方法来检索最后一个提交的偏移量(因此生产者最后一个成功提交的消息)。对此似乎没有合适的API方法。那我需要自己滚吗?

选项将后移并轮询,直到没有检索到更多的记录为止,这将产生最后一条提交的消息。但我会假设Kafka提供了这种方法。

我们使用Kafka1.0.0。

共有1个答案

邓毅
2023-03-14

kafkaconsumer有一些很好的方法,如:partitionforbegginingoffsetsendoffsets以及commitedposition

看看哪一个适合你的需要。特别是仔细考虑所有4种与抵销相关的方法。partitionfor方法返回包含其他信息的完整元数据对象,但对于丰富日志记录很有用。

 类似资料:
  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

  • 我已经开始让我的制作人向Kafka发送数据,也让我的消费者提取相同的数据。当我在ApacheNIFI中使用ConsumerKafka处理器(kafka版本1.0)时,我脑海中很少有与kafka consumer相关的查询。 Q.1)当我第一次启动ConsumeKafka处理器时,我如何从开始和当前消息中读取消息? 问题2)以及在Kafka消费者关闭的情况下,如何在最后一条消费信息之后阅读信息? 在