当前位置: 首页 > 工具软件 > LevelDB JNI > 使用案例 >

levelDB WriteBatch源码解析

李华茂
2023-12-01

levelDB WriteBatch源码解析

上一篇文章中讲了leveldb中WriteBatch、Snapshot使用,但是leveldb毕竟只是一个基础的存储引擎,没有太多的特性或者api使用可以讲,因此便直接从源码上开始,我这就从WriteBatch开始吧。有一些遗漏的以后补充进来,以后再慢慢进行整理。

WriteBatch源码


我们先看下WriteBatchWritebatch本身只是一个接口,只提供了两个方法的定义,如下:

package org.iq80.leveldb;

import java.io.Closeable;

/**
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
public interface WriteBatch
        extends Closeable
{
    WriteBatch put(byte[] key, byte[] value);

    WriteBatch delete(byte[] key);
}

WriteBatch的实现类为WriteBatchImpl,leveldb在java源码实现中还有另外一个实现,是leveldb-jni,对应的实现类为JniWriteBatch,简单的看了下,感觉要比当前源码简单一些,有兴趣的可以了解一下。

接下来看WriteBatchImpl类的源码,代码不多,如下:

public class WriteBatchImpl
        implements WriteBatch
{
    private final List<Entry<Slice, Slice>> batch = new ArrayList<>();
    private int approximateSize;

    public int getApproximateSize()
    {
        return approximateSize;
    }

    public int size()
    {
        return batch.size();
    }

    @Override
    public WriteBatchImpl put(byte[] key, byte[] value)
    {
        requireNonNull(key, "key is null");
        requireNonNull(value, "value is null");
        batch.add(Maps.immutableEntry(Slices.wrappedBuffer(key), Slices.wrappedBuffer(value)));
        approximateSize += 12 + key.length + value.length;
        return this;
    }

    public WriteBatchImpl put(Slice key, Slice value)
    {
        requireNonNull(key, "key is null");
        requireNonNull(value, "value is null");
        batch.add(Maps.immutableEntry(key, value));
        approximateSize += 12 + key.length() + value.length();
        return this;
    }

    @Override
    public WriteBatchImpl delete(byte[] key)
    {
        requireNonNull(key, "key is null");
        batch.add(Maps.immutableEntry(Slices.wrappedBuffer(key), (Slice) null));
        approximateSize += 6 + key.length;
        return this;
    }

    public WriteBatchImpl delete(Slice key)
    {
        requireNonNull(key, "key is null");
        batch.add(Maps.immutableEntry(key, (Slice) null));
        approximateSize += 6 + key.length();
        return this;
    }

    @Override
    public void close()
    {
    }

    public void forEach(Handler handler)
    {
        for (Entry<Slice, Slice> entry : batch) {
            Slice key = entry.getKey();
            Slice value = entry.getValue();
            if (value != null) {
                handler.put(key, value);
            }
            else {
                handler.delete(key);
            }
        }
    }

    public interface Handler
    {
        void put(Slice key, Slice value);

        void delete(Slice key);
    }
}

首先可以看到WriteBatch中有一个变量为batch用来保存每一次的操作,还有一个变量为approximateSize,用来保存key和value的大小。

private final List<Entry<Slice, Slice>> batch = new ArrayList<>();
    private int approximateSize;

实现的这些方法中,我们首先看第一个put方法,put方法如下:

@Override
    public WriteBatchImpl put(byte[] key, byte[] value)
    {
        requireNonNull(key, "key is null");
        requireNonNull(value, "value is null");
        batch.add(Maps.immutableEntry(Slices.wrappedBuffer(key), Slices.wrappedBuffer(value)));
        approximateSize += 12 + key.length + value.length;
        return this;
    }

这里batch add了一个不可变的Entry,同时将key和value进行了一层Slice的包装,但在approximateSize值不仅增加了key和value的length,同时增加了12,这个最开始我以为需要看下Slices的源码,应该是和Slice的方法有关系,wrappedBuffer源码如下:

public static Slice wrappedBuffer(byte[] array)
    {
        if (array.length == 0) {
            return EMPTY_SLICE;
        }
        return new Slice(array);
    }

Slice的源码为:

private final byte[] data;
    private final int offset;
    private final int length;

    private int hash;

    public Slice(byte[] data)
    {
        requireNonNull(data, "array is null");
        this.data = data;
        this.offset = 0;
        this.length = data.length;
    }

如上,Slice包装byte数组后,同时增加一个offset记载偏移量与增加length记录数组长度,并增加hash记录hash值,但是此处在WriteBatch保存kv时与这些应该是无关的,因此应该与这无关,最后上网查询、查看原c++的源码,里面在存储当前字符串时,需要有一个8字节的序列号和4字节的记录数作为头,因此在申请空间存放的时候要多加上12个字节的大小,因此对于插入的记录:

由kTypeValue+key长度+key+value长度+value组成

delete方法与put方法类似,在这就不再详述了,再说一下最后的foreach和Handler接口,foreach为:

public void forEach(Handler handler)
    {
        for (Entry<Slice, Slice> entry : batch) {
            Slice key = entry.getKey();
            Slice value = entry.getValue();
            if (value != null) {
                handler.put(key, value);
            }
            else {
                handler.delete(key);
            }
        }
    }

就是for循环依次处理add进来的key与value。

Handler为:

public interface Handler
    {
        void put(Slice key, Slice value);

        void delete(Slice key);
    }

我们在查询源码,可知在DbImpl中使用上述foreach与Handler,源码如下:

 private Slice writeWriteBatch(WriteBatchImpl updates, long sequenceBegin)
    {
        Slice record = Slices.allocate(SIZE_OF_LONG + SIZE_OF_INT + updates.getApproximateSize());
        final SliceOutput sliceOutput = record.output();
        sliceOutput.writeLong(sequenceBegin);
        sliceOutput.writeInt(updates.size());
        updates.forEach(new Handler()
        {
            @Override
            public void put(Slice key, Slice value)
            {
                sliceOutput.writeByte(VALUE.getPersistentId());
                writeLengthPrefixedBytes(sliceOutput, key);
                writeLengthPrefixedBytes(sliceOutput, value);
            }

            @Override
            public void delete(Slice key)
            {
                sliceOutput.writeByte(DELETION.getPersistentId());
                writeLengthPrefixedBytes(sliceOutput, key);
            }
        });
        return record.slice(0, sliceOutput.size());
    }

这个方法就是对之前的数据进行批量的操作,持久化存储或者删除等操作。


WriteBatch的demo演示就不再写了,之前文章中写过一回,对WriteBatch的源码解析就到这,下一篇文章讲Snapshot。

 类似资料: