当前位置: 首页 > 知识库问答 >
问题:

Kafka Broker偏移量/日志保留和最早模式下的使用者偏移量重置

傅兴平
2023-03-14

问题描述:

我们的Kafka consumer(在Spring Boot2.x中开发)正在执行几天。当我们重新启动这些消费者时,主题的所有消息都将被再次消费,但仅在特定条件下。

条件:

           auto.commit.interval.ms = 100
           auto.offset.reset = earliest
           bootstrap.servers = [server1:9092]
           check.crcs = true
           client.id = 
           connections.max.idle.ms = 540000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = consumer_group1
           heartbeat.interval.ms = 3000
           interceptor.classes = null
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 305000
           retry.backoff.ms = 100
           value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

代理配置:

           log.retention.ms = 86400000
           log.retention.minutes = 10080
           log.retention.hours = 168
           log.retention.bytes = -1

           offsets.retention.ms = 864000000
           offsets.retention.minutes = 14400
           offsets.retention.hours = 240 

           unclean.leader.election.enable = false
           log.cleaner.enable = true
           auto.leader.rebalance.enable = true
           leader.imbalance.check.interval.seconds = 300
           log.retention.check.interval.ms = 300000
           log.cleaner.delete.retention.ms = 604800000

谢谢和问候

共有1个答案

钱睿范
2023-03-14

您是对的,您遇到此问题的原因是log.retention.*和 ;offsets.retention.*的值不同(分别为7天和1天)。对于2.0之前的Kafka版本,请检查此处的说明。这是由于很少的消息进入您的主题,并且偏移数据已经过期。

关于您的短语这是不完全正确的,显然我们不能将consumer设置为 ;“latest”。如果您收到上次邮件的时间少于1天(如几小时前),则可以安全地将auto.offset.reset值更新为最新,并使用相同的组id(或application.id)。在这种情况下,您不会丢失消息。

作为另一种选择,您可以将特定主题的日志保留值更改为1天。此外,您还可以更新 ;valueoffsets.retention.*,但您需要从您的性能点测试它,它可能会被降级。

 类似资料:
  • 相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步

  • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?

  • 问题内容: 是否可以跳过X个第一行,并在一个查询中选择所有其他行?像那样: 它将选择:pqr,stu,vwx,yz 我尝试使用LIMIT和OFFSET完成此操作,但是问题是表是动态的,而且我不知道应该输入哪个LIMIT(我不知道表中有多少行)。 问题答案: 如果只需要最后N行,请尝试以下操作: 这会根据的顺序为您提供最后几条记录。 您可以使用自动递增的主键(希望有一个主键)来确定行的顺序(如果无法

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗? 我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?