While running Kafka on Windows I hit a file rewrite exception, which led me to dig into Kafka's message retention mechanism.
Kafka's configuration is very flexible due to its fine granularity, and it supports a plethora of per-topic configuration settings to help administrators set up multi-tenant clusters. For example, administrators often need to define data retention policies to control how much and/or for how long data will be stored in a topic, with settings such as retention.bytes (size) and retention.ms (time). This limits storage consumption within the cluster, and helps complying with legal requirements such as GDPR.
The passage above is from Kafka's official documentation on retention. In short, message retention can be controlled in two ways: by size (retention.bytes) and by time (retention.ms).
Messages that producers send to a broker are persisted locally in per-partition log files such as logs/__consumer_offsets-xx/00000000000000000000.log (one directory per topic partition).
By default these files are not kept forever: once the retention time or size limit is exceeded, Kafka deletes them.
The deletion flow, roughly:
1. On each retention check, the log.retention.* conditions are evaluated segment by segment to decide whether a segment is eligible for deletion.
2. Eligible segments are marked as deletable, and the current cleaning position is recorded in the checkpoint file cleaner-offset-checkpoint.
3. The actual removal is deferred: the segment's files are renamed with a .deleted suffix and physically removed later by a background task. (Note: the LogCleaner component and log compaction are a separate topic worth expanding on.)
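The steps above can be sketched as a toy model (the class and function names are illustrative, not Kafka's actual Scala code in kafka.log.Log): walk segments oldest-first and collect the prefix that is deletable by time or by size.

```python
# A toy model of Kafka's retention check. Illustrative only.

class Segment:
    def __init__(self, base_offset, size, last_modified_s):
        self.base_offset = base_offset          # first offset in the segment
        self.size = size                        # bytes
        self.last_modified_s = last_modified_s  # epoch seconds

def find_deletable(segments, retention_ms, retention_bytes, now_s):
    """Return the prefix of segments eligible for deletion by time or size.
    The active (newest) segment is never eligible; -1 disables a limit."""
    total = sum(s.size for s in segments)
    deletable = []
    for seg in segments[:-1]:                   # never touch the active segment
        over_time = (retention_ms >= 0 and
                     (now_s - seg.last_modified_s) * 1000 > retention_ms)
        over_size = retention_bytes >= 0 and total > retention_bytes
        if not (over_time or over_size):
            break                               # the deletable prefix ends here
        deletable.append(seg)
        total -= seg.size
    return deletable
```

With retention_ms=2000, a segment older than two seconds is returned, matching the "retention time 2000ms breach" decisions in the experiment logs below; the physical removal is still deferred.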
log.retention.bytes
The maximum size of the log before deleting it.
Once a partition's log exceeds this size, the oldest segments are deleted. The default of -1 means no size limit.
Type: | long |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.retention.hours
The number of hours to keep a log file before deleting it, tertiary to the log.retention.ms property.
Files retained longer than this are deleted.
Type: | int |
---|---|
Default: | 168 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.retention.minutes
The number of minutes to keep a log file before deleting it, secondary to the log.retention.ms property. If not set, the value in log.retention.hours is used.
Files retained longer than this are deleted.
Type: | int |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.retention.ms
The number of milliseconds to keep a log file before deleting it. If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.
Files retained longer than this are deleted.
Type: | long |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.retention.check.interval.ms
The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion.
This is the polling interval at which logs are checked against the deletion conditions.
Type: | long |
---|---|
Default: | 300000 (5 minutes) |
Valid Values: | [1,...] |
Importance: | medium |
Update Mode: | read-only |
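The check is simply a recurring background task on the broker, fired every log.retention.check.interval.ms. A minimal sketch of such periodic scheduling (run_periodically and check_retention are illustrative names, not Kafka APIs):

```python
import threading

def run_periodically(interval_ms, task):
    """Run task every interval_ms until the returned event is set.
    Toy version of a broker-side scheduled cleanup job."""
    stop = threading.Event()
    def loop():
        # Event.wait doubles as a cancellable sleep: it returns True
        # (and ends the loop) as soon as stop is set.
        while not stop.wait(interval_ms / 1000.0):
            task()
    threading.Thread(target=loop, daemon=True).start()
    return stop

# e.g. check every 10 s as in the experiment below;
# check_retention is a hypothetical callback:
# stop = run_periodically(10_000, check_retention)
```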
Priority of the time parameters
ms > minutes > hours
When both time-based and size-based limits are configured, whichever condition is met first triggers deletion.
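The fallback chain for the time parameters can be written out explicitly (a sketch of the rule as documented above; the function name is illustrative):

```python
def effective_retention_ms(ms=None, minutes=None, hours=168):
    """Resolve the broker retention time: log.retention.ms wins, then
    log.retention.minutes, then log.retention.hours (default 168)."""
    if ms is not None:
        return ms                       # may be -1: no time limit
    if minutes is not None:
        return minutes * 60 * 1000
    return hours * 60 * 60 * 1000

# Defaults resolve to 7 days:
# effective_retention_ms() == 604_800_000
```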
Example:
log.retention.hours=168
log.retention.bytes=1024
Once the log reaches 1024 bytes, the segment is deleted even if 168 hours have not yet passed.
Parameters
log.retention.ms=2000
log.retention.check.interval.ms=10000 # lowered from the default 300000 to speed up the experiment; not the focus here
log.segment.bytes=1024 # maximum size of each log segment; once exceeded, a new segment is rolled; not the focus here
Log output
[2021-06-02 20:03:17,929] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Found deletable segments with base offsets [0] due to retention time 2000ms breach (kafka.log.Log)
[2021-06-02 20:03:17,931] INFO [ProducerStateManager partition=testMcdull222-2] Writing producer snapshot at offset 9 (kafka.log.ProducerStateManager)
[2021-06-02 20:03:17,932] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 9 in 2 ms. (kafka.log.Log)
[2021-06-02 20:03:17,932] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Scheduling log segment [baseOffset 0, size 61525] for deletion. (kafka.log.Log)
[2021-06-02 20:03:17,935] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Incrementing log start offset to 9 (kafka.log.Log)
In the output we can see `Found deletable segments with base offsets [0] due to retention time 2000ms breach`.
Note the wording: *deletable*, not deleted. Kafka does not remove a segment the instant its retention time is exceeded; instead it marks that information (file + offset) as deletable, and a separate deletion task removes it later.
Once the deletion condition is met, the messages at the corresponding offsets are dropped. The log and index files are not removed immediately either: they are first renamed with a .deleted suffix and physically deleted later by a scheduled task. This ties into Kafka's log cleanup mechanism, which deserves a separate write-up.
[2021-06-02 20:11:07,916] INFO [Log partition=testMcdull222-0, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Deleting segment 8 (kafka.log.Log)
[2021-06-02 20:11:07,919] INFO Deleted log D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-0\00000000000000000008.log.deleted. (kafka.log.LogSegment)
[2021-06-02 20:11:07,926] INFO Deleted offset index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-0\00000000000000000008.index.deleted. (kafka.log.LogSegment)
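The rename-then-delete pattern visible in these lines (the file becomes ...00000000000000000008.log.deleted, then is deleted) can be mimicked with a small sketch. Assumption: on a real broker, phase 2 is delayed by the file.delete.delay.ms setting (default 60000 ms); this toy version runs both phases immediately.

```python
import os
import tempfile

def mark_for_deletion(path):
    """Phase 1: rename the segment file with a .deleted suffix."""
    marked = path + ".deleted"
    os.rename(path, marked)
    return marked

def purge(marked_path):
    """Phase 2: physically remove the file (run later by a background task)."""
    os.remove(marked_path)

# Demo on a throwaway file standing in for a segment:
d = tempfile.mkdtemp()
seg = os.path.join(d, "00000000000000000008.log")
open(seg, "wb").close()
marked = mark_for_deletion(seg)   # -> ...00000000000000000008.log.deleted
purge(marked)                     # in Kafka, this step runs after the delay
```

On Windows, the rename in phase 1 is exactly where Kafka can fail with a file access exception if the segment is still memory-mapped, which is the problem that prompted this post.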
Parameters
log.retention.hours=168
log.retention.bytes=1024
log.segment.bytes=1024
log.retention.check.interval.ms=10000
Log output
[2021-06-02 20:33:44,760] INFO [ProducerStateManager partition=testMcdull222-1] Writing producer snapshot at offset 5717 (kafka.log.ProducerStateManager)
[2021-06-02 20:33:44,762] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5717 in 6 ms. (kafka.log.Log)
[2021-06-02 20:34:15,235] INFO [ProducerStateManager partition=testMcdull222-0] Writing producer snapshot at offset 5720 (kafka.log.ProducerStateManager)
[2021-06-02 20:34:15,236] INFO [Log partition=testMcdull222-0, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5720 in 3 ms. (kafka.log.Log)
[2021-06-02 20:34:15,239] INFO [ProducerStateManager partition=testMcdull222-2] Writing producer snapshot at offset 5720 (kafka.log.ProducerStateManager)
[2021-06-02 20:34:15,240] INFO [Log partition=testMcdull222-2, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5720 in 2 ms. (kafka.log.Log)
[2021-06-02 20:34:15,242] INFO [ProducerStateManager partition=testMcdull222-1] Writing producer snapshot at offset 5720 (kafka.log.ProducerStateManager)
[2021-06-02 20:34:15,243] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5720 in 3 ms. (kafka.log.Log)
[2021-06-02 20:34:17,154] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Found deletable segments with base offsets [5707] due to retention size in bytes 1024 breach (kafka.log.Log)
[2021-06-02 20:34:17,155] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Scheduling log segment [baseOffset 5707, size 407] for deletion. (kafka.log.Log)
[2021-06-02 20:34:17,160] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Incrementing log start offset to 5717 (kafka.log.Log)
[2021-06-02 20:35:17,169] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Deleting segment 5707 (kafka.log.Log)
[2021-06-02 20:35:17,172] INFO Deleted log D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.log.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,173] INFO Deleted offset index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.index.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,178] INFO Deleted time index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.timeindex.deleted. (kafka.log.LogSegment)
[2021-06-02 20:33:44,760] INFO [ProducerStateManager partition=testMcdull222-1] Writing producer snapshot at offset 5717 (kafka.log.ProducerStateManager)
This line results from producing: I sent messages to the broker, and they were stored up to offset 5717.
[2021-06-02 20:33:44,762] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Rolled new log segment at offset 5717 in 6 ms. (kafka.log.Log)
Because the segment exceeded log.segment.bytes=1024, a new log segment was rolled.
[2021-06-02 20:34:17,154] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Found deletable segments with base offsets [5707] due to retention size in bytes 1024 breach (kafka.log.Log)
[2021-06-02 20:34:17,155] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Scheduling log segment [baseOffset 5707, size 407] for deletion. (kafka.log.Log)
[2021-06-02 20:34:17,160] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Incrementing log start offset to 5717 (kafka.log.Log)
[2021-06-02 20:35:17,169] INFO [Log partition=testMcdull222-1, dir=D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs] Deleting segment 5707 (kafka.log.Log)
Here Kafka has detected a deletable segment, because the log size exceeded log.retention.bytes=1024.
[2021-06-02 20:35:17,172] INFO Deleted log D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.log.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,173] INFO Deleted offset index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.index.deleted. (kafka.log.LogSegment)
[2021-06-02 20:35:17,178] INFO Deleted time index D:\DEV\MQ\kafka_2.12-2.3.0\kafka_2.12-2.3.0\logs\testMcdull222-1\00000000000000005707.timeindex.deleted. (kafka.log.LogSegment)
And finally the messages are deleted.
I originally just wanted to summarize what I had run into recently, but every time I try, new topics that need covering keep turning up.
Kafka's log cleanup and segmentation mechanisms are next on the list.
References
http://kafka.apache.org/documentation/#retention.bytes
http://kafka.apache.org/documentation/#configuration
https://www.cnblogs.com/gao88/p/12539112.html
https://blog.csdn.net/u013332124/category_9279305.html
https://www.cnblogs.com/weixiuli/p/6413109.html