3.2 Topic级别配置

优质
小牛编辑
135浏览
2023-12-01
与Topic相关的配置既包含服务器默认值,也包含可选的每个Topic覆盖值。 如果没有给出每个Topic的配置,那么服务器默认值就会被使用。 通过提供一个或多个 --config 选项,可以在创建Topic时设置覆盖值。 本示例使用自定义的最大消息大小和刷新率创建了一个名为 my-topic 的topic:
  > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1  --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
  
也可以在使用alter configs命令稍后更改或设置覆盖值. 本示例重置my-topic的最大消息的大小:
  > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic  --alter --add-config max.message.bytes=128000
  
您可以执行如下操作来检查topic设置的覆盖值
  > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
  
您可以执行如下操作来删除一个覆盖值
  > bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
  
以下是Topic级别配置。 “服务器默认属性”列是该属性的默认配置。 一个Topic如果没有给出一个明确的覆盖值,相应的服务器默认配置将会生效。
名称描述类型默认值有效值服务器默认属性重要性
cleanup.policy该配置项可以是 "delete" 或 "compact"。 它指定在旧日志段上使用的保留策略。 默认策略 ("delete") 将在达到保留时间或大小限制时丢弃旧段。 "compact" 设置将启用该topic的日志压缩 。listdelete[compact, delete]log.cleanup.policymedium
compression.type为给定的topic指定最终压缩类型。这个配置接受标准的压缩编解码器 ('gzip', 'snappy', lz4) 。它为'uncompressed'时意味着不压缩,当为'producer'时,这意味着保留producer设置的原始压缩编解码器。stringproducer[uncompressed, snappy, lz4, gzip, producer]compression.typemedium
delete.retention.ms保留 日志压缩 topics的删除墓碑标记的时间。此设置还对consumer从偏移量0开始时必须完成读取的时间进行限制,以确保它们获得最后阶段的有效快照(否则,在完成扫描之前可能会收集到删除墓碑)。long86400000[0,...]log.cleaner.delete.retention.msmedium
file.delete.delay.ms删除文件系统上的一个文件之前所需等待的时间。long60000[0,...]log.segment.delete.delay.msmedium
flush.messages这个设置允许指定一个时间间隔n,每隔n个消息我们会强制把数据fsync到log。例如,如果设置为1,我们会在每条消息之后同步。如果是5,我们会在每五个消息之后进行fsync。一般来说,我们建议您不要设置它,而是通过使用replication机制来持久化数据,和允许更高效的操作系统后台刷新功能。这个设置可以针对每个topic的情况自定义 (请参阅 topic的配置部分).long9223372036854775807[0,...]log.flush.interval.messagesmedium
flush.ms这个设置允许指定一个时间间隔,每隔一段时间我们将强制把数据fsync到log。例如,如果这个设置为1000,我们将在1000 ms后执行fsync。一般来说,我们建议您不要设置它,而是通过使用replication机制来持久化数据,和允许更高效的操作系统后台刷新功能。long9223372036854775807[0,...]log.flush.interval.msmedium
follower.replication.throttled.replicas应该在follower侧限制日志复制的副本列表。该列表应以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...的形式描述一组副本,或者也可以使用通配符“*”来限制该topic的所有副本。list""[partitionId],[brokerId]:[partitionId],[brokerId]:...follower.replication.throttled.replicasmedium
index.interval.bytes此设置控制Kafka向其偏移索引添加索引条目的频率。默认设置确保我们大约每4096个字节索引一条消息。更多的索引允许读取更接近日志中的确切位置,但这会使索引更大。您可能不需要改变该值。int4096[0,...]log.index.interval.bytesmedium
leader.replication.throttled.replicas应该在leader侧限制日志复制的副本列表。该列表应以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...的形式描述一组副本,或者也可以使用通配符“*”来限制该topic的所有副本。list""[partitionId],[brokerId]:[partitionId],[brokerId]:...leader.replication.throttled.replicasmedium
max.message.bytes

Kafka允许的最大记录批次大小。如果这个参数被增加了且consumers是早于0.10.2版本,那么consumers的fetch size必须增加到该值,以便他们可以取得这么大的记录批次。

