问题描述:
我们的Kafka consumer(在Spring Boot2.x中开发)正在执行几天。当我们重新启动这些消费者时,主题的所有消息都将被再次消费,但仅在特定条件下。
条件:
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [server1:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = consumer_group1
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
代理配置:
log.retention.ms = 86400000
log.retention.minutes = 10080
log.retention.hours = 168
log.retention.bytes = -1
offsets.retention.ms = 864000000
offsets.retention.minutes = 14400
offsets.retention.hours = 240
unclean.leader.election.enable = false
log.cleaner.enable = true
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.retention.check.interval.ms = 300000
log.cleaner.delete.retention.ms = 604800000
谢谢和问候
您是对的,您遇到此问题的原因是log.retention.*
和 ;offsets.retention.*
的值不同(分别为7天和1天)。对于2.0之前的Kafka版本,请检查此处的说明。这是由于很少的消息进入您的主题,并且偏移数据已经过期。
关于您的短语这是不完全正确的,显然我们不能将consumer设置为 ;“latest”
。如果您收到上次邮件的时间少于1天(如几小时前),则可以安全地将auto.offset.reset
值更新为最新
,并使用相同的组id(或application.id
)。在这种情况下,您不会丢失消息。
作为另一种选择,您可以将特定主题的日志保留值更改为1天。此外,您还可以更新 ;valueoffsets.retention.*
,但您需要从您的性能点测试它,它可能会被降级。
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?
问题内容: 是否可以跳过X个第一行,并在一个查询中选择所有其他行?像那样: 它将选择:pqr,stu,vwx,yz 我尝试使用LIMIT和OFFSET完成此操作,但是问题是表是动态的,而且我不知道应该输入哪个LIMIT(我不知道表中有多少行)。 问题答案: 如果只需要最后N行,请尝试以下操作: 这会根据的顺序为您提供最后几条记录。 您可以使用自动递增的主键(希望有一个主键)来确定行的顺序(如果无法
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗? 我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?