上一篇文章中讲了leveldb中WriteBatch、Snapshot使用,但是leveldb毕竟只是一个基础的存储引擎,没有太多的特性或者api使用可以讲,因此便直接从源码上开始,我这就从WriteBatch开始吧。有一些遗漏的以后补充进来,以后再慢慢进行整理。
我们先看下WriteBatch,Writebatch本身只是一个接口,只提供了两个方法的定义,如下:
package org.iq80.leveldb;
import java.io.Closeable;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface WriteBatch
extends Closeable
{
WriteBatch put(byte[] key, byte[] value);
WriteBatch delete(byte[] key);
}
WriteBatch的实现类为WriteBatchImpl,leveldb在java源码实现中还有另外一个实现,是leveldb-jni,对应的实现类为JniWriteBatch,简单的看了下,感觉要比当前源码简单一些,有兴趣的可以了解一下。
接下来看WriteBatchImpl类的源码,代码不多,如下:
public class WriteBatchImpl
implements WriteBatch
{
private final List<Entry<Slice, Slice>> batch = new ArrayList<>();
private int approximateSize;
public int getApproximateSize()
{
return approximateSize;
}
public int size()
{
return batch.size();
}
@Override
public WriteBatchImpl put(byte[] key, byte[] value)
{
requireNonNull(key, "key is null");
requireNonNull(value, "value is null");
batch.add(Maps.immutableEntry(Slices.wrappedBuffer(key), Slices.wrappedBuffer(value)));
approximateSize += 12 + key.length + value.length;
return this;
}
public WriteBatchImpl put(Slice key, Slice value)
{
requireNonNull(key, "key is null");
requireNonNull(value, "value is null");
batch.add(Maps.immutableEntry(key, value));
approximateSize += 12 + key.length() + value.length();
return this;
}
@Override
public WriteBatchImpl delete(byte[] key)
{
requireNonNull(key, "key is null");
batch.add(Maps.immutableEntry(Slices.wrappedBuffer(key), (Slice) null));
approximateSize += 6 + key.length;
return this;
}
public WriteBatchImpl delete(Slice key)
{
requireNonNull(key, "key is null");
batch.add(Maps.immutableEntry(key, (Slice) null));
approximateSize += 6 + key.length();
return this;
}
@Override
public void close()
{
}
public void forEach(Handler handler)
{
for (Entry<Slice, Slice> entry : batch) {
Slice key = entry.getKey();
Slice value = entry.getValue();
if (value != null) {
handler.put(key, value);
}
else {
handler.delete(key);
}
}
}
public interface Handler
{
void put(Slice key, Slice value);
void delete(Slice key);
}
}
首先可以看到WriteBatch中有一个变量为batch用来保存每一次的操作,还有一个变量为approximateSize,用来保存key和value的大小。
private final List<Entry<Slice, Slice>> batch = new ArrayList<>();
private int approximateSize;
实现的这些方法中,我们首先看第一个put方法,put方法如下:
@Override
public WriteBatchImpl put(byte[] key, byte[] value)
{
requireNonNull(key, "key is null");
requireNonNull(value, "value is null");
batch.add(Maps.immutableEntry(Slices.wrappedBuffer(key), Slices.wrappedBuffer(value)));
approximateSize += 12 + key.length + value.length;
return this;
}
这里batch add了一个不可变的Entry,同时将key和value进行了一层Slice的包装,但在approximateSize值不仅增加了key和value的length,同时增加了12,这个最开始我以为需要看下Slices的源码,应该是和Slice的方法有关系,wrappedBuffer源码如下:
public static Slice wrappedBuffer(byte[] array)
{
if (array.length == 0) {
return EMPTY_SLICE;
}
return new Slice(array);
}
Slice的源码为:
private final byte[] data;
private final int offset;
private final int length;
private int hash;
public Slice(byte[] data)
{
requireNonNull(data, "array is null");
this.data = data;
this.offset = 0;
this.length = data.length;
}
如上,Slice包装byte数组后,同时增加一个offset记载偏移量与增加length记录数组长度,并增加hash记录hash值,但是此处在WriteBatch保存kv时与这些应该是无关的,因此应该与这无关,最后上网查询、查看原c++的源码,里面在存储当前字符串时,需要有一个8字节的序列号和4字节的记录数作为头,因此在申请空间存放的时候要多加上12个字节的大小,因此对于插入的记录:
由kTypeValue+key长度+key+value长度+value组成
delete方法与put方法类似,在这就不再详述了,再说一下最后的foreach和Handler接口,foreach为:
public void forEach(Handler handler)
{
for (Entry<Slice, Slice> entry : batch) {
Slice key = entry.getKey();
Slice value = entry.getValue();
if (value != null) {
handler.put(key, value);
}
else {
handler.delete(key);
}
}
}
就是for循环依次处理add进来的key与value。
Handler为:
public interface Handler
{
void put(Slice key, Slice value);
void delete(Slice key);
}
我们在查询源码,可知在DbImpl中使用上述foreach与Handler,源码如下:
private Slice writeWriteBatch(WriteBatchImpl updates, long sequenceBegin)
{
Slice record = Slices.allocate(SIZE_OF_LONG + SIZE_OF_INT + updates.getApproximateSize());
final SliceOutput sliceOutput = record.output();
sliceOutput.writeLong(sequenceBegin);
sliceOutput.writeInt(updates.size());
updates.forEach(new Handler()
{
@Override
public void put(Slice key, Slice value)
{
sliceOutput.writeByte(VALUE.getPersistentId());
writeLengthPrefixedBytes(sliceOutput, key);
writeLengthPrefixedBytes(sliceOutput, value);
}
@Override
public void delete(Slice key)
{
sliceOutput.writeByte(DELETION.getPersistentId());
writeLengthPrefixedBytes(sliceOutput, key);
}
});
return record.slice(0, sliceOutput.size());
}
这个方法就是对之前的数据进行批量的操作,持久化存储或者删除等操作。
WriteBatch的demo演示就不再写了,之前文章中写过一回,对WriteBatch的源码解析就到这,下一篇文章讲Snapshot。