在最新的消息格式版本中,记录总是分组成多个批次以提高效率。在以前的消息格式版本中,未压缩的记录不会分组到多个批次,并且限制在该情况下只能应用单条记录。

int1000012[0,...]message.max.bytesmedium
message.format.version指定broker将用于将消息附加到日志的消息格式版本。该值应该是有效的ApiVersion。如:0.8.2,0.9.0.0,0.10.0,查看ApiVersion获取更多细节。通过设置特定的消息格式版本,用户将发现磁盘上的所有现有消息都小于或等于指定的版本。不正确地设置此值将导致旧版本的使用者中断,因为他们将收到他们不理解的格式的消息。string1.0-IV0log.message.format.versionmedium
message.timestamp.difference.max.msbroker接收消息时所允许的时间戳与消息中指定的时间戳之间的最大差异。如果message.timestamp.type=CreateTime,则如果时间戳的差异超过此阈值,则将拒绝消息。如果message.timestamp.type=LogAppendTime,则忽略此配置。long9223372036854775807[0,...]log.message.timestamp.difference.max.msmedium
message.timestamp.type定义消息中的时间戳是消息创建时间还是日志附加时间。值应该是“CreateTime”或“LogAppendTime”stringCreateTimelog.message.timestamp.typemedium
min.cleanable.dirty.ratio此配置控制日志compaction程序尝试清理日志的频率(假设启用了log compaction )。默认情况下,我们将避免清除超过50%的日志已经合并的日志。这个比率限制了重复在日志中浪费的最大空间(最多为50%,日志中最多有50%可能是重复的)。一个更高的比率将意味着更少,更高效的清理,但将意味着在日志中浪费更多的空间。double0.5[0,...,1]log.cleaner.min.cleanable.ratiomedium
min.compaction.lag.ms消息在日志中保持未压缩的最短时间。仅适用于被合并的日志。long0[0,...]log.cleaner.min.compaction.lag.msmedium
min.insync.replicas当producer将ack设置为“all”(或“-1”)时,此配置指定必须确认写入才能被认为成功的副本的最小数量。如果这个最小值无法满足,那么producer将引发一个异常(NotEnough Replicas或NotEnough ReplicasAfterAppend)。
当使用时,min.insync.Copicas和ack允许您执行更好的持久化保证。一个典型的场景是创建一个复制因子为3的topic,将min.insync.Copicas设置为2,并生成带有“All”的ack。这将确保如果大多数副本没有接收到写,则producer将引发异常。
int1[1,...]min.insync.replicasmedium
preallocate如果在创建新的日志段时应该预先分配磁盘上的文件,则为True。booleanfalselog.preallocatemedium
retention.bytes如果使用“delete”保留策略,此配置控制分区(由日志段组成)在放弃旧日志段以释放空间之前的最大大小。默认情况下,没有大小限制,只有时间限制。由于此限制是在分区级别强制执行的,因此,将其乘以分区数,计算出topic保留值,以字节为单位。long-1log.retention.bytesmedium
retention.ms如果使用“delete”保留策略,此配置控制保留日志的最长时间,然后将旧日志段丢弃以释放空间。这代表了用户读取数据的速度的SLA。long604800000log.retention.msmedium
segment.bytes此配置控制日志的段文件大小。保留和清理总是一次完成一个文件,所以更大的段大小意味着更少的文件,但对保留的粒度控制更少。int1073741824[14,...]log.segment.bytesmedium
segment.index.bytes此配置控制将偏移量映射到文件位置的索引大小。我们预先分配这个索引文件并且只在日志滚动后收缩它。您通常不需要更改此设置。int10485760[0,...]log.index.size.max.bytesmedium
segment.jitter.ms从预定的分段滚动时间减去最大随机抖动,以避免段滚动产生惊群效应。long0[0,...]log.roll.jitter.msmedium
segment.ms这个配置控制在一段时间后,Kafka将强制日志滚动,即使段文件没有满,以确保保留空间可以删除或合并旧数据。long604800000[0,...]log.roll.msmedium
unclean.leader.election.enable指示是否启用不在ISR集合中的副本选为领导者作为最后的手段,即使这样做可能导致数据丢失。booleanfalseunclean.leader.election.enablemedium