//DataRecord与item的不同是,item只保存键值,而record保存键值和value值,但是内存里只存PADDING的大小
typedef struct data_record {
char *value;
union {
bool free_value; // free value or not,改为need_free比较好
uint32_t crc;
};
int32_t tstamp; //时间戳
int32_t flag; //record.c开头的那几个const int标志的组合。
int32_t version;
uint32_t ksz; //key大小
uint32_t vsz; //v大小
char key[0];
} DataRecord;
/*
* Beansdb - A high available distributed key-value storage system:
*
* http://beansdb.googlecode.com
*
* Copyright 2010 Douban Inc. All rights reserved.
*
* Use and distribution licensed under the BSD license. See
* the LICENSE file for full text.
*
* Authors:
* Davies Liu <davies.liu@gmail.com>
*
*/
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include "record.h"
#include "crc32.c"
#include "quicklz.h"
//#include "fnv1a.h"
const int PADDING = 256; //PADDING是为了留出低8位,来记录bucket的下标
const int32_t COMPRESS_FLAG = 0x00010000;
const int32_t CLIENT_COMPRESS_FLAG = 0x00000010;
const float COMPRESS_RATIO_LIMIT = 0.7;//最小的压缩比例
const int TRY_COMPRESS_SIZE = 1024 * 10;
uint32_t gen_hash(char *buf, int len)
{
uint32_t hash = len * 97;
if (len <= 1024){
hash += fnv1a(buf, len); //整个
}else{
hash += fnv1a(buf, 512); //前512个
hash *= 97;
hash += fnv1a(buf + len - 512, 512); //后512个
}
return hash;
}
typedef struct hint_record {
uint32_t ksize:8;
uint32_t pos:24;
int32_t version;
uint16_t hash;
char name[2]; // allign
} HintRecord;
const int NAME_IN_RECORD = 2;
//| | |
//----------------------------------
//buf 已写 cur 可写 size
//param中存放了多个(HintRecord+key),而HintRecord又是根据Item得到的。
struct param {
int size;
int curr;
char* buf;
};
//将it存入param中
void collect_items(Item* it, void* param)
{
//-NAME_IN_RECORD是为了减少HintRecord中name的那两个比特
//+1是为了后面空出一个位置放'\0'
int length = sizeof(HintRecord) + strlen(it->name) + 1 - NAME_IN_RECORD;
struct param *p = (struct param *)param;
//不够存,扩大param
if (p->size - p->curr < length) {
p->size *= 2;
p->buf = (char*)realloc(p->buf, p->size);
}
//相当于replacement new
HintRecord *r = (HintRecord*)(p->buf + p->curr);
r->ksize = strlen(it->name);
//it->pos的低8位表示file_id,高24位表示在file中的pos
r->pos = it->pos >> 8;
r->version = it->ver;
r->hash = it->hash;
memcpy(r->name, it->name, r->ksize + 1);
p->curr += length;
}
//将buf中的内容写入到一个临时文件中,最后用这个文件代替path的文件。
void write_file(char *buf, int size, const char* path)
{
char tmp[255];
sprintf(tmp, "%s.tmp", path);
FILE *hf = fopen(tmp, "wb");
if (NULL==hf){
fprintf(stderr, "open %s failed\n", tmp);
return;
}
//写入size个字符,每个字符的大小为1
int n = fwrite(buf, 1, size, hf);
fclose(hf);
if (n == size) {
//删除path所指文件
unlink(path);
//改变这个已经写入的文件的名字为path
rename(tmp, path);
}else{
fprintf(stderr, "write to %s failed \n", tmp);
}
}
//将tree中的数据放入到hint文件中,这个tree(其实是bitcast中的cur_tree)会被销毁
//1.从tree中收集Item存入一个buf中,然后将treee销毁
//2.压缩buf
//3.将buf写入到一个hintfile中
void build_hint(HTree* tree, const char* hintpath)
{
struct param p;
p.size = 1024 * 1024;
p.curr = 0;
p.buf = malloc(p.size);
//1
//将tree里的item都搜集到p中
//ver<0的也收集了
ht_visit(tree, collect_items, &p);
ht_destroy(tree);
// 2
//如果后缀是.qlz说明数据要经过压缩
if (strcmp(hintpath + strlen(hintpath) - 4, ".qlz") == 0) {
char* wbuf = malloc(QLZ_SCRATCH_COMPRESS);
char* dst = malloc(p.size + 400);
//将p中的数据压缩成dst_size个字节存到dst中
int dst_size = qlz_compress(p.buf, dst, p.curr, wbuf);
free(p.buf);
p.curr = dst_size;
p.buf = dst;
free(wbuf);
}
//3
write_file(p.buf, p.curr, hintpath);
free(p.buf);
}
//扫描hintfile,将其中的HintRecord放入到tree中。
//tree -- 实际是BitCask的tree
//bucket -- 是这个hintfile在BitCask中的编号
//path -- hintfile文件的目录
//new_path -- 把hintfile文件中的内容存入这个文件中
//1.打开hintfile并使用mmap得到里面的全部内容
//2.解压缩
//3.依次读取每个HintRecord放入到tree中。
void scanHintFile(HTree* tree, int bucket, const char* path, const char* new_path)
{
char *addr;
int fd;
struct stat sb;
size_t length;
fd = open(path, O_RDONLY);
if (fd == -1) {
fprintf(stderr, "open %s failed\n", path);
return;
}
if (fstat(fd, &sb) == -1 || sb.st_size == 0){
close(fd);
return ;
}
//1
addr = (char*) mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
if (addr == MAP_FAILED){
fprintf(stderr, "mmap failed %s\n", path);
close(fd);
return;
}
//2
char *start = addr, *end = addr + sb.st_size;
if (strcmp(path + strlen(path) - 4, ".qlz") == 0) {
char wbuf[QLZ_SCRATCH_DECOMPRESS];
int size = qlz_size_decompressed(addr);
start = malloc(size);
int vsize = qlz_decompress(addr, start, wbuf);
if (vsize < size) {
fprintf(stderr, "decompress %s failed: %d < %d, remove it\n", path, vsize, size);
unlink(path);
exit(1);
}
end = start + vsize;
}
//为什么不把这一步放到前面,直接将addr对应的内容拷贝到new_path中?
if (new_path != NULL) {
if (strcmp(new_path + strlen(new_path) - 4, ".qlz") == 0) {
char* wbuf = malloc(QLZ_SCRATCH_COMPRESS);
char* dst = malloc(sb.st_size + 400);
int dst_size = qlz_compress(start, dst, end - start, wbuf);
write_file(dst, dst_size, new_path);
free(dst);
free(wbuf);
} else {
write_file(start, end - start, new_path);
}
}
//3
char *p = start;
while (p < end) {
HintRecord *r = (HintRecord*) p;
p += sizeof(HintRecord) - NAME_IN_RECORD + r->ksize + 1;
if (p > end){
fprintf(stderr, "scan %s: unexpected end, need %ld byte\n", path, p - end);
break;
}
uint32_t pos = (r->pos << 8) | (bucket & 0xff);
if (strlen(r->name) == r->ksize) {
ht_add(tree, r->name, pos, r->hash, r->version);
}else{
fprintf(stderr, "scan %s: key length not match %d\n", path, r->ksize);
}
}
munmap(addr, sb.st_size);
if (start != addr ) free(start);
close(fd);
}
//返回r中的value值
char* record_value(DataRecord *r)
{
char *res = r->value;
if (res == r->key + r->ksz + 1) {
// value was alloced in record
res = malloc(r->vsz);
memcpy(res, r->value, r->vsz);
}
return res;
}
void free_record(DataRecord *r)
{
if (r == NULL) return;
if (r->value != NULL && r->free_value) free(r->value);
free(r);
}
void compress_record(DataRecord *r)
{
int ksz = r->ksz, vsz = r->vsz;
int n = sizeof(DataRecord) - sizeof(char*) + ksz + vsz;
//比一个PADDING还大,而且没有被压缩过
if (n > PADDING && (r->flag & (COMPRESS_FLAG|CLIENT_COMPRESS_FLAG)) == 0) {
char *wbuf = malloc(QLZ_SCRATCH_COMPRESS);
char *v = malloc(vsz + 400);
if (wbuf == NULL || v == NULL) return ;
//先尝试压缩一部分,如果没压缩完,就重新压缩
//取较小的
int try_size = vsz > TRY_COMPRESS_SIZE ? TRY_COMPRESS_SIZE : vsz;
int vsize = qlz_compress(r->value, v, try_size, wbuf);
//没有压缩完,并且尝试压缩的压缩比例达到了0.7,重新压缩
if (try_size < vsz && vsize < try_size * COMPRESS_RATIO_LIMIT){
try_size = vsz;
vsize = qlz_compress(r->value, v, try_size, wbuf);
}
free(wbuf);
//如果压缩失败,返回
if (vsize > try_size * COMPRESS_RATIO_LIMIT || try_size < vsz) {
free(v);
return;
}
//压缩成功,更新r
if (r->free_value) {
free(r->value);
}
r->value = v;
r->free_value = true; //r的value需要free
r->vsz = vsize;
r->flag |= COMPRESS_FLAG;
}
}
DataRecord* decompress_record(DataRecord *r)
{
if (r->flag & COMPRESS_FLAG) {
char scratch[QLZ_SCRATCH_DECOMPRESS];
//先验证原数据有没有被破坏
int csize = qlz_size_compressed(r->value);
if (csize != r->vsz) {
fprintf(stderr, "broken compressed data: %d != %d, flag=%x\n", csize, r->vsz, r->flag);
goto DECOMP_END;
}
//解压
//解压本应得到的大小
int size = qlz_size_decompressed(r->value);
char *v = malloc(size);
//内存申请不成功也
if (v == NULL) {
fprintf(stderr, "malloc(%d)\n", size);
goto DECOMP_END;
}
int ret = qlz_decompress(r->value, v, scratch);
//解压得到的数据少,发生错误
if (ret < size) {
fprintf(stderr, "decompress %s failed: %d < %d\n", r->key, ret, size);
goto DECOMP_END;
}
//更新r
if (r->free_value) {
free(r->value);
}
r->value = v;
r->free_value = true;
r->vsz = size;
r->flag &= ~COMPRESS_FLAG;
}
return r;
//r是错误的,释放
DECOMP_END:
free_record(r);
return NULL;
}
DataRecord* decode_record(char* buf, int size)
{
DataRecord *r = (DataRecord *) (buf - sizeof(char*));
int ksz = r->ksz, vsz = r->vsz;
if (ksz < 0 || ksz > 200 || vsz < 0 || vsz > 100 * 1024 * 1024){
fprintf(stderr, "invalid ksz=: %d, vsz=%d\n", ksz, vsz);
return NULL;
}
int need = sizeof(DataRecord) - sizeof(char*) + ksz + vsz;
if (size < need) {
fprintf(stderr, "not enough data in buffer: %d < %d\n", size, need);
return NULL;
}
// CRC check ?
DataRecord *r2 = (DataRecord *) malloc(need + 1 + sizeof(char*));
memcpy(r2, r, sizeof(DataRecord) + ksz);
r2->key[ksz] = 0; // c str
r2->free_value = false;
r2->value = r2->key + ksz + 1;
memcpy(r2->value, r->key + ksz, vsz);
return decompress_record(r2);
}
//从f中读取一个DataRecord
//1.分步骤读取。
// 1.1.首先从文件中读一个PADDING出来,这是一个DataRecord所占的最小的文件空间。
// 1.2.计算读取的内容中是否包含完整的value
//2.crc校验
//3.解压缩
DataRecord* read_record(FILE *f, bool decomp)
{
//1
//申请的空间比DataRecord的size大没有关系。
DataRecord *r = (DataRecord*) malloc(PADDING + sizeof(char*));
r->value = NULL;
//1.1
if (fread(&r->crc, 1, PADDING, f) != PADDING) {//或者到达f的末尾,或者f为空。
fprintf(stderr, "read record faied\n");
goto READ_END;
}
int ksz = r->ksz, vsz = r->vsz;
if (ksz < 0 || ksz > 200 || vsz < 0 || vsz > 100 * 1024 * 1024){
fprintf(stderr, "invalid ksz=: %d, vsz=%d\n", ksz, vsz);
goto READ_END;
}
uint32_t crc_old = r->crc;
//1.2
//计算PADDING的数据中除了DataRecord和它的key以外,还有多少数据。
//sizeof(char*)是DataRecord最后的key[0]
int read_size = PADDING - (sizeof(DataRecord) - sizeof(char*)) - ksz;
if (vsz < read_size) {//value只存在于刚才读取的PADDING里
r->value = r->key + ksz + 1; //key的最后一个字节是结束符'\0',所以加1
r->free_value = false;
//后移一个字节,腾出空间给key的0
memmove(r->value, r->key + ksz, vsz);
//注意如果包含完整的value,那么读取的这个PADDING里也没有其它DataRecord的内容了。
//因为是按照PADDING对齐的。
}else{//刚才的PADDING没有读完,在f中还有残留
r->value = malloc(vsz);
r->free_value = true;
//先把可以读的读到
memcpy(r->value, r->key + ksz, read_size);
int need = vsz - read_size;
int ret = 0;
//然后再从文件中读
if (need > 0 && need != (ret=fread(r->value + read_size, 1, need, f))) {
r->key[ksz] = 0; // c str
fprintf(stderr, "read record %s faied: %d < %d @%ld\n", r->key, ret, need, ftell(f));
goto READ_END;
}
}
r->key[ksz] = 0; // c str
//2
uint32_t crc = crc32(0, (char*)(&r->tstamp),
sizeof(DataRecord) - sizeof(char*) - sizeof(uint32_t) + ksz);
crc = crc32(crc, r->value, vsz);
if (crc != crc_old){
fprintf(stderr, "%s @%ld crc32 check failed %d != %d\n", r->key, ftell(f), crc, r->crc);
goto READ_END;
}
//3
if (decomp) {
r = decompress_record(r);
}
return r;
READ_END:
free_record(r);
return NULL;
}
//encode与compress的不同是,encode是整个的记录,这包括crc,而compress只是K、V
char* encode_record(DataRecord *r, int *size)
{
compress_record(r);
int m, n;
int ksz = r->ksz, vsz = r->vsz;
int hs = sizeof(char*); // over header
m = n = sizeof(DataRecord) - hs + ksz + vsz;
//凑成PADDING的整数倍,这样,m的低八位就全为0了
if (n % PADDING != 0) {
m += PADDING - (n % PADDING);
}
char *buf = malloc(m);
DataRecord *data = (DataRecord*)(buf - hs);
memcpy(&data->crc, &r->crc, sizeof(DataRecord)-hs);
memcpy(data->key, r->key, ksz);
memcpy(data->key + ksz, r->value, vsz);
data->crc = crc32(0, (char*)&data->tstamp, n - sizeof(uint32_t));
*size = m;
return buf;
}
//向文件f中写记录r,f已经定位
int write_record(FILE *f, DataRecord *r)
{
int size;
char *data = encode_record(r, &size);
if (fwrite(data, 1, size, f) < size){
fprintf(stderr, "write %d byte failed\n", size);
free(data);
return -1;
}
free(data);
return 0;
}
//遍历DataFile中的DataRecord加入到tree中。
//注意这个函数的调用情境,是在bc_open时,发现对应hintfile不存在后才调用的。
//bc_open是datafile决定tree(因为tree一开始是不存在的),
//而optimize是tree决定datafile(因为tree中的数据是最新的)
//1.准备工作:打开datafile,新建一个htree来记录hint
//2.依次读取DataRecord,加入到tree中。
//3.新建hintfile文件。
void scanDataFile(HTree* tree, int bucket, const char* path, const char* hintpath)
{
if (bucket < 0 || bucket > 255) return;
//1
FILE *df = fopen(path, "rb");
if (NULL==df){
fprintf(stderr, "open %s failed\n", path);
return;
}
fprintf(stderr, "scan datafile %s \n", path);
//datafile对应的tree
HTree *cur_tree = ht_new(0);
fseek(df, 0, SEEK_END);
uint32_t total = ftell(df);
fseek(df, 0, SEEK_SET);
uint32_t pos = 0;
//2
while (pos < total) {
DataRecord *r = read_record(df, true);
if (r != NULL) {
uint16_t hash = gen_hash(r->value, r->vsz);
//datafile决定tree
//pos是Item->pos的前24位,bucket是后8位
if (r->version > 0){
ht_add(tree, r->key, pos | bucket, hash, r->version);
}else{
ht_remove(tree, r->key);
}
ht_add(cur_tree, r->key, pos | bucket, hash, r->version);
free_record(r);
}
//datafile文件是以PADDING个字节对齐的
pos = ftell(df);
if (pos % PADDING != 0){
int left = PADDING - (pos % PADDING);
fseek(df, left, SEEK_CUR);
pos += left;
}
}
fclose(df);
//3
build_hint(cur_tree, hintpath);
}
//只考察befor之前的record
void scanDataFileBefore(HTree* tree, int bucket, const char* path, time_t before)
{
if (bucket < 0 || bucket > 255) return;
FILE *df = fopen(path, "rb");
if (NULL == df){
fprintf(stderr, "open %s failed\n", path);
return;
}
fprintf(stderr, "scan datafile %s before %ld\n", path, before);
fseek(df, 0, SEEK_END);
uint32_t total = ftell(df);
fseek(df, 0, SEEK_SET);
uint32_t pos = 0;
while (pos < total) {
DataRecord *r = read_record(df, true);
if (r != NULL) {
//这个记录是在时间戳之后才有的
if (r->tstamp >= before ){
break;
}
if (r->version > 0){
uint16_t hash = gen_hash(r->value, r->vsz);
ht_add(tree, r->key, pos | bucket, hash, r->version);
}else{
ht_remove(tree, r->key);
}
free_record(r);
}
pos = ftell(df);
if (pos % PADDING != 0){
int left = PADDING - (pos % PADDING);
fseek(df, left, SEEK_CUR);
pos += left;
}
}
fclose(df);
}
//计算删除掉的记录
//从path对应的hint文件中,逐一扫描HintRecord,如果发现HintRecord跟tree中的key对应的
//Item不符,或者tree中不存在,或者tree中的ver小于0,那么deleted++
//total记录hint文件中总的HintRecord的数目
//1.打开path(hint)处的文件,读取内容并解压,存入到一个buf中
//2.从buf中依次得到HintRecord
//3.比较这些record在tree中是否被删除了(ver<0或者tree中不存在)或者被移动到了其它的文件
static int count_deleted_record(HTree* tree, int bucket, const char* path, int *total)
{
char *addr;
int fd;
struct stat sb;
size_t length;
*total = 0;
//1
fd = open(path, O_RDONLY);
if (fd == -1) {
fprintf(stderr, "open %s failed\n", path);
return 0;
}
if (fstat(fd, &sb) == -1 || sb.st_size == 0){
close(fd);
return 0;
}
addr = (char*) mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
if (addr == MAP_FAILED){
fprintf(stderr, "mmap failed %s\n", path);
close(fd);
return 0;
}
//解压
char *start = addr, *end = addr + sb.st_size;
if (strcmp(path + strlen(path) - 4, ".qlz") == 0) {
char wbuf[QLZ_SCRATCH_DECOMPRESS];
int size = qlz_size_decompressed(addr);
start = malloc(size);
int vsize = qlz_decompress(addr, start, wbuf);
if (vsize < size) {
fprintf(stderr, "decompress %s failed: %d < %d, remove it\n", path, vsize, size);
unlink(path);
return 0;
}
end = start + vsize;
}
char *p = start;
int deleted = 0;
while (p < end) {
HintRecord *r = (HintRecord*) p;
p += sizeof(HintRecord) - NAME_IN_RECORD + r->ksize + 1;
if (p > end){
fprintf(stderr, "scan %s: unexpected end, need %ld byte\n", path, p - end);
break;
}
(*total) ++;
Item *it = ht_get(tree, r->name);
//关于it->pos != ((r->pos << 8) | bucket):
//如果一个record被删除了,然后相同的key又被插入,这样两个datafile中就会有
//相同的key对应的data,但是bc->tree中是只有一个的,可以据此消除重复
if (it == NULL || it->pos != ((r->pos << 8) | bucket) || it->ver <= 0) {
deleted ++;
}
if (it) free(it);
}
munmap(addr, sb.st_size);
if (start != addr) free(start);
close(fd);
return deleted;
}
//优化,通过hintpath的统计记录,来决定是否优化data文件
//将有效record对应的item保存至一棵新建的树中,也就是用来进行hint的tree
//1.估算是否值得优化,如果是,打开一个临时文件进行写入
//2.扫面datafile中的每个DataRecord,看看它
// a.在tree中不存在
// b.改变了位置——或者不在这个文件中,或者在文件中的其它位置
// c.ver < 0
// 如果以上条件都不满足,才能写进新的文件中
//3.修改临时文件名,完成优化。
HTree* optimizeDataFile(HTree* tree, int bucket, const char* path, const char* hintpath, int limit)
{
//1
int all = 0;
//hintpath的文件中保存的是老数据。需要跟tree里的新数据比较。
int deleted = count_deleted_record(tree, bucket, hintpath, &all);
//只有删除的record占到总record的十分之一,才进行优化
if (deleted <= all * 0.1 && deleted <= limit) {
fprintf(stderr, "only %d records deleted in %d, skip %s\n", deleted, all, path);
return NULL;
}
FILE *df = fopen(path, "rb");
if (NULL==df){
fprintf(stderr, "open %s failed\n", path);
return NULL;
}
char tmp[255];
sprintf(tmp, "%s.tmp", path);
FILE *new_df = fopen(tmp, "wb");
if (NULL==new_df){
fprintf(stderr, "open %s failed\n", tmp);
fclose(df);
return NULL;
}
//1
HTree *cur_tree = ht_new(0);
fseek(df, 0, SEEK_END);
uint32_t total = ftell(df);
fseek(df, 0, SEEK_SET);
uint32_t pos = 0;
deleted = 0;
while (pos < total) {
DataRecord *r = read_record(df, false);
if (r != NULL) {
Item *it = ht_get(tree, r->key);
//这个item是在这个datafile中的
//与scanDataFIle相对应,这里是tree决定datafile
if (it && it->pos == (pos | bucket) && it->ver > 0) {
r->version = it->ver;
uint32_t new_pos = ftell(new_df);
uint16_t hash = it->hash;
//数据在datafile中的pos改变了。
ht_add(cur_tree, r->key, new_pos | bucket, hash, it->ver);
if (write_record(new_df, r) != 0) {
ht_destroy(cur_tree);
fclose(df);
fclose(new_df);
return NULL;
}
}else{
deleted ++;
}
if (it) free(it);
free_record(r);
}
//对齐
pos = ftell(df);
if (pos % PADDING != 0){
int left = PADDING - (pos % PADDING);
fseek(df, left, SEEK_CUR);
pos += left;
}
}
uint32_t deleted_bytes = ftell(df) - ftell(new_df);
fclose(df);
fclose(new_df);
//3
unlink(hintpath);
unlink(path);
rename(tmp, path);
fprintf(stderr, "optimize %s complete, %d records deleted, %d bytes came back\n",
path, deleted, deleted_bytes);
return cur_tree;
}
//对datafile中的record进行遍历。
void visit_record(const char* path, RecordVisitor visitor, void *arg1, void *arg2, bool decomp)
{
FILE *df = fopen(path, "rb");
if (NULL==df){
fprintf(stderr, "open %s failed\n", path);
return;
}
fprintf(stderr, "scan datafile %s \n", path);
fseek(df, 0, SEEK_END);
uint32_t total = ftell(df);
fseek(df, 0, SEEK_SET);
uint32_t pos = 0;
while (pos < total) {
DataRecord *r = read_record(df, decomp);
if (r != NULL) {
bool cont = visitor(r, arg1, arg2);
if (cont) break;
}
pos = ftell(df);
if (pos % PADDING != 0){
int left = PADDING - (pos % PADDING);
fseek(df, left, SEEK_CUR);
pos += left;
}
}
fclose(df);
}