
Kafka Message Retention Mechanism: log.retention

楚畅
2023-12-01

 

Contents

Background

Introduction

How It Works

Parameters

Retention parameters

log.retention.bytes

log.retention.hours

log.retention.minutes

log.retention.ms

log.retention.check.interval.ms

Relationships Between Parameters

Experiments

Experiment: retention.ms

Experiment: retention.bytes

Another Rabbit Hole

References


 

Background

I ran into a file-rewrite exception while running Kafka on Windows, which led me to dig into Kafka's message retention mechanism.

Introduction

Kafka's configuration is very flexible due to its fine granularity, and it supports a plethora of per-topic configuration settings to help administrators set up multi-tenant clusters. For example, administrators often need to define data retention policies to control how much and/or for how long data will be stored in a topic, with settings such as retention.bytes (size) and retention.ms (time). This limits storage consumption within the cluster, and helps complying with legal requirements such as GDPR.

The passage above is from Kafka's official documentation on retention. In short, message retention can be controlled in two ways: by size (retention.bytes) and by time (retention.ms).
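As a concrete illustration (the values below are examples, not recommendations), the broker-wide defaults live in server.properties:

```properties
# server.properties — broker-wide retention defaults (example values)
log.retention.hours=168        # keep data for 7 days...
log.retention.bytes=-1         # ...with no size cap per partition
log.retention.check.interval.ms=300000   # how often eligibility is checked
```

Per-topic overrides use the topic-level names retention.ms / retention.bytes (without the log. prefix) and can be applied at runtime with the kafka-configs.sh tool, e.g. `--alter --add-config retention.ms=604800000`.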

How It Works

Messages that producers send to a broker are persisted locally in segment files under logs/<topic>-<partition>/, such as 00000000000000000000.log. (The logs/__consumer_offsets-xx directories hold the internal consumer-offsets topic, not regular topic data.)

By default these files are not kept forever: once the retention time or size limit is exceeded, Kafka deletes them.

First, the log.retention conditions are evaluated segment by segment to decide whether each segment is eligible for deletion.

If a segment qualifies, it is marked as deletable, and the position cleaned up so far is recorded in the cleaner-offset-checkpoint file.

This is carried out by the LogCleaner component, which writes null into the log file so that the file ends up empty. Note: this point deserves further investigation.
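The per-segment eligibility check can be sketched as follows. This is a simplified model, not Kafka's actual code; the field names are made up for illustration. The real broker compares the segment's largest message timestamp against the retention limit and never deletes the active segment.

```python
import time

def find_deletable_segments(segments, retention_ms, now_ms=None):
    """Simplified sketch of the retention-time check: a segment is
    deletable when its newest message is older than retention_ms.
    `segments` is an ordered list of dicts (oldest first); the last
    entry is the active segment and is always skipped."""
    now_ms = now_ms if now_ms is not None else int(time.time() * 1000)
    deletable = []
    for seg in segments[:-1]:          # never touch the active segment
        if now_ms - seg["largest_timestamp_ms"] > retention_ms:
            deletable.append(seg["base_offset"])
        else:
            break                      # segments are time-ordered; stop early
    return deletable
```

A run with three segments and retention_ms=2000 would mark the first two as deletable once their newest messages are more than 2 seconds old, mirroring the "Found deletable segments with base offsets [...]" log line shown later.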

 

Parameters

Retention parameters

log.retention.bytes

The maximum size of the log before deleting it

When the log for a partition grows beyond this size, the oldest segments are deleted. Note that this limit applies per partition, not per topic.

Type: long
Default: -1
Valid Values:
Importance: high
Update Mode: cluster-wide

log.retention.hours

The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property

When a log segment has been retained longer than this, it is deleted.

Type: int
Default: 168
Valid Values:
Importance: high
Update Mode: read-only

 

log.retention.minutes

The number of minutes to keep a log file before deleting it (in minutes), secondary to log.retention.ms property. If not set, the value in log.retention.hours is used

When a log segment has been retained longer than this, it is deleted.

Type: int
Default: null
Valid Values:
Importance: high
Update Mode: read-only

 

 

log.retention.ms

The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.

When a log segment has been retained longer than this, it is deleted.

Type: long
Default: null
Valid Values:
Importance: high
Update Mode: cluster-wide

 

log.retention.check.interval.ms

The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion

The polling interval at which the cleaner checks whether any log qualifies for deletion.

Type: long
Default: 300000 (5 minutes)
Valid Values: [1,...]
Importance: medium
Update Mode: read-only

 

Relationships Between Parameters

Time parameter precedence:

ms > minutes > hours
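The precedence rule can be sketched as a small resolver (a simplified model of the documented fallback behavior, not Kafka's actual code):

```python
def effective_retention_ms(ms=None, minutes=None, hours=168):
    """Resolve the effective time limit: log.retention.ms wins over
    log.retention.minutes, which wins over log.retention.hours
    (whose default is 168, i.e. 7 days)."""
    if ms is not None:
        return ms                       # highest priority
    if minutes is not None:
        return minutes * 60 * 1000      # fall back to minutes
    return hours * 60 * 60 * 1000       # finally hours
```

With no overrides this resolves to 604800000 ms (7 days); setting ms=2000 short-circuits the other two, which is exactly the setup used in the retention.ms experiment below.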

When both time and size limits are configured, whichever condition is met first triggers deletion.

Example:

log.retention.hours=168

log.retention.bytes=1024

Once the log exceeds 1024 bytes, old segments are deleted even though 168 hours have not elapsed.
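The "whichever is met first" rule is effectively a logical OR of the two breach checks. A minimal sketch, using the example values above (hypothetical helper, not Kafka's code):

```python
def retention_breached(log_size_bytes, oldest_segment_age_ms,
                       retention_bytes=1024,
                       retention_ms=168 * 3600 * 1000):
    """Deletion is triggered as soon as EITHER limit is breached.
    A value <= 0 disables the corresponding limit (mirroring -1
    meaning 'no limit' in the broker config)."""
    size_breach = retention_bytes > 0 and log_size_bytes > retention_bytes
    time_breach = retention_ms > 0 and oldest_segment_age_ms > retention_ms
    return size_breach or time_breach
```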
 

Experiments

Experiment: retention.ms

Parameters

log.retention.ms=2000

log.retention.check.interval.ms=10000 # lowered from the default 300000 only to speed up the experiment; not the focus here.

log.segment.bytes=1024 # the maximum size of each log segment; once exceeded, a new segment is rolled. Not the focus here.

 

Logs

[2021-06-02 20:03:17,929] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Found deletable segments with base offsets [0] due to retention time 2000ms breach (kafka.log.Log)
[2021-06-02 20:03:17,931] INFO [ProducerStateManager partition=testMcdull222-2] Writing producer snapshot at offset 9 (kafka.log.ProducerStateManager)
[2021-06-02 20:03:17,932] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 9 in 2 ms. (kafka.log.Log)
[2021-06-02 20:03:17,932] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Scheduling log segment [baseOffset 0, size 61525] for deletion. (kafka.log.Log)
[2021-06-02 20:03:17,935] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Incrementing log start offset to 9 (kafka.log.Log)

We can see: Found deletable segments with base offsets [0] due to retention time 2000ms breach.

Note the wording: deletable, not deleted. Kafka does not remove a segment the moment its retention time is breached; it first marks the segment (file + offsets) as deletable, and a deletion task removes it later.

 

Once the deletion condition is met, the messages at the corresponding offsets are removed. Note that the log and index files are not deleted immediately; in my case null was written into the file, so its size dropped to 0 while the file itself remained. This ties into the log cleanup mechanism, which I will cover separately.

[2021-06-02 20:11:07,916] INFO [Log partition=testMcdull222-0, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Deleting segment 8 (kafka.log.Log)
[2021-06-02 20:11:07,919] INFO Deleted log D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-0\00000000000000000008.log.deleted. (kafka.log.LogSegment)
[2021-06-02 20:11:07,926] INFO Deleted offset index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-0\00000000000000000008.index.deleted. (kafka.log.LogSegment)

 

Experiment: retention.bytes

Parameters

log.retention.hours=168

log.retention.bytes=1024

log.segment.bytes=1024

log.retention.check.interval.ms=10000

 

Logs

[2021-06-02 20:33:44,760] INFO [ProducerStateManager partition=testMcdull222-1] Writing producer snapshot at offset 5717 (kafka.log.ProducerStateManager)
[2021-06-02 20:33:44,762] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5717 in 6 ms. (kafka.log.Log)
[2021-06-02 20:34:15,235] INFO [ProducerStateManager partition=testMcdull222-0] Writing producer snapshot at offset 5720 (kafka.log.ProducerStateManager)
[2021-06-02 20:34:15,236] INFO [Log partition=testMcdull222-0, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5720 in 3 ms. (kafka.log.Log)
[2021-06-02 20:34:15,239] INFO [ProducerStateManager partition=testMcdull222-2] Writing producer snapshot at offset 5720 (kafka.log.ProducerStateManager)
[2021-06-02 20:34:15,240] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5720 in 2 ms. (kafka.log.Log)
[2021-06-02 20:34:15,242] INFO [ProducerStateManager partition=testMcdull222-1] Writing producer snapshot at offset 5720 (kafka.log.ProducerStateManager)
[2021-06-02 20:34:15,243] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5720 in 3 ms. (kafka.log.Log)
[2021-06-02 20:34:17,154] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Found deletable segments with base offsets [5707] due to retention size in bytes 1024 breach (kafka.log.Log)
[2021-06-02 20:34:17,155] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Scheduling log segment [baseOffset 5707, size 407] for deletion. (kafka.log.Log)
[2021-06-02 20:34:17,160] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Incrementing log start offset to 5717 (kafka.log.Log)
[2021-06-02 20:35:17,169] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Deleting segment 5707 (kafka.log.Log)
[2021-06-02 20:35:17,172] INFO Deleted log D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.log.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,173] INFO Deleted offset index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.index.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,178] INFO Deleted time index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.timeindex.deleted. (kafka.log.LogSegment)

 

[2021-06-02 20:33:44,760] INFO [ProducerStateManager partition=testMcdull222-1] Writing producer snapshot at offset 5717 (kafka.log.ProducerStateManager)

This entry was written when I sent messages to the broker; they were stored at offset 5717.

 

[2021-06-02 20:33:44,762] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5717 in 6 ms. (kafka.log.Log)

Because the segment size exceeded log.segment.bytes=1024, a new segment was rolled.
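The size-based roll decision can be sketched as follows (a hypothetical helper for illustration, not Kafka's actual code; the real broker also rolls segments on time, index size, and offset overflow):

```python
def should_roll(active_segment_bytes: int, incoming_batch_bytes: int,
                segment_bytes: int = 1024) -> bool:
    """Roll a new segment when appending the incoming batch would push
    the active segment past log.segment.bytes."""
    return active_segment_bytes + incoming_batch_bytes > segment_bytes
```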

 

[2021-06-02 20:34:17,154] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Found deletable segments with base offsets [5707] due to retention size in bytes 1024 breach (kafka.log.Log)
[2021-06-02 20:34:17,155] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Scheduling log segment [baseOffset 5707, size 407] for deletion. (kafka.log.Log)
[2021-06-02 20:34:17,160] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Incrementing log start offset to 5717 (kafka.log.Log)
[2021-06-02 20:35:17,169] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Deleting segment 5707 (kafka.log.Log)

We can see that a deletable segment was detected because the partition's log size exceeded log.retention.bytes=1024.


[2021-06-02 20:35:17,172] INFO Deleted log D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.log.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,173] INFO Deleted offset index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.index.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,178] INFO Deleted time index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.timeindex.deleted. (kafka.log.LogSegment)

The segment files are then deleted.

Another Rabbit Hole

I originally set out to summarize what I had run into recently, but every round of summarizing turned up more new topics that need filling in.

Kafka's log cleanup mechanism and segmentation mechanism still need further study.

 

References

http://kafka.apache.org/documentation/#retention.bytes

http://kafka.apache.org/documentation/#configuration

https://www.cnblogs.com/gao88/p/12539112.html

https://blog.csdn.net/u013332124/category_9279305.html

https://www.cnblogs.com/weixiuli/p/6413109.html

 
