当前位置: 首页 > 知识库问答 >
问题:

日志压缩,每个密钥只保留一条消息

华子昂
2023-03-14

我想创建一个主题,其中包含唯一键及其对应的最新值。因此,当将具有现有密钥的消息插入主题时,旧消息将被删除。

为此,我在服务器中配置了以下参数。属性文件:

log.cleaner.enable=true
log.cleanup.policy=compact

# The minimum age of a log file to be eligible for deletion due to age
log.retention.minutes=3

log.retention.bytes=10737418

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# The maximum time before a new log segment is rolled out (in milliseconds).
# If not set, the value in log.roll.hours is used
log.roll.ms=600000

所以压缩应该每3分钟进行一次。为了测试压缩策略,我创建了一个主题retention_test

kafka-topics --zookeeper localhost:2181 --create --topic retention_test --replication-factor 1 --partitions 1

使用控制台使用者,kafka控制台生产者——代理列表localhost:9092——主题保留测试——属性解析。key=true—属性键。分隔符=:我产生了以下消息:

>1:first
>2:second
>3:third

其中,控制台消费者——引导服务器本地主机:9092——主题保留测试——从一开始就成功地消费了它们;

first
second
third

现在,当我尝试插入带有已添加键的消息时,旧消息似乎不会被忽略,并保留在主题中:

在生产者方面:

>1:updatedFirst

请注意,为了测试该行为,在3分钟的保留期过去很久之后,我多次重新启动了消费者。输出为

first
second
third
updatedFirst

期望的输出应该是

second
third
updatedFirst

因为第一个和更新的第一个具有相同的键。

根据文件:

日志压缩为我们提供了一个更细粒度的保留机制,以便我们保证至少保留每个主键的最后一次更新

是否可以每个键只保留一条消息(最近的一条),而不是至少保留一条消息(包括最近的一条)?

共有1个答案

平元明
2023-03-14

我认为这通常是不可能的。Kafka将每个主题的每个分区的消息分段存储。每个段都是一个文件,它们只会作为一个整体添加(或删除)。压缩仅通过重新写入现有段文件而起作用,跳过稍后有具有相同密钥的消息的消息。但是,头段(当前正在向其添加新消息的头段)不会被压缩(直到创建了一个新段成为头段)。

您通过日志配置的3分钟。当记录时,保留配置不起作用。清理。策略=紧凑,仅当记录时有效。清理。策略=删除

为什么给定键只有一条消息很重要?如果您提供有关用例的更多信息,也许可以建议一种替代方法。

 类似资料:
  • 我有一个关于Kafka主题清理策略和它们的日志.保留... 例如,如果我将cleanup.policy设置为compact,则只有在主题的保留时间或保留时间对压缩没有影响之后才会开始压缩? 答案Thx...

  • 我需要压缩Weblogic日志。 我检查了Weblogic控制台的日志设置,发现我可以旋转日志,但没有看到可以为日志压缩(压缩到zip文件)设置的任何属性。 当前设置如图所示。 有没有自动压缩这些日志的方法?

  • 日志压缩可确保 Kafka 始终至少为单个 topic partition 的数据日志中的每个 message key 保留最新的已知值。 这样的设计解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。 接下来让我们深入讨论这些在使用过程中的更多细节,阐述在这个过程中它是如何进行日志压缩的。 迄今为止,我们只介绍了简单的日志保留方法(当旧的数据保留时间超过指定时间、

  • 一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?

  • 我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。

  • 保存/记录在AWS SNS主题上发布的每条消息的最简单方法是什么?我想可能有一个神奇的设置可以自动将它们推送到S3或数据库,或者可能是一个自动支持HTTP目标的数据库服务,但似乎并非如此。也许需要通过Lambda函数来完成? 目的只是为了在设置一些SNS发布时进行基本的诊断和调试。我并不真正关心大规模或快速查询,只想一次记录和执行几分钟对所有活动的基本查询。