5.3 消息格式

优质
小牛编辑
147浏览
2023-12-01

消息通常按照批量的方式写入.record batch 是批量消息的技术术语,它包含一条或多条 records.不良情况下, record batch 只包含一条 record.Record batches 和 records 都有他们自己的 headers.在 kafka 0.11.0及后续版本中(消息格式的版本为 v2 或者 magic=2)解释了每种消息格式.点击查看消息格式详情.

5.3.1 Record Batch

以下为 RecordBatch 在硬盘上的格式.

	baseOffset: int64
	batchLength: int32
	partitionLeaderEpoch: int32
	magic: int8 (current magic value is 2)
	crc: int32
	attributes: int16
		bit 0~2:
			0: no compression
			1: gzip
			2: snappy
			3: lz4
		bit 3: timestampType
		bit 4: isTransactional (0 means not transactional)
		bit 5: isControlBatch (0 means not a control batch)
		bit 6~15: unused
	lastOffsetDelta: int32
	firstTimestamp: int64
	maxTimestamp: int64
	producerId: int64
	producerEpoch: int16
	baseSequence: int32
	records: [Record]

请注意,启用压缩时,压缩的记录数据将直接按照记录数进行序列化。

CRC(一种数据校验码) 会覆盖从属性到批处理结束的数据, (即 CRC 后的所有字节数据). CRC 位于 magic 之后,这意味着,在决定如何解释批次的长度和 magic 类型之前,客户端需要解析 magic 类型.CRC 计算不包括分区 learder epoch 字段,是为了避免 broker 收到每个批次的数据时 需要重新分配计算 CRC . CRC-32C (Castagnoli) 多项式用于计算.

压缩: 不同于旧的消息格式, magic v2 及以上版本在清理日志时保留原始日志中首次及最后一次 offset/sequence .这是为了能够在日志重新加载时恢复生产者的状态.例如,如果我们不保留最后一次序列号,当分区 learder 失败以后,生产者会报 OutOfSequence 的错误.必须保留基础序列号来做重复检查(broker 通过检查生产者该批次请求中第一次及最后一次序列号是否与上一次的序列号相匹配来判断是否重复).因此,当批次中所有的记录被清理但批次数据依然保留是为了保存生产者最后一次的序列号,日志中可能有空的数据.不解的是在压缩中时间戳可能不会被保留,所以如果批次中的第一条记录被压缩,时间戳也会改变

5.3.1.1 批次控制

批次控制包含成为控制记录的单条记录. 控制记录不应该传送给应用程序,相反,他们是消费者用来过滤中断的事务消息.

控制记录的 key 符合以下模式:

   version: int16 (current version is 0)   type: int16 (0 indicates an abort marker, 1 indicates a commit)

批次记录值的模式依赖于类型. 对客户端来说它是透明的.

5.3.2 Record(记录)

Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below.

Record 级别的头部信息在0.11.0 版本引入. 拥有 headers 的 Record 的磁盘格式如下.

	length: varint
	attributes: int8
		bit 0~7: unused
	timestampDelta: varint
	offsetDelta: varint
	keyLength: varint
	key: byte[]
	valueLen: varint
	value: byte[]
	Headers => [Header]

5.4.2.1 Record Header

	headerKeyLength: varint
	headerKey: String
	headerValueLength: varint
	Value: byte[]

我们使用了和 Protobuf 编码格式相同的 varint 编码. 更多后者相关的信息 在这里. Record 中 headers 的数量也被编码为 varint .