当前位置: 首页 > 知识库问答 >
问题:

Kafka每个主题Retention.Bytes和全局日志Retention.Bytes不工作

何涵育
2023-03-14

我们正在运行一个Kafka0.11.0的6节点集群。我们设置了全局保留和每个主题保留(以字节为单位),但这两个保留都没有应用。我在日志中没有看到任何错误,只是没有被删除(按大小;时间保留似乎起作用了)

# global retention 75GB or 60 days, segment size 512MB
log.retention.bytes=75000000000

log.retention.check.interval.ms=60000

log.retention.hours=1440

log.cleanup.policy=delete

log.segment.bytes=536870912
[tstumpges@kafka-02 kafka]$ bin/kafka-topics.sh  --zookeeper zk-01:2181/kafka --describe --topic stg_logtopic
Topic:stg_logtopic    PartitionCount:12       ReplicationFactor:3     Configs:retention.bytes=30000000000
        Topic: stg_logtopic   Partition: 0    Leader: 4       Replicas: 4,5,6 Isr: 4,5,6
        Topic: stg_logtopic   Partition: 1    Leader: 5       Replicas: 5,6,1 Isr: 5,1,6
        ...
[tstumpges@kafka-02 kafka]$ sudo du -s -h /data1/kafka-data/*
82G     /data1/kafka-data/stg_logother3-2
155G    /data1/kafka-data/stg_logother2-9
169G    /data1/kafka-data/stg_logother1-6
910G    /data1/kafka-data/stg_logtopic-4

我可以看到在分区目录中有大量的段日志文件(每个512MB)...怎么回事?!

谢谢你,雷鸣

共有1个答案

邢财
2023-03-14

通过kafka用户邮件列表找到了答案。我们显然碰到了kafka bug KAFKA-6030(对数清洁剂可清除率计算中的整数溢出)

升级到V1.0.0已经为我们修复了这一点!

 类似资料:
  • 我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。 KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。 所以不需要为我的KTable存储启用日志记录(更改日志)? 我的源输入日志压缩主题可能有数百万条记录,所以如果我在KT

  • 我需要维护自己的全局表结构。基本上,全局存储与处理器相连,我使用一些计算来创建键,然后将其存储到键值存储。 由于全局状态存储没有更改日志主题,所以它使用原始主题作为更改日志。在状态恢复的情况下,它只是将主题数据加载到全局表(这将是错误的),因为我们构建了自己的密钥 我的要求是用我的自定义键(不是直接来自主题的键)创建一个全局存储。有什么解决方案吗? 下面的链接回答了我的问题Kafka stream

  • 我正在将日志消息写入Kafka Topic,我希望此主题的保留是永久的。我在Kafka和Kafka Connect(_schemas、连接-配置、连接-状态、连接-偏移等)中看到,有一些特殊主题不会因日志保留时间而删除。如何强制一个主题像这些其他特殊主题一样?是命名约定还是其他属性? 谢啦

  • 在构建Kafka Streams拓扑时,可以通过两种不同的方式对多个主题的读取进行建模: 读取具有相同源节点的所有主题。 选项1相对于选项2是否有相对优势,反之亦然?所有主题都包含相同类型的数据,并具有相同的数据处理逻辑。

  • [2020-06-01 08:00:27,746]警告[Consumer Clientid=Consumer-Console-Consumer-56224-1,GroupID=Console-Consumer-56224]提取相关id为2的元数据时出错:{Distance2 =Invalid_Topic_Exception}(org.apache.kafka.clients.NetworkClie