当前位置: 首页 > 知识库问答 >
问题:

Kafka突然重置了消费者补偿

宫坚
2023-03-14

我正在使用Kafka 0.8

最近,我们开始喂食和消耗一个行为怪异的新主题,消耗的偏移量突然被重置,它尊重我们设置的auto.offset.reset策略(实际上是最小的)但我无法理解为什么该主题会突然重置其偏移量。

我正在使用高级消费者。

这是我发现的一些错误日志: 我们有一堆这样的错误日志:

[2015-03-26 05:21:17,789] INFO Fetching metadata from broker id:1,host:172.16.23.1,port:9092 with correlation id 47 for 1 topic(s) Set(MyTopic) (kafka.cl
ient.ClientUtils$)
[2015-03-26 05:21:17,789] ERROR Producer connection to 172.16.23.1:9092 unsuccessful (kafka.producer.SyncProducer)
java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

每次出现此问题时,我都会看到警告日志:

[2015-03-26 05:21:30,596] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

然后真正的问题发生了:

[2015-03-26 05:21:47,551] INFO Connected to 172.16.23.5:9092 for producing (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,552] INFO Disconnecting from 172.16.23.5:9092 (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherManager-1427047649942] Added fetcher for partitions ArrayBuffer([[MyTopic,0], initOffset 45268422051 to br
oker id:5,host:172.16.23.5,port:9092] ) (kafka.consumer.ConsumerFetcherManager)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Starting  (kafka.consumer.Cons
umerFetcherThread)
[2015-03-26 05:21:50,388] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,490] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,591] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,692] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)

现在的问题是:有人已经经历过这种行为吗?有人能告诉我当Kafka决定重置其偏移量时,auto.offset.reset是最大还是最小吗?

谢谢。

共有1个答案

庄弘业
2023-03-14

现在的情况是你有一段时间描述你的话题太慢了。

Kafka 有一个保留模型,它不是基于消费者是否获得数据,而是基于磁盘使用情况和/或周期。在某些时候,你来得太晚了,你需要的下一条消息已经被清除了,并且由于 kafka 已经清理了数据,所以不再可用。因此,分区 [MyTopic,0] 的当前偏移量45268422051超出范围;将偏移量重置为1948447612消息。

然后,您的消费者将您的重置策略再次应用于引导本身,在您的情况下最小。

当您有突发的工作流时,这是一个常见的问题,有时会超出数据保留范围。它可能消失了,因为您提高了脱毛速度,或者增加了保留策略,以便能够在突发中幸存下来。

 类似资料:
  • 问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我设置了MirrorMaker2,用于在两个DC之间复制数据。 我的 mm2 属性, 看到下面的MM2创业。 我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。 已在源群集中启动使用者组。 消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。 我尝试使用来自目标集群的消息,如下所示。 由于我使用相同的使用者组,因此我希望我

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个微服务应用程序,它是使用camel kafka开发的,用于消费来自kafka的消息。我用4个并发实例运行这个服务,其中每个实例都有1个消费者,他们消费的主题有20个分区。 在维护期间(每天凌晨1-2点),应用程序将通过停止消费者来暂停消费。 当应用程序暂停消费时,kafka重新平衡发生,一些实例将在停止消费之前分配更多分区。 e、 由instanceA(partition1-5)使用的g分