当前位置: 首页 > 知识库问答 >
问题:

如果offsets.retention.minutes=2和cleanup.policy=compact,kafka将删除提交的偏移量还是对其进行压缩

周通
2023-03-14

我对Kafka是新手,但我知道Kafka在__consumer_offsets主题中存储消费者抵消,而offsets.retention.minutes定义了这段时间之后消费者抵消将被删除。但是对我来说__consumer_offsets主题有clean.policy=compact。请参见下面我删除__consumer_offsets主题时的结果:

主题:__consumer_offsets partitioncount:50replicationfactor:1configs:compression.type=producer,cleanup.policy=compact,segment.bytes=104857600

所以现在的问题是,两分钟后,消费者偏移将从主题中删除,还是仍然以压缩的形式存在于主题中?

共有1个答案

周宸
2023-03-14

(Kafka是如何存储消费者抵销的一些简要上下文)

首先要知道的是,消费者的偏移量以下面的消息格式存储在__consumer_offsets主题中。

键:值(约为格式)

key->(consumer-group,topic-name,parition):value->NULL

但是这个相同密钥的旧记录仍然存在。但是,如果您的使用者现在重新启动,它将在__consumer_offsets主题中看到无效的偏移量,因此要决定读取哪个偏移量,有一个定义为“auto.offset.reset”的使用者属性,您可以根据应用程序逻辑将其设置为“最早”或“最新”。

现在,对于这个“紧凑”的主题,就清理策略而言,它表示在主题的保留策略过期后,只保留与密钥相对应的最新记录,该保留策略由集群级的属性log.retention.min决定。因此,在保留策略过期后,将只保留每个使用者的最新提交的补偿,从而使您得到本主题的压缩形式。

 类似资料:
  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • Spring Kafka-batchListener-true enable.auto.commit-true max_poll_records_config-默认(500) MAX_POLL_INTERVAL_MS_CONFIG-默认值(5分钟)

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。