当前位置: 首页 > 知识库问答 >
问题:

如果auto.offset.reset=reasly但topic没有消息,将设置什么使用者偏移量

邹胜泫
2023-03-14

我有Kafka server版本2.4,并设置log.retention.hours=168(这样主题中的消息将在7天后被删除)和auto.offset.reset=aresty(这样,如果消费者没有得到最后提交的偏移量,那么应该从一开始就处理它)。而且由于我使用的是Kafka2.4版本,所以默认值为offsets.retention.minutes=10080(因为我没有在应用程序中设置此属性)。

我的主题数据是:1,2,3,4,5,6,7,8,9,10

关闭消费者前的当前消费者偏移:10

结束偏移量:10

共有1个答案

商曦
2023-03-14

尽管Kafka主题中没有可用的数据,但您的代理仍然知道该分区中的“下一个”偏移量。在您的示例中,该主题的第一个和最后一个偏移量是10,而它不包含任何数据。

因此,已经提交偏移量10的使用者将在再次启动时尝试读取11,这与使用者配置auto.offset.reset无关。

当您的主题有偏移量时,例如,直到15,而使用者在提交偏移量10后关闭时,您的示例将变得更加有趣。现在,假设由于保留策略,所有的抵消都从主题中删除了。如果您随后仅启动使用者,则使用者配置auto.offset.reset将如文档中所述生效:

  • 最后提交的偏移量或,
  • 如果上次提交的偏移量不再存在,则通过auto.offset.reset.
  • 给出配置

另外要注意的是:即使消息似乎被保留策略清除了,您仍然可以在主题中看到一些数据,因为即使在保留时间/大小之后,数据仍然保留在Kafka主题中

 类似资料:
  • 例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?

  • 我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个

  • 我有一个SOAP Web服务,它发送一个kafka请求消息,并等待一个kafka响应消息(例如,consumer.poll(10000))。 每次调用web服务时,它都会创建一个新的Kafka生产者和一个新的Kafka消费者。 每次调用web服务时,使用者都会收到相同的消息(例如,具有相同偏移量的消息)。 我使用的是Kafka0.9,启用了自动提交,并且自动提交频率为100毫秒。 更新0001

  • 本文向大家介绍Kafka有内部的topic吗?如果有是什么?有什么所用?相关面试题,主要包含被问及Kafka有内部的topic吗?如果有是什么?有什么所用?时的应答技巧和注意事项,需要的朋友参考一下 __consumer_offsets,保存消费者offset

  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--如果使用者在第一次轮询时得到100条记录,而监听器正在处理一条一条的记录,而它在10秒内只处理了80条记录,它是提交并将偏移提前80还是100?

  • 我在kafka中面临一个奇怪的问题,即在消费者应用程序重新启动后,所有来自主题的kafka消息都在重播。有人能帮我我在这里做错了什么吗? 这是我的配置: spring.kafka.consumer.auto-偏移-重置=最早 spring.kafka.enable.auto。提交=false 我的生产者配置: 消费者配置: 消费者代码: 集装箱代码 消费者配置 应用程序.属性