As we know, a RecordBatch holds a reference to a MemoryRecords object, because MemoryRecords is where the messages are ultimately stored.
MemoryRecords represents a collection of messages. It wraps an NIO ByteBuffer that holds the message data, and a Compressor that compresses the messages written into that ByteBuffer.
1 Core Fields
ByteBuffer buffer: the NIO ByteBuffer that holds the messages
Compressor compressor: the compressor; it compresses the messages and writes the compressed data into buffer
int writeLimit: the maximum number of bytes that may be written into buffer
int initialCapacity: the initial size of buffer
boolean writable: whether this MemoryRecords is writable or read-only
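A minimal sketch of how a writable MemoryRecords instance backed by these fields is typically created. The emptyRecords factory and its (buffer, type, writeLimit) signature are assumptions based on the 0.10.x client; the buffer size is an illustrative value, not something from the source.

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;

public class MemoryRecordsExample {
    public static MemoryRecords newWritableRecords() {
        // Allocate the backing NIO buffer; 16 KB is an arbitrary illustrative size
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
        // writeLimit is set to the buffer capacity here for simplicity
        return MemoryRecords.emptyRecords(buffer, CompressionType.GZIP, buffer.capacity());
    }
}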
2 Compressor
Many parts of MemoryRecords involve the Compressor, so let's analyze Compressor first.
2.1 Core Fields
CompressionType type: the compression type
ByteBufferOutputStream bufferStream: an output stream over the buffer; when appending a record would not fit, it automatically expands the buffer (see the sketch after this list)
DataOutputStream appendStream: an output stream that wraps bufferStream and adds compression on top of it
long numRecords: the number of records written
float compressionRate: the compression ratio
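The auto-expansion behavior of bufferStream is what lets a batch temporarily grow beyond the originally allocated buffer. Below is a minimal sketch of an auto-expanding ByteBuffer output stream; the real ByteBufferOutputStream in the Kafka client follows the same idea, but the class name and the growth factor used here are assumptions for illustration.

import java.io.OutputStream;
import java.nio.ByteBuffer;

// Sketch of an output stream that grows its backing ByteBuffer on demand.
public class ExpandingByteBufferOutputStream extends OutputStream {
    private static final float REALLOCATION_FACTOR = 1.1f; // assumed growth factor
    private ByteBuffer buffer;

    public ExpandingByteBufferOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void write(int b) {
        if (buffer.remaining() < 1)
            expandBuffer(buffer.capacity() + 1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        if (buffer.remaining() < len)
            expandBuffer(buffer.capacity() + len);
        buffer.put(bytes, off, len);
    }

    public ByteBuffer buffer() {
        return buffer;
    }

    private void expandBuffer(int size) {
        int newSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
        ByteBuffer expanded = ByteBuffer.allocate(newSize);
        buffer.flip();        // switch the old buffer to read mode
        expanded.put(buffer); // copy the bytes already written
        buffer = expanded;
    }
}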
2.2 Constructor
public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type; // initialize the compression type
    this.initPos = buffer.position(); // remember the starting position of the buffer
    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;
    // If a compression type is specified, advance the position to reserve space for
    // the header of the wrapper record that will hold the compressed payload
    if (type != CompressionType.NONE) {
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }
    // Create the output stream over the buffer
    bufferStream = new ByteBufferOutputStream(buffer);
    // Wrap bufferStream to add compression
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
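To make the reserved space concrete: Records.LOG_OVERHEAD is the offset field plus the size field, and Record.RECORD_OVERHEAD is the fixed per-record header plus the key and value length fields. The sketch below recomputes these constants assuming the 0.10.x (magic v1) message format; the exact values depend on the message format version.

// Sketch of the reserved header space, assuming the 0.10.x (magic v1) message format
public class OverheadSketch {
    static final int OFFSET_LENGTH = 8;     // 8-byte offset
    static final int SIZE_LENGTH = 4;       // 4-byte record size
    static final int LOG_OVERHEAD = OFFSET_LENGTH + SIZE_LENGTH; // 12 bytes

    static final int CRC_LENGTH = 4;
    static final int MAGIC_LENGTH = 1;
    static final int ATTRIBUTES_LENGTH = 1;
    static final int TIMESTAMP_LENGTH = 8;  // present in message format v1
    static final int KEY_SIZE_LENGTH = 4;
    static final int VALUE_SIZE_LENGTH = 4;
    static final int RECORD_OVERHEAD = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH
            + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; // 22 bytes

    public static void main(String[] args) {
        // Total space reserved at the head of the buffer when compression is enabled
        System.out.println("reserved = " + (LOG_OVERHEAD + RECORD_OVERHEAD) + " bytes"); // 34
    }
}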
public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        // Create the appropriate compression stream based on the compression type
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}
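Note that the SNAPPY and LZ4 cases go through reflection-based suppliers instead of calling the stream constructors directly, so those compression libraries are only loaded when they are actually used. The class below is an illustrative sketch of that lazy-loading pattern, not the actual Kafka supplier class.

import java.lang.reflect.Constructor;

// Illustrative sketch: resolve a compression stream constructor lazily via reflection,
// so the library jar is only required when that compression type is actually used.
public class LazyConstructorSupplier {
    private volatile Constructor<?> constructor;
    private final String className;
    private final Class<?>[] parameterTypes;

    public LazyConstructorSupplier(String className, Class<?>... parameterTypes) {
        this.className = className;
        this.parameterTypes = parameterTypes;
    }

    public Constructor<?> get() throws ClassNotFoundException, NoSuchMethodException {
        if (constructor == null) {
            // Class.forName only runs the first time this compression type is used
            constructor = Class.forName(className).getConstructor(parameterTypes);
        }
        return constructor;
    }
}

// Usage sketch:
// new LazyConstructorSupplier("org.xerial.snappy.SnappyOutputStream", OutputStream.class, Integer.TYPE)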
3 Important Methods
3.1 append: appending a record to the buffer
public void append(long offset, Record record) {
    // Ensure we are in writable mode, otherwise throw an exception
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");
    // Get the size of the record
    int size = record.size();
    // Write the offset, the record size and the record itself into the ByteBuffer
    compressor.putLong(offset);
    compressor.putInt(size);
    compressor.put(record.buffer());
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    record.buffer().rewind();
}
public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");
    // Compute the serialized size of the record from its key and value
    int size = Record.recordSize(key, value);
    // Write the offset and the record size, then the record (timestamp, key, value)
    compressor.putLong(offset);
    compressor.putInt(size);
    long crc = compressor.putRecord(timestamp, key, value);
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    // Return the CRC of the record that was written
    return crc;
}
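Each call therefore writes one log entry laid out as [offset][size][record], with the record bytes passing through appendStream and being compressed transparently. A short usage sketch, reusing the emptyRecords factory assumed earlier; the offsets and key/value bytes are illustrative.

ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecords records = MemoryRecords.emptyRecords(buffer, CompressionType.NONE, buffer.capacity());

// Append two records with consecutive offsets; append returns the CRC of each record
long crc0 = records.append(0L, System.currentTimeMillis(), "key-0".getBytes(), "value-0".getBytes());
long crc1 = records.append(1L, System.currentTimeMillis(), "key-1".getBytes(), "value-1".getBytes());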
3.2 hasRoomFor: checking whether there is enough room for a new record
public boolean hasRoomFor(byte[] key, byte[] value) {
    // First check whether this MemoryRecords is writable
    if (!this.writable)
        return false;
    // For the first write, check whether the initial buffer capacity can hold the entry
    // (offset field + size field + the record itself);
    // otherwise, check whether the estimated bytes already written plus this entry would exceed writeLimit
    return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
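This is how the producer decides whether a new message still fits into the current batch or a new batch has to be allocated. A hedged usage sketch; tryAppend here is a hypothetical helper written for illustration, not the actual RecordAccumulator code.

// Hypothetical helper: append only if the batch still has room, otherwise signal the
// caller to close this batch and allocate a new MemoryRecords.
public static boolean tryAppend(MemoryRecords records, long offset, long timestamp,
                                byte[] key, byte[] value) {
    if (!records.hasRoomFor(key, value))
        return false; // batch is full; caller should create a new batch
    records.append(offset, timestamp, key, value);
    return true;
}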
3.3 sizeInBytes: computing the size of the record collection in MemoryRecords
For a writable MemoryRecords, it returns the current position of the compressor's buffer (the ByteBufferOutputStream's buffer, which may have been expanded), i.e. the number of bytes written so far; for a read-only MemoryRecords, it returns the limit of the MemoryRecords' own buffer field.
public int sizeInBytes() {
    if (writable) {
        // Writable: bytes written so far into the (possibly expanded) compressor buffer
        return compressor.buffer().position();
    } else {
        // Read-only: the limit of the underlying buffer
        return buffer.limit();
    }
}
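A quick illustrative check of the writable branch, again using the assumed emptyRecords factory; the values in the comments follow from the constants sketched earlier and are not taken from the source.

ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecords records = MemoryRecords.emptyRecords(buffer, CompressionType.NONE, buffer.capacity());
System.out.println(records.sizeInBytes()); // 0: nothing has been written yet
records.append(0L, System.currentTimeMillis(), "k".getBytes(), "v".getBytes());
System.out.println(records.sizeInBytes()); // grows by Records.LOG_OVERHEAD + Record.recordSize("k", "v")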