本文参考源码版本为 redis6.2
前缀树是字符串查找时,经常使用的一种数据结构,能够在一个字符串集合中快速查找到某个字符串,如下图所示:
(f) ""
\
(o) "f"
\
(o) "fo"
/ \
(t) (b) "foo"
/ \
"foot" (e) (a) "foob"
/ \
"foote" (r) (r) "fooba"
/ \
"footer" [] [] "foobar"
由于树中每个节点只存储字符串中的一个字符,故而有时会造成空间的浪费。
Rax 的出现就是为了解决这一问题。Redis 对于 Rax 的解释为 A radix tree implement,基数树的一种实现。
在前缀树中,如果一系列单字符节点之间的分支连接是唯一的,那么这些单字符节点就可以合并成一个节点,而这种结构的树,就正是 Radix Tree,也被称为基数树
Rax中不仅可以存储字符串,同时还可以为这个字符串设置一个值,也就是key-value。
Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,其特点是,保存在树上的每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。
前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。
不同的是,rax 在前缀树上做了一些优化,对上图做一些改变:
["foo"] ""
|
[t b] "foo"
/ \
"foot" ("er") ("ar") "foob"
/ \
"footer" [] [] "foobar"
如果不理解,继续往下看~
raxNode定义:
typedef struct raxNode {
uint32_t iskey:1; //节点是否包含key
uint32_t isnull:1; //节点的值是否为NULL
uint32_t iscompr:1; //节点是否被压缩
uint32_t size:29; //节点大小
unsigned char data[]; //节点的实际存储数据
} raxNode;
该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。
这 4 个元数据就对应了压缩节点和非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用 29 bit。
另外,从 raxNode 结构体中,可以看到,除了元数据,该结构体中还有 char 类型数组 data。需要知道的是,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:
在 raxNode 的实现中,无论是非压缩节点还是压缩节点,具有两个特点:
而这两个特点给 Radix Tree 实际保存数据时的结构,有两个特征。
如果你觉得晦涩不易懂,坚持,继续往下看~
这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符。
同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key 对应的 value 的指针。
结合 raxNode 结构来看看,如果是一个非压缩 node ,当我们有size个字符(bytes),就会有相对应size 个指向子节点 raxNode 的指针;
需要注意的是,字符不是存在子节点,而是存在于父节点中(可以思考下, 为什么不存在子节点中?)。
[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串。
和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个 key 对应的 value 的指针。
注意,压缩节点仅有一个子节点
[header iscompr=1][abc][c-ptr](value-ptr?)
假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在 [] 里面,反之写在 () 里面。
(f) ""
\
(o) "f"
\
(o) "fo"
\
[t b] "foo"
/ \
"foot" (e) (a) "foob"
/ \
"foote" (r) (r) "fooba"
/ \
"footer" [] [] "foobar"
然而,这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种:
["foo"] ""
|
[t b] "foo"
/ \
"foot" ("er") ("ar") "foob"
/ \
"footer" [] [] "foobar"
这便是Radix Tree的模型图了。
不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:
(f) ""
/
(i o) "f"
/ \
"fi" ("rst") (o) "fo"
/ \
"first" [] [t b] "foo"
/ \
"foot" ("er") ("ar") "foob"
/ \
"footer" [] [] "foobar"
rax提供了查找key的接口raxFind,该接口用于获取key对应的value值,定义如下:
/* Find a key in the rax, returns raxNotFound special void pointer value
* if the item was not found, otherwise the value associated with the
* item is returned. */
void *raxFind(rax *rax, unsigned char *s, size_t len) {
raxNode *h;
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
return raxNotFound;
return raxGetData(h);
}
其中,该方法的作用有是,在rax中查找长度为 len 的字符串 s (s为rax中的key),找到之后返回 key 对应的 value。raxLowWalk 方法返回 s 的匹配长度,当 s ! = len 时,表示未查找到该key;当s == len时,需要检验 stopnode 是否为 key,并且当 stopnode 为压缩节点时,还需要检查splitpos是否为0(可能匹配到某个压缩节点中间的某个元素)。
raxLowWalk 方法作为底层方法,在增删查改操作中均有调用,接口定义为:
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts)
其中,入参定义为:
具体实现如下:
raxNode *h = rax->head; // 从跟节点开始匹配
raxNode **parentlink = &rax->head;
size_t i = 0; /* Position in the string. */
size_t j = 0; /* Position in the node children (or bytes if compressed).*/
while(h->size && i < len) {
unsigned char *v = h->data;
if (h->iscompr) {
// 压缩节点,挨个比对,相同则继续往下找;反之,则退出,说明匹配到这就开始不同或者已经到末尾了
for (j = 0; j < h->size && i < len; j++, i++) {
if (v[j] != s[i]) break;
}
if (j != h->size) break;
} else { // 非压缩节点,只要找到一个相同的就可以继续往下,在子节点中查找了
/* Even when h->size is large, linear scan provides good
* performances compared to other approaches that are in theory
* more sounding, like performing a binary search. */
for (j = 0; j < h->size; j++) {
if (v[j] == s[i]) break;
}
if (j == h->size) break;
// 可以匹配到当前字符,继续往下匹配下一个字符,然后会移动到子节点进行匹配
i++;
}
// 记录路径信息
if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
// 找到第一个子节点的地址
raxNode **children = raxNodeFirstChildPtr(h);
if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
// 将当前节点移动到其第j个子节点
memcpy(&h,children+j,sizeof(h));
parentlink = children+j;
j = 0; /* If the new node is non compressed and we do not
iterate again (since i == len) set the split
position to 0 to signal this node represents
the searched key. */
}
if (stopnode) *stopnode = h;
if (plink) *plink = parentlink;
if (splitpos && h->iscompr) *splitpos = j;
return i;
实现逻辑:
插入元素比较复杂,源码中也花了大量注释说明。
向 rax 中插入 key-value 对,对于已存在的 key, rax 提供了2种方案,覆盖或者不覆盖原有的 value,对应的接口分别为 raxInsert、raxTryInsert。
插入操作的真正实现函数 raxGenericInsert,方法定义如下:
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)
第一步,当完全匹配并且目标位置的节点不需要切割
size_t i;
int j = 0; // 切割位置,当定位到具体的压缩节点时,j用于定位该压缩节点待插入的位置
raxNode *h, **parentlink;
// 在插入元素前,会通过 raxLowWalk 判断 key 是否存在或者找到待插入位置。
i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);
// 1. i == len(代表已经匹配到了整个字符串)
// 2. 如果不是压缩节点 或者 是压缩节点但该压缩节点不需要切割(j==0)
// 注:对于压缩节点node,如果要在node中插入新元素 或者 该node中代表了这次要插入元素的key, 那么将会重新分配该node, 确保空间足够
if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {
/* Make space for the value pointer if needed. */
if (!h->iskey || (h->isnull && overwrite)) {
h = raxReallocForData(h,data);
if (h) memcpy(parentlink,&h,sizeof(h));
}
if (h == NULL) {
errno = ENOMEM;
return 0;
}
// 如果key存在,根据方法入参进行处理(获取旧数据或者重写)
if (h->iskey) {
if (old) *old = raxGetData(h);
if (overwrite) raxSetData(h,data);
errno = 0;
return 0; /* Element already exists. */
}
// 给新增节点赋值
raxSetData(h,data);
rax->numele++;
return 1; /* Element inserted. */
}
第二步:对于需要切割的压缩节点,有几种情况:
假设我们当前所在节点 h 包含字符串 “ANNIBALE” ,h 有且仅有一个子节点(“SC”),并且该节点没有子节点,用 ‘0’ 表示,结构如下:
"ANNIBALE" -> "SCO" -> []
案例1: 插入 “ANNIENTARE”
|B| -> "ALE" -> "SCO" -> []
"ANNI" -> |-|
|E| -> (... continue algo ...) "NTARE" -> []
需要将 “ANNIBALE” 拆分成3部分:压缩节点,非压缩节点,压缩节点
案例2: 插入 “ANNIBALI”
|E| -> "SCO" -> []
"ANNIBAL" -> |-|
|I| -> (... continue algo ...) []
需要将 “ANNIBALE” 拆成2部分:压缩节点,非压缩节点。
案例3: 插入 “AGO”(和案例1类似,不过,需要在原节点中设置 iscompr = 0)
|N| -> "NIBALE" -> "SCO" -> []
|A| -> |-|
|G| -> (... continue algo ...) |O| -> []
需要将 “ANNIBALE” 节点拆分为3部分:非压缩节点,非压缩节点,压缩节点。
案例4: 插入 “CIAO”
|A| -> "NNIBALE" -> "SCO" -> []
|-|
|C| -> (... continue algo ...) "IAO" -> []
此时需要将 “ANNIBALE” 节点拆分为2部分:第一部分是非压缩节点,第二部分为压缩节点。
案例5: 插入 “ANNI”
"ANNI" -> "BALE" -> "SCO" -> []
将 “ANNIBALE” 拆分成2个压缩节点。
源码实现上,是用了两种算法覆盖以上5种案例:
算法1: 覆盖案例 1~4,出现字符不匹配的位置,是在某个压缩节点上。
SPLITPOS 代表压缩节点切割位置,例如 压缩节点包含 “ANNIBALE”,将插入 “ANNIENTARE”,那么 SPLITPOS = 4,也就是 未匹配元素的下标。
源码实现如下:
/* ------------------------- ALGORITHM 1 --------------------------- */
if (h->iscompr && i != len) {
/* 1: Save next pointer. */
// 用 NEXT 保存 h的子节点指针
raxNode **childfield = raxNodeLastChildPtr(h);
raxNode *next;
memcpy(&next,childfield,sizeof(next));
if (h->iskey) {
debugf("key value is %p\n", raxGetData(h));
}
// 设置新增节点长度,这里 j 是压缩节点切割位置,因此压缩节点会被切割成两部分,长度分别是 j 和 postfixlen
size_t trimmedlen = j;
size_t postfixlen = h->size - j - 1;
// 仔细想想?当压缩节点被切割成两部分,并且没有公共前缀;如果该压缩节点本身 iskey, 那么切割后 split_node 自然也是 iskey (注意:iskey表示从根节点到压缩节点的父节点组成的key)
int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;
size_t nodesize;
/* 2: 创建 split node 节点,并分配空间 */
// 可以看到,这里会切割成三部分,当切割位置在第一个字符时(j==0),即trimmedlen=0 表示没有公共前缀,此时会切割成两部分
raxNode *splitnode = raxNewNode(1, split_node_is_key); // 切割位置
raxNode *trimmed = NULL; // 已匹配的前缀部分
raxNode *postfix = NULL; // 切割位置之后,未匹配的部分
// 如果匹配前缀大于0,分配新空间
if (trimmedlen) {
nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
sizeof(raxNode*);
if (h->iskey && !h->isnull) nodesize += sizeof(void*);
trimmed = rax_malloc(nodesize);
}
// 未匹配部分长度大于0,分配空间
if (postfixlen) {
nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
sizeof(raxNode*);
postfix = rax_malloc(nodesize);
}
/* OOM? Abort now that the tree is untouched. */
// 分配失败
if (splitnode == NULL ||
(trimmedlen && trimmed == NULL) ||
(postfixlen && postfix == NULL))
{
rax_free(splitnode);
rax_free(trimmed);
rax_free(postfix);
errno = ENOMEM;
return 0;
}
splitnode->data[0] = h->data[j];
// j == 0,意味着压缩节点只需拆分成两部分,split node 与 后缀部分已经拆分了,因此 parentlink 将指向 split node
if (j == 0) {
// 3a. 仔细想想?原压缩节点 iskey, 到这一步,split node 同样 iskey,因此将数据拷贝过去
if (h->iskey) {
void *ndata = raxGetData(h);
raxSetData(splitnode,ndata);
}
memcpy(parentlink,&splitnode,sizeof(splitnode));
} else {
// 3b. 切割公共前缀,也就意味着,压缩节点将被分成三部分
trimmed->size = j;
memcpy(trimmed->data,h->data,j);
trimmed->iscompr = j > 1 ? 1 : 0;
trimmed->iskey = h->iskey;
trimmed->isnull = h->isnull;
if (h->iskey && !h->isnull) {
void *ndata = raxGetData(h);
raxSetData(trimmed,ndata);
}
raxNode **cp = raxNodeLastChildPtr(trimmed);
memcpy(cp,&splitnode,sizeof(splitnode));
memcpy(parentlink,&trimmed,sizeof(trimmed));
parentlink = cp; /* Set parentlink to splitnode parent. */
rax->numnodes++;
}
// 4. 创建 postfix 节点,即原压缩节点未匹配的后缀部分
if (postfixlen) {
/* 4a: create a postfix node. */
postfix->iskey = 0;
postfix->isnull = 0;
postfix->size = postfixlen;
postfix->iscompr = postfixlen > 1;
memcpy(postfix->data,h->data+j+1,postfixlen);
raxNode **cp = raxNodeLastChildPtr(postfix);
memcpy(cp,&next,sizeof(next));
rax->numnodes++;
} else {
/* 4b: just use next as postfix node. */
postfix = next;
}
// 5. 设置 postfix 节点作为 splitnode 的第一个子节点
raxNode **splitchild = raxNodeLastChildPtr(splitnode);
memcpy(splitchild,&postfix,sizeof(postfix));
// 6. 继续插入。以上只处理了原压缩节点,对于新插入key, 未匹配部分还未插入,源码稍后介绍;这里插入之后,splitnode将会多一个子节点
rax_free(h);
h = splitnode;
}
算法2:应对场景5,即在压缩节点某个位置处,完成了字符串 s 的匹配,此时,只需将压缩节点拆分成两部分即可。
例如,rax树中存在节点 “ANNIBALE”,将插入 “ANNI”,此时 SPLITPOS = 4。
源码实现如下:
else if (h->iscompr && i == len) {
/* ------------------------- ALGORITHM 2 --------------------------- */
// 切割并分配 postfix 和 trimmed 节点
size_t postfixlen = h->size - j;
size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
sizeof(raxNode*);
if (data != NULL) nodesize += sizeof(void*);
raxNode *postfix = rax_malloc(nodesize);
nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);
if (h->iskey && !h->isnull) nodesize += sizeof(void*);
raxNode *trimmed = rax_malloc(nodesize);
// 分配失败
if (postfix == NULL || trimmed == NULL) {
rax_free(postfix);
rax_free(trimmed);
errno = ENOMEM;
return 0;
}
/* 1: Save next pointer. */
raxNode **childfield = raxNodeLastChildPtr(h);
raxNode *next;
memcpy(&next,childfield,sizeof(next));
/* 2: Create the postfix node. */
postfix->size = postfixlen;
postfix->iscompr = postfixlen > 1;
postfix->iskey = 1;
postfix->isnull = 0;
memcpy(postfix->data,h->data+j,postfixlen);
raxSetData(postfix,data);
raxNode **cp = raxNodeLastChildPtr(postfix);
memcpy(cp,&next,sizeof(next));
rax->numnodes++;
/* 3: Trim the compressed node. */
trimmed->size = j;
trimmed->iscompr = j > 1;
trimmed->iskey = 0;
trimmed->isnull = 0;
memcpy(trimmed->data,h->data,j);
memcpy(parentlink,&trimmed,sizeof(trimmed));
if (h->iskey) {
void *aux = raxGetData(h);
raxSetData(trimmed,aux);
}
/* Fix the trimmed node child pointer to point to
* the postfix node. */
cp = raxNodeLastChildPtr(trimmed);
memcpy(cp,&postfix,sizeof(postfix));
/* Finish! We don't need to continue with the insertion
* algorithm for ALGO 2. The key is already inserted. */
rax->numele++;
rax_free(h);
return 1; /* Key inserted. */
}
到这里,还有一点工作需要完成, 那就是新key未匹配的部分,还需要插入rax树,源码实现如下:
while(i < len) {
raxNode *child;
// 这里就是判断要处理成压缩节点还是非压缩节点
if (h->size == 0 && len-i > 1) {
debugf("Inserting compressed node\n");
size_t comprsize = len-i;
if (comprsize > RAX_NODE_MAX_SIZE)
comprsize = RAX_NODE_MAX_SIZE;
raxNode *newh = raxCompressNode(h,s+i,comprsize,&child);
if (newh == NULL) goto oom;
h = newh;
memcpy(parentlink,&h,sizeof(h));
parentlink = raxNodeLastChildPtr(h);
i += comprsize;
} else {
debugf("Inserting normal node\n");
raxNode **new_parentlink;
raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink);
if (newh == NULL) goto oom;
h = newh;
memcpy(parentlink,&h,sizeof(h));
parentlink = new_parentlink;
i++;
}
rax->numnodes++;
h = child;
}
raxNode *newh = raxReallocForData(h,data);
if (newh == NULL) goto oom;
h = newh;
if (!h->iskey) rax->numele++;
raxSetData(h,data);
memcpy(parentlink,&h,sizeof(h));
return 1; /* Element inserted. */
以上便是插入新key的全部过程。可以看出,实现上根据列举的5种案例进行处理。
rax 的删除操作主要有3个接口,可以删除 rax 中的某个 key,或者释放整个 rax,在释放 rax 时,还可以设置释放回调函数,在释放rax 的每个 key 时,都会调用这个回调函数,3个接口的定义如下:
// 在 rax 中删除长度为 len 的 s
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
// 释放 rax
void raxFree(rax *rax);
// 释放 rax, 释放每个 key 时,都会调用 free_callback 方法
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
其中,rax 的释放操作,采用的是深度优先算法。
下面重点介绍 raxRemove 方法:
第一步:当删除 rax 中的某个 key-value 对时,首先查找 key 是否存在,不存在则直接返回,存在则需要进行删除操作,实现如下:
raxNode *h;
raxStack ts;
raxStackInit(&ts); // 保存路径
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) { // 未找到目标 key, 直接退出
raxStackFree(&ts);
return 0;
}
第二步,如果 key 存在,则需要进行删除操作,删除操作完成后,rax 树可能需要进行压缩。具体可以分为下面 2 种情况,此处所说的压缩是指将某个节点与其子节点压缩成一个节点,叶子节点没有子节点,不能进行压缩。
案例1:
假设 rax 树存储了 “FOO” = 1 和 “FOOBAR” = 2,结构如下:
"FOO" -> "BAR" -> [] (2)
(1)
删除 “FOO” ,压缩后,结构如下:
"FOOBAR" -> [] (2)
案例2:
假设 rax 树存储了 “FOOBAR” = 1 和 “FOOTER” = 2
|B| -> "AR" -> [] (1)
"FOO" -> |-|
|T| -> "ER" -> [] (2)
删除 “FOOTER” 后,结构如下:
"FOO" -> |B| -> "AR" -> [] (1)
可以压缩成的结果如下:
"FOOBAR" -> [] (1)
删除操作具体可以分为2个阶段,删除阶段以及压缩阶段:
1)删除阶段:
利用查找key时记录的匹配路径,依次向上直到无法删除为止。
if (h->size == 0) {
debugf("Key deleted in node without children. Cleanup needed.\n");
raxNode *child = NULL;
while(h != rax->head) {
child = h;
debugf("Freeing child %p [%.*s] key:%d\n", (void*)child,
(int)child->size, (char*)child->data, child->iskey);
rax_free(child);
rax->numnodes--;
h = raxStackPop(&ts);
// 如果节点为 key,或者子节点 size != 1,则无法删除
if (h->iskey || (!h->iscompr && h->size != 1)) break;
}
if (child) {
raxNode *new = raxRemoveChild(h,child);
if (new != h) {
raxNode *parent = raxStackPeek(&ts);
raxNode **parentlink;
if (parent == NULL) {
parentlink = &rax->head;
} else {
parentlink = raxFindParentLink(parent,h);
}
memcpy(parentlink,&new,sizeof(new));
}
// 删除后尝试进行压缩
if (new->size == 1 && new->iskey == 0) {
trycompress = 1;
h = new;
}
}
} else if (h->size == 1) {
// 可以尝试进行压缩
trycompress = 1;
}
2)压缩阶段: 压缩过程可以细化为2步。
① 找到可以进行压缩的第一个元素,之后将所有可进行压缩的节点进行压缩。由于raxRowWalk函数已经记录了查找key的过程,压缩时只需从记录栈中不断弹出元素,即可找到可进行压缩的第一个元素,过程如下:
raxNode *parent;
while(1) {
parent = raxStackPop(&ts);
if (!parent || parent->iskey ||
(!parent->iscompr && parent->size != 1)) break;
h = parent;
}
// 可以进行压缩的第一个节点
raxNode *start = h;
② 找到第一个可压缩节点后,进行数据压缩。由于可压缩的节点都只有一个子节点,压缩过程只需要读取每个节点的内容,创建新的节点,并填充新节点的内容即可:
/* Scan chain of nodes we can compress. */
size_t comprsize = h->size;
int nodes = 1;
while(h->size != 0) {
raxNode **cp = raxNodeLastChildPtr(h);
memcpy(&h,cp,sizeof(h));
if (h->iskey || (!h->iscompr && h->size != 1)) break;
/* Stop here if going to the next node would result into
* a compressed node larger than h->size can hold. */
if (comprsize + h->size > RAX_NODE_MAX_SIZE) break;
nodes++;
comprsize += h->size;
}
if (nodes > 1) {
/* If we can compress, create the new node and populate it. */
size_t nodesize =
sizeof(raxNode)+comprsize+raxPadding(comprsize)+sizeof(raxNode*);
raxNode *new = rax_malloc(nodesize);
/* An out of memory here just means we cannot optimize this
* node, but the tree is left in a consistent state. */
if (new == NULL) {
raxStackFree(&ts);
return 1;
}
new->iskey = 0;
new->isnull = 0;
new->iscompr = 1;
new->size = comprsize;
rax->numnodes++;
/* Scan again, this time to populate the new node content and
* to fix the new node child pointer. At the same time we free
* all the nodes that we'll no longer use. */
comprsize = 0;
h = start;
while(h->size != 0) {
memcpy(new->data+comprsize,h->data,h->size);
comprsize += h->size;
raxNode **cp = raxNodeLastChildPtr(h);
raxNode *tofree = h;
memcpy(&h,cp,sizeof(h));
rax_free(tofree); rax->numnodes--;
if (h->iskey || (!h->iscompr && h->size != 1)) break;
}
/* Now 'h' points to the first node that we still need to use,
* so our new node child pointer will point to it. */
raxNode **cp = raxNodeLastChildPtr(new);
memcpy(cp,&h,sizeof(h));
/* Fix parent link. */
if (parent) {
raxNode **parentlink = raxFindParentLink(parent,start);
memcpy(parentlink,&new,sizeof(new));
} else {
rax->head = new;
}
}
为了能够遍历 rax 中所有的 key, Redis 提供了迭代器操作。Redis 中实现的迭代器为双向迭代器,可以向前,也可以向后,顺序是按照 key 的字典序排列的。通过 rax 的结构图可以看出,如果某个节点为 key,则其子节点的 key 按照字典序比该节点的 key 大。另外,如果当前节点为非压缩节点,则其最左侧节点的key是其所有子节点的 key 中最小的。主要方法有:
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);
1)raxStart 用于初始化 raxIterator 结构:
/* Initialize a Rax iterator. This call should be performed a single time
* to initialize the iterator, and must be followed by a raxSeek() call,
* otherwise the raxPrev()/raxNext() functions will just return EOF. */
void raxStart(raxIterator *it, rax *rt) {
it->flags = RAX_ITER_EOF; /* No crash if the iterator is not seeked. */
it->rt = rt;
it->key_len = 0;
it->key = it->key_static_string;
it->key_max = RAX_ITER_STATIC_LEN;
it->data = NULL;
it->node_cb = NULL;
raxStackInit(&it->stack);
}
2)raxSeek,在raxStart初始化迭代器后,必须调用raxSeek函数初始化迭代器的位置。
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
查找末尾元素可以直接在Rax中找到最右侧的叶子节点,查找首个元素被转换为查找大于等于空的操作。
处理大于、小于、等于等操作主要分为以下几步:
3)raxNext & raxPrev,主要用于迭代上一个或者下一个key
/* Go to the next element in the scope of the iterator 'it'.
* If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is
* returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxNext(raxIterator *it) {
if (!raxIteratorNextStep(it,0)) {
errno = ENOMEM;
return 0;
}
if (it->flags & RAX_ITER_EOF) {
errno = 0;
return 0;
}
return 1;
}
/* Go to the previous element in the scope of the iterator 'it'.
* If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is
* returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxPrev(raxIterator *it) {
if (!raxIteratorPrevStep(it,0)) {
errno = ENOMEM;
return 0;
}
if (it->flags & RAX_ITER_EOF) {
errno = 0;
return 0;
}
return 1;
}
4)raxStop & raxEOF:
raxEOF用于标识迭代器迭代结束,raxStop用于结束迭代并释放相关资源:
/* Free the iterator. */
void raxStop(raxIterator *it) {
if (it->key != it->key_static_string) rax_free(it->key);
raxStackFree(&it->stack);
}
/* Return if the iterator is in an EOF state. This happens when raxSeek()
* failed to seek an appropriate element, so that raxNext() or raxPrev()
* will return zero, or when an EOF condition was reached while iterating
* with raxNext() and raxPrev(). */
int raxEOF(raxIterator *it) {
return it->flags & RAX_ITER_EOF;
}
首先,要理解 rax 底层实现,第一步需要理解 rax 的结构;其次,要理解 压缩节点 和 非压缩节点 的区别
来,再回顾下 压缩节点 和 非压缩节点 的区别:
它们的相同之处在于:
不同之处在于:
相关参考: