
MemoryRecords Analysis

王声
2023-12-01

We know that a RecordBatch holds a reference to a MemoryRecords object, because MemoryRecords is where the messages are ultimately stored.

 

MemoryRecords represents a set of messages. It wraps an NIO ByteBuffer that holds the message data, plus a Compressor that compresses the messages written into that ByteBuffer.
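A minimal write-path sketch, assuming the 0.10.x client internals this article walks through (MemoryRecords.emptyRecords plus the append/close methods analyzed below); it is an illustration, not the producer's actual code:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;

public class MemoryRecordsWriteDemo {
    public static void main(String[] args) {
        // Writable MemoryRecords backed by a 1 KB NIO buffer; GZIP is applied by the Compressor.
        MemoryRecords records =
                MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.GZIP);

        // Append a few records; offsets inside a batch are assigned sequentially by the caller.
        for (long offset = 0; offset < 3; offset++) {
            byte[] key = ("key-" + offset).getBytes(StandardCharsets.UTF_8);
            byte[] value = ("value-" + offset).getBytes(StandardCharsets.UTF_8);
            records.append(offset, System.currentTimeMillis(), key, value);
        }

        // close() flushes the compressor and, for compressed batches, writes the wrapper record header.
        records.close();
        System.out.println("bytes used: " + records.sizeInBytes());
    }
}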

 

1 Core Fields

ByteBuffer buffer: the NIO ByteBuffer that holds the messages

Compressor compressor: the compressor; it compresses appended messages and writes the compressed data into buffer

int writeLimit: the maximum number of bytes that may be written into buffer

int initialCapacity: the initial capacity of buffer

boolean writable: whether this MemoryRecords instance is writable or read-only (see the sketch below)
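A quick sketch of where the two modes come from, assuming the 0.10.x factory methods emptyRecords (writable, producer side) and readableRecords (read-only, e.g. for data fetched from a broker), and the same imports as the example above:

// Writable: producer side, backed by a Compressor; append() is allowed.
MemoryRecords writable =
        MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);

// Read-only: wraps an already-filled buffer; calling append() on it throws IllegalStateException.
MemoryRecords readOnly =
        MemoryRecords.readableRecords(ByteBuffer.allocate(0));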

2 Compressor

Compressor shows up throughout MemoryRecords, so let us look at it first.

2.1 Core Fields

CompressionType type: the compression type

ByteBufferOutputStream bufferStream: an output stream over the buffer; when an appended record does not fit, it grows the buffer automatically

DataOutputStream appendStream: an output stream that wraps bufferStream and adds compression

long numRecords: the number of records written

float compressionRate: the compression rate (see the sketch below)
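These counters are bookkeeping only. Roughly speaking (this is a simplified sketch, not the actual Kafka Compressor code), every appended record bumps the record count and the uncompressed byte count, and the compression rate is the ratio of compressed output to uncompressed input:

// Simplified bookkeeping sketch (not the actual Kafka Compressor).
class CompressionStats {
    long numRecords = 0;
    long writtenUncompressed = 0;   // bytes handed to the compressor
    float compressionRate = 1.0f;   // compressed / uncompressed, 1.0 until measured

    void recordWritten(int uncompressedSize) {
        numRecords++;
        writtenUncompressed += uncompressedSize;
    }

    void close(int compressedBytesInBuffer) {
        if (writtenUncompressed > 0)
            compressionRate = (float) compressedBytesInBuffer / writtenUncompressed;
    }
}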

 

2.2 Constructor

public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;                    // compression type
    this.initPos = buffer.position();    // remember the starting position of the buffer

    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;

    // If a compression type is specified, skip the space where the wrapper record's
    // offset, size and header will be written back when the compressor is closed.
    if (type != CompressionType.NONE) {
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }

    // Create the output stream over the buffer
    bufferStream = new ByteBufferOutputStream(buffer);
    // Wrap bufferStream to add compression
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
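For reference, the amount of space skipped above can be worked out from the message format. A back-of-the-envelope check, assuming message format v1 (the version this code targets); the real code uses the Records and Record constants rather than literal numbers:

// Space reserved at the front of the buffer for the wrapper record of a compressed batch
// (assumed breakdown for message format v1).
int offsetField = 8, sizeField = 4;                       // Records.LOG_OVERHEAD = 12
int crc = 4, magic = 1, attributes = 1, timestamp = 8;    // record header
int keyLengthField = 4, valueLengthField = 4;             // Record.RECORD_OVERHEAD = 22

int reserved = (offsetField + sizeField)
             + (crc + magic + attributes + timestamp + keyLengthField + valueLengthField); // 34 bytes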

public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        // Create the appropriate compression stream for the given compression type
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}
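wrapForOutput is a straightforward use of Java's stream-decorator pattern: the caller always writes through a DataOutputStream, and compression (if any) happens transparently in the wrapped stream. A self-contained illustration using only the JDK's GZIP support:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class WrapForOutputDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        // Same layering as wrapForOutput with CompressionType.GZIP:
        // DataOutputStream -> GZIPOutputStream -> underlying byte sink.
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(sink, 512))) {
            // The caller's code is identical to the uncompressed case: writeLong/writeInt/write.
            out.writeLong(0L);                                   // offset
            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            out.writeInt(payload.length);                        // size
            out.write(payload);                                  // record bytes
        }
        System.out.println("compressed bytes: " + sink.size());
    }
}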

 

3 Important Methods

3.1 append: appending a record to the buffer

public void append(long offset, Record record) {
    // Ensure this instance is writable; otherwise throw
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");
    // Size of the record
    int size = record.size();
    // Write the offset, the record size, and the record itself into the ByteBuffer
    compressor.putLong(offset);
    compressor.putInt(size);
    compressor.put(record.buffer());
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    record.buffer().rewind();
}

 

public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    // Write the offset, the record size, and then the record built from timestamp/key/value;
    // putRecord returns the CRC of the record just written.
    int size = Record.recordSize(key, value);
    compressor.putLong(offset);
    compressor.putInt(size);
    long crc = compressor.putRecord(timestamp, key, value);
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    return crc;
}
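A short usage sketch of this overload (same assumptions and imports as the earlier examples); the returned checksum is what the producer can later surface to the caller as the record's CRC:

MemoryRecords records =
        MemoryRecords.emptyRecords(ByteBuffer.allocate(512), CompressionType.NONE);

byte[] key = "k".getBytes();
byte[] value = "v".getBytes();

// The CRC32 of the record that was just written is returned to the caller.
long crc = records.append(0L, System.currentTimeMillis(), key, value);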

 

3.2 hasRoomFor: checking whether there is enough room for a new record

public boolean hasRoomFor(byte[] key, byte[] value) {
    // Only a writable instance can accept new records
    if (!this.writable)
        return false;
    // For the first record, check that the initial buffer capacity can hold the log overhead
    // (the offset and size fields) plus the record itself.
    // Otherwise, check that the bytes already written plus this record would not exceed writeLimit.
    return this.compressor.numRecordsWritten() == 0 ?
        this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
        this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
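This is the check performed before every append. A typical guarded-append loop, sketched with the same assumptions as the earlier examples (conceptually this is what the producer's batch does when deciding whether to accept another record):

MemoryRecords batch =
        MemoryRecords.emptyRecords(ByteBuffer.allocate(256), CompressionType.NONE);

long nextOffset = 0;
byte[] key = "k".getBytes();
byte[] value = new byte[64];

// Keep appending until the batch reports it is full; the caller would then
// close this batch and allocate a new one.
while (batch.hasRoomFor(key, value)) {
    batch.append(nextOffset++, System.currentTimeMillis(), key, value);
}
batch.close();
System.out.println("records in batch: " + nextOffset);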

 

3.3 sizeInBytes: the size of the message set in a MemoryRecords

For a writable MemoryRecords, this returns the current write position of the compressor's buffer (i.e. the bytes written so far); for a read-only MemoryRecords, it returns the limit of the MemoryRecords buffer field.

public int sizeInBytes() {
    if (writable) {
        return compressor.buffer().position();
    } else {
        return buffer.limit();
    }
}