我用以下属性创建了一个Kafka主题
min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=100,cleanup.policy=紧凑
假设我按1111:1,1111:2,1111: null,2222:1的顺序插入k-v对,现在除了最后一条消息,日志压缩在其余消息上运行并清除前两条消息,但保留1111: null
根据文件,
Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period.
因此,我希望当实现delete.retention.ms时,空标记应该删除带有键1111的消息
我有两个问题-为什么墓碑标记不起作用?为什么压缩会忽略最后一条消息?
这是server.properties文件所包含的内容-
log.retention.ms=100
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=100
log.cleaner.delete.retention.ms=100
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.01
在压缩中移除墓碑的算法应该如下。
可能墓碑仍然在日志的脏部分,因此没有被清除。触发几个不同键的消息应该会将逻辑删除块推入日志的清理部分并删除它们。
压缩主题有两部分:
1) 清洁部分:Kafka清洁剂至少清洁一次Kafka原木的部分。
2)脏的部分:kafka日志中至今没有被kafka cleaner清理过一次的部分。Kafka保持肮脏的抵消。带有偏移量的所有消息
注意:Kafka清洁剂清洁所有段(无论段是否在清洁/脏部分),并在每次脏比率达到min.cleanable.dirty.ratio.时重新复制它们
墓碑是分段删除的。如果段满足以下条件,则删除段中的墓碑:
>
段应该在日志的清理部分。
分段的最后修改时间应该是
很难详细说明第二点,但简单来说,第二点意味着=
您有两种选择来删除带有4条消息的墓碑:
源代码(额外阅读):https://github . com/Apache/Kafka/blob/trunk/core/src/main/Scala/Kafka/log/log cleaner . Scala
try {
// clean segments into the new destination segment
for (old <- segments) {
val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
}
逻辑删除记录在设计上保留得更久。原因是,经纪人不跟踪消费者。假设使用者在阅读第一条记录后脱机一段时间。当消费者停机时,原木压实会启动。如果日志压缩将删除逻辑删除记录,则使用者永远不会知道该记录已被删除的事实。如果使用者实现缓存,则记录可能永远不会被删除。因此,逻辑删除的保留时间更长,以允许脱机使用者接收所有逻辑删除以进行本地清理。
Tombstone只有在delete.retention.ms
之后才会被删除(默认值为1天)。注意:这是一个主题级别的配置,没有代理级别的配置。因此,如果要更改它,您需要为每个主题设置配置。
我有几个用Java实现的Kafka消费者,我正在实现一个独立的应用程序来检查记录并删除它们。希望Kafka在压缩主题时删除状态存储。 现在...我对Kafka创建的不同类型的商店有点困惑。对于每一种类型的店铺,我想知道: Kafka删除相应主题中的旧唱片时是否删除? 删除相应主题中的记录时是否删除? 我们是不是被困住了? 我看到的商店类型有以下几种: null
我从一个用spark-kafka-cassandra(在kubernetes上)重写猛犸象spark-kafka-hbase应用程序的初步想法开始。 我有以下数据模型,一个支持全时插入,另一个支持upserts 办法1: 创建表test.inv_positions( location_id int, item bigint, time_id timestamp, sales_floor_qty i
我不明白为什么cassandra一直在扫描我的表寻找其他结果(因此获取了很多墓碑),因为第一行匹配,我指定我只想要一行。 如果我没有指定限制,我可以理解警告。但是,当第一行与限制1匹配时,扫描整个表有什么意义呢?
我已经构建了一个DockerLinux镜像,用于虚幻引擎的持续集成构建,具体方法如下:https://docs.adamrehn.com/ue4-docker/use-cases/continuous-integration 一旦构建完成(这需要一个具有大量磁盘空间的在线Linux虚拟机),我将该映像导出,下载到我的Windows 10计算机上,并尝试将该映像导入我的本地版本Docker中进行测试
在正确配置日志保留期后,kafka主题数据不会在windows中被删除。 以下错误记录在Kafka服务器日志中。 原因:java.nio.file.文件系统异常: \tmp\kafka-logs\TopicName-2\0000000000000007262.log - 计划任务“Kafka日志保留”(Kafka.utils.Kafka调度程序)中未捕获的异常