现在用到的cuckoo hash算法比较多,下面具体分析在dpdk代码中cuckoo实现,在lib/librte_hash/下有其他若干种hash就不一一介绍了,比较简单,先文字介绍下bloom filter和cuckoo hash。
bloom filter:“似于bitmap这样的hashset,所以空间利用率很高。其独特的地方在于它使用多个哈希函数来避免哈希碰撞”,“带来的问题:一个是误报(false positives),在查询时能提供“一定不存在”,但只能提供“可能存在”,因为存在其它元素被映射到部分相同bit位上,导致该位置1,那么一个不存在的元素可能会被误报成存在;另一个是漏报(false nagatives),同样道理,如果删除了某个元素,导致该映射bit位被置0,那么本来存在的元素会被漏报成不存在。由于后者问题严重得多,所以bloom filter必须确保“definitely no”从而容忍“probably yes”,不允许元素的删除。”
在一些场景中比如区块链中交易和区块数据的快速判断,使用数组只存储key不存储数据,根据交易和区块的sha256哈希值快速判断当前需要同步给对等节点的交易数据和区块数据是否已同步过,这样虽然可能存在漏报,即交易a和交易c映射的bit置为1后,同步a,c,后面来了交易b,然后映射后的位置bit都为1误以为已经同步过,故不同步,当然这些不成问题,会由其他的对等节点同步。截取bitcoin/src/bloom.h中CBloomFilter类部分声明:
44 class CBloomFilter
45 {
46 private:
47 std::vector<unsigned char> vData;
48 bool isFull;
49 bool isEmpty;
50 unsigned int nHashFuncs;
51 unsigned int nTweak;
52 unsigned char nFlags;
cuckoo hash:“哈希函数是成对的,每一个元素都有两个,分别映射到两个位置,一个是记录的位置,另一个是备用位置。这个备用位置是处理碰撞时用的。cuckoo hashing处理碰撞的方法,就是把原来占用位置的这个元素踢走,被踢出去的元素有一个备用位置可以安置,如果备用位置上还有元素,再把它踢走,如此往复。直到被踢的次数达到一个上限,才确认哈希表已满,并执行rehash操作。” 下面开始真正分析源码中的实现。
先来看看cuckoo中key的比较函数,求key的hash函数原型和hash表结构:
66 /** Signature of key that is stored internally. */
67 typedef uint32_t hash_sig_t;
68
69 /** Type of function that can be used for calculating the hash value. */
70 typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len, uint32_t init_val);
72
73 /** Type of function used to compare the hash key. */
74 typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
183 /** A hash table structure. */
184 struct rte_hash {
185 char name[RTE_HASH_NAMESIZE]; /**< Name of the hash. */
186 uint32_t entries; /**< Total table entries. */
187 uint32_t num_buckets; /**< Number of buckets in table. */
188 uint32_t key_len; /**< Length of hash key. */
189 rte_hash_function hash_func; /**< Function used to calculate hash. */
190 uint32_t hash_func_init_val; /**< Init value used by hash_func. */
191 rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
192 /**< Custom function used to compare keys. */
193 enum cmp_jump_table_case cmp_jump_table_idx;
194 /**< Indicates which compare function to use. */
195 uint32_t bucket_bitmask; /**< Bitmask for getting bucket index
196 from hash signature. */
197 uint32_t key_entry_size; /**< Size of each key entry. */
198
199 struct rte_ring *free_slots; /**< Ring that stores all indexes
200 of the free slots in the key table */
201 void *key_store; /**< Table storing all keys and data */
202 struct rte_hash_bucket *buckets; /**< Table with buckets storing all the
203 hash values and key indexes
204 to the key table*/
212 } __rte_cache_aligned;
165 /* Structure that stores key-value pair */
166 struct rte_hash_key {
167 union {
168 uintptr_t idata;
169 void *pdata;
170 };
171 /* Variable key size */
172 char key[0];
173 } __attribute__((aligned(KEY_ALIGNMENT)));
154 /* Structure storing both primary and secondary hashes */
155 struct rte_hash_signatures {
156 union {
157 struct {
158 hash_sig_t current;
159 hash_sig_t alt;
160 };
161 uint64_t sig;
162 };
163 };
175 /** Bucket structure */
176 struct rte_hash_bucket {
177 struct rte_hash_signatures signatures[RTE_HASH_BUCKET_ENTRIES];
178 /* Includes dummy key index that always contains index 0 */
179 uint32_t key_idx[RTE_HASH_BUCKET_ENTRIES + 1];
180 uint8_t flag[RTE_HASH_BUCKET_ENTRIES];
181 } __rte_cache_aligned;
其中rte_hash_custom_cmp_eq和cmp_jump_table_idx是用于指定哪种类型的key比较函数,用户也可以自己实现,其他字段会在下面使用到的地方作说明,这里仅考虑单线程操作hash表,即线程a不会去操作线程b的hash表[设计错误],也不会出现线程a和b操作hash表[有锁费性能,尽量充分理解业务选择适合的方案]。截取部分如下:
72 * based on the key size and custom function.
73 */
74 enum cmp_jump_table_case {
75 KEY_CUSTOM = 0,
76 KEY_16_BYTES,
77 KEY_32_BYTES,
78 //more
86 };
87
88 /*
89 * Table storing all different key compare functions
90 * (multi-process supported)
91 */
92 const rte_hash_cmp_eq_t cmp_jump_table[NUM_KEY_CMP_CASES] = {
93 NULL,
94 rte_hash_k16_cmp_eq,
95 rte_hash_k32_cmp_eq,
96 //more
97 };
34 /* Functions to compare multiple of 16 byte keys (up to 128 bytes) */
35 static int
36 rte_hash_k16_cmp_eq(const void *key1, const void *key2,
37 size_t key_len __rte_unused)
38 {
39 uint64_t x0, x1, y0, y1;
40
41 asm volatile(
42 "ldp %x[x1], %x[x0], [%x[p1]]"
43 : [x1]"=r"(x1), [x0]"=r"(x0)
44 : [p1]"r"(key1)
45 );
46 asm volatile(
47 "ldp %x[y1], %x[y0], [%x[p2]]"
48 : [y1]"=r"(y1), [y0]"=r"(y0)
49 : [p2]"r"(key2)
50 );
51 x0 ^= y0;
52 x1 ^= y1;
53 return !(x0 == 0 && x1 == 0);
54 }
55
56 static int
57 rte_hash_k32_cmp_eq(const void *key1, const void *key2, size_t key_len)
58 {
59 return rte_hash_k16_cmp_eq(key1, key2, key_len) ||
60 rte_hash_k16_cmp_eq((const char *) key1 + 16,
61 (const char *) key2 + 16, key_len);
62 }
以下代码分析省略了些细节。
创建[实现200多行]:
113 struct rte_hash *
114 rte_hash_create(const struct rte_hash_parameters *params)
115 {
116 struct rte_hash *h = NULL;
117 struct rte_tailq_entry *te = NULL;
118 struct rte_hash_list *hash_list;
119 struct rte_ring *r = NULL;
120 char hash_name[RTE_HASH_NAMESIZE];
121 void *k = NULL;
122 void *buckets = NULL;
123 char ring_name[RTE_RING_NAMESIZE];
124 unsigned num_key_slots;
125 unsigned hw_trans_mem_support = 0;
126 unsigned i;
127
128 hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
159 num_key_slots = params->entries + 1;
160
161 snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
162 /* Create ring (Dummy slot index is not enqueued) */
163 r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots - 1),
164 params->socket_id, 0);
165 if (r == NULL) {
166 RTE_LOG(ERR, HASH, "memory allocation failed\n");
167 goto err;
168 }
169
170 snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name);
171
172 rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
173
174 /* guarantee there's no existing: this is normally already checked
175 * by ring creation above */
176 TAILQ_FOREACH(te, hash_list, next) {
177 h = (struct rte_hash *) te->data;
178 if (strncmp(params->name, h->name, RTE_HASH_NAMESIZE) == 0)
179 break;
180 }
181 h = NULL;
182 if (te != NULL) {
183 rte_errno = EEXIST;
184 te = NULL;
185 goto err_unlock;
186 }
187
188 te = rte_zmalloc("HASH_TAILQ_ENTRY", sizeof(*te), 0);
189 if (te == NULL) {
190 RTE_LOG(ERR, HASH, "tailq entry allocation failed\n");
191 goto err_unlock;
192 }
193
194 h = (struct rte_hash *)rte_zmalloc_socket(hash_name, sizeof(struct rte_hash),
195 RTE_CACHE_LINE_SIZE, params->socket_id);
196
197 if (h == NULL) {
198 RTE_LOG(ERR, HASH, "memory allocation failed\n");
199 goto err_unlock;
200 }
201
202 const uint32_t num_buckets = rte_align32pow2(params->entries)
203 / RTE_HASH_BUCKET_ENTRIES;
204
205 buckets = rte_zmalloc_socket(NULL,
206 num_buckets * sizeof(struct rte_hash_bucket),
207 RTE_CACHE_LINE_SIZE, params->socket_id);
208
209 if (buckets == NULL) {
210 RTE_LOG(ERR, HASH, "memory allocation failed\n");
211 goto err_unlock;
212 }
213
214 const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len;
215 const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;
216
217 k = rte_zmalloc_socket(NULL, key_tbl_size,
218 RTE_CACHE_LINE_SIZE, params->socket_id);
219
220 if (k == NULL) {
221 RTE_LOG(ERR, HASH, "memory allocation failed\n");
222 goto err_unlock;
223 }
270 /* Setup hash context */
271 snprintf(h->name, sizeof(h->name), "%s", params->name);
272 h->entries = params->entries;
273 h->key_len = params->key_len;
274 h->key_entry_size = key_entry_size;
275 h->hash_func_init_val = params->hash_func_init_val;
276
277 h->num_buckets = num_buckets;
278 h->bucket_bitmask = h->num_buckets - 1;
279 h->buckets = buckets;
280 h->hash_func = (params->hash_func == NULL) ?
281 DEFAULT_HASH_FUNC : params->hash_func;
282 h->key_store = k;
283 h->free_slots = r;
302 /* Populate free slots ring. Entry zero is reserved for key misses. */
303 for (i = 1; i < params->entries + 1; i++)
304 rte_ring_sp_enqueue(r, (void *)((uintptr_t) i));
305
306 te->data = (void *) h;
307 TAILQ_INSERT_TAIL(hash_list, te, next);
308 rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
309
310 return h;
320 }
132 /** Number of items per bucket. */
133 #define RTE_HASH_BUCKET_ENTRIES 4
以上创建过程主要包括:设置key-value的个数num_key_slots;创建无锁环形buffer;加写锁先判断是否已存在名字一样的hash表,如果有的话则跳转至err_unlock,释放写锁和相关的资源[代码被省略],没有的话创建并关联hash表;每个桶可以容纳4个数据[至于为什么是4而不是其他值,会在最后说明],计算多少个桶并申请空间用于存储hash值和状态等信息;计算存储num_key_slots个key_entry_size 的空间用于存储key和value的指针[这里桶里并不存储实际的数据];最后给hash表各个变量关联;把hash表加入到全局hash_list中并释放写锁;
查找:
698 static inline int32_t
699 __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
700 hash_sig_t sig, void **data)
701 {
702 uint32_t bucket_idx;
703 hash_sig_t alt_hash;
704 unsigned i;
705 struct rte_hash_bucket *bkt;
706 struct rte_hash_key *k, *keys = h->key_store;
707
708 bucket_idx = sig & h->bucket_bitmask;
709 bkt = &h->buckets[bucket_idx];
710
711 /* Check if key is in primary location */
712 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
713 if (bkt->signatures[i].current == sig &&
714 bkt->signatures[i].sig != NULL_SIGNATURE) {
715 k = (struct rte_hash_key *) ((char *)keys +
716 bkt->key_idx[i] * h->key_entry_size);
717 if (rte_hash_cmp_eq(key, k->key, h) == 0) {
718 if (data != NULL)
719 *data = k->pdata;
720 /*
721 * Return index where key is stored,
722 * substracting the first dummy index
723 */
724 return bkt->key_idx[i] - 1;
725 }
726 }
727 }
728
729 /* Calculate secondary hash */
730 alt_hash = rte_hash_secondary_hash(sig);
731 bucket_idx = alt_hash & h->bucket_bitmask;
732 bkt = &h->buckets[bucket_idx];
733
734 /* Check if key is in secondary location */
735 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
736 if (bkt->signatures[i].current == alt_hash &&
737 bkt->signatures[i].alt == sig) {
738 k = (struct rte_hash_key *) ((char *)keys +
739 bkt->key_idx[i] * h->key_entry_size);
740 if (rte_hash_cmp_eq(key, k->key, h) == 0) {
741 if (data != NULL)
742 *data = k->pdata;
743 /*
744 * Return index where key is stored,
745 * substracting the first dummy index
746 */
747 return bkt->key_idx[i] - 1;
748 }
749 }
750 }
751
752 return -ENOENT;
753 }
根据sig值[key的hash值]索引到可能在哪个桶,然后先在primary location试着查找,一共可能要比较RTE_HASH_BUCKET_ENTRIES次,如果桶上的hash有效且等于该hash值,那么根据索引和key-value大小计算出该key-value位置,并对key的内容进行比较,相同的话则取得指向实际数据的指针;没有找到对sig再求hash,在secondary location位置上找,过程同上,找不到就返回-ENOENT;
删除:
813 static inline int32_t
814 __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
815 hash_sig_t sig)
816 {
817 uint32_t bucket_idx;
818 hash_sig_t alt_hash;
819 unsigned i;
820 struct rte_hash_bucket *bkt;
821 struct rte_hash_key *k, *keys = h->key_store;
822 int32_t ret;
823
824 bucket_idx = sig & h->bucket_bitmask;
825 bkt = &h->buckets[bucket_idx];
826
827 /* Check if key is in primary location */
828 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
829 if (bkt->signatures[i].current == sig &&
830 bkt->signatures[i].sig != NULL_SIGNATURE) {
831 k = (struct rte_hash_key *) ((char *)keys +
832 bkt->key_idx[i] * h->key_entry_size);
833 if (rte_hash_cmp_eq(key, k->key, h) == 0) {
834 remove_entry(h, bkt, i);
835
836 /*
837 * Return index where key is stored,
838 * substracting the first dummy index
839 */
840 ret = bkt->key_idx[i] - 1;
841 bkt->key_idx[i] = 0;
842 return ret;
843 }
844 }
845 }
846
847 /* Calculate secondary hash */
848 alt_hash = rte_hash_secondary_hash(sig);
849 bucket_idx = alt_hash & h->bucket_bitmask;
850 bkt = &h->buckets[bucket_idx];
851
852 /* Check if key is in secondary location */
853 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
854 if (bkt->signatures[i].current == alt_hash &&
855 bkt->signatures[i].sig != NULL_SIGNATURE) {
856 k = (struct rte_hash_key *) ((char *)keys +
857 bkt->key_idx[i] * h->key_entry_size);
858 if (rte_hash_cmp_eq(key, k->key, h) == 0) {
859 remove_entry(h, bkt, i);
860
861 /*
862 * Return index where key is stored,
863 * substracting the first dummy index
864 */
865 ret = bkt->key_idx[i] - 1;
866 bkt->key_idx[i] = 0;
867 return ret;
868 }
869 }
870 }
871
872 return -ENOENT;
873 }
785 static inline void
786 remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
787 {
808 rte_ring_sp_enqueue(h->free_slots,
809 (void *)((uintptr_t)bkt->key_idx[i]));
811 }
删除过程同查找有一大部分相同的,primary location找不到在secondary location上找,在找到对应的数据后,从桶中删除元素[并非删除指针指向的数据,交给用户自己处理];这里remove_entry把索引值强转成void *压入ring buffer,作用会在hash表插入过程中说明;
插入:
493 static inline int32_t
494 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
495 hash_sig_t sig, void *data)
496 {
497 hash_sig_t alt_hash;
498 uint32_t prim_bucket_idx, sec_bucket_idx;
499 unsigned i;
500 struct rte_hash_bucket *prim_bkt, *sec_bkt;
501 struct rte_hash_key *new_k, *k, *keys = h->key_store;
502 void *slot_id = NULL;
503 uint32_t new_idx;
504 int ret;
505 unsigned n_slots;
506 unsigned lcore_id;
508
512 prim_bucket_idx = sig & h->bucket_bitmask;
513 prim_bkt = &h->buckets[prim_bucket_idx];
514 rte_prefetch0(prim_bkt);
515
516 alt_hash = rte_hash_secondary_hash(sig);
517 sec_bucket_idx = alt_hash & h->bucket_bitmask;
518 sec_bkt = &h->buckets[sec_bucket_idx];
519 rte_prefetch0(sec_bkt);
540 if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0)
541 return -ENOSPC;
544 new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
545 rte_prefetch0(new_k);
546 new_idx = (uint32_t)((uintptr_t) slot_id);
547
548 /* Check if key is already inserted in primary location */
549 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
550 if (prim_bkt->signatures[i].current == sig &&
551 prim_bkt->signatures[i].alt == alt_hash) {
552 k = (struct rte_hash_key *) ((char *)keys +
553 prim_bkt->key_idx[i] * h->key_entry_size);
554 if (rte_hash_cmp_eq(key, k->key, h) == 0) {
555 /* Enqueue index of free slot back in the ring. */
556 enqueue_slot_back(h, cached_free_slots, slot_id);
557 /* Update data */
558 k->pdata = data;
559 /*
560 * Return index where key is stored,
561 * substracting the first dummy index
562 */
563 return prim_bkt->key_idx[i] - 1;
564 }
565 }
566 }
568 /* Check if key is already inserted in secondary location */
569 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
570 if (sec_bkt->signatures[i].alt == sig &&
571 sec_bkt->signatures[i].current == alt_hash) {
572 k = (struct rte_hash_key *) ((char *)keys +
573 sec_bkt->key_idx[i] * h->key_entry_size);
574 if (rte_hash_cmp_eq(key, k->key, h) == 0) {
575 /* Enqueue index of free slot back in the ring. */
576 enqueue_slot_back(h, cached_free_slots, slot_id);
577 /* Update data */
578 k->pdata = data;
579 /*
580 * Return index where key is stored,
581 * substracting the first dummy index
582 */
583 return sec_bkt->key_idx[i] - 1;
584 }
585 }
586 }
587
588 /* Copy key */
589 rte_memcpy(new_k->key, key, h->key_len);
590 new_k->pdata = data;
591
614 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
615 /* Check if slot is available */
616 if (likely(prim_bkt->signatures[i].sig == NULL_SIGNATURE)) {
617 prim_bkt->signatures[i].current = sig;
618 prim_bkt->signatures[i].alt = alt_hash;
619 prim_bkt->key_idx[i] = new_idx;
620 break;
621 }
622 }
623
624 if (i != RTE_HASH_BUCKET_ENTRIES) {
627 return new_idx - 1;
628 }
636 ret = make_space_bucket(h, prim_bkt);
637 if (ret >= 0) {
638 prim_bkt->signatures[ret].current = sig;
639 prim_bkt->signatures[ret].alt = alt_hash;
640 prim_bkt->key_idx[ret] = new_idx;
643 return new_idx - 1;
644 }
648 /* Error in addition, store new slot back in the ring and return error */
649 enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx));
653 return ret;
654 }
以上代码删除了一些根据编译设置不同而相关的代码[对齐不太好办],应该不影响分析。这段代码先计算出primary和secondary桶,然后进行预取;然后检查free_slots是否还有空闲,对应着hash表结构的注释“Ring that stores all indexes of the free slots in the key table”,和删除一个元素后的操作remove_entry,只是把hash表key_idx的索引值压入buffer,后期插入的时候需要获得一个;然后查找主和备,如果存在key一样的则更新data,否则就拷贝key和保存data地址,行589〜590;以上查找primary location时的代码段和secondary location的判断条件:
if (prim_bkt->signatures[i].current == sig && prim_bkt->signatures[i].alt == alt_hash)
if (sec_bkt->signatures[i].current == alt_hash && sec_bkt->signatures[i].alt == sig)
可以思考下“为什么这么写?如果只判断第一个条件而不加上第二个条件会有什么问题?两个if第一个等号左值不一样?”;然后修改primary桶元素信息,614〜622这里有空位置插入成功则返回否则进入make_space_bucket函数:
409 static inline int
410 make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt)
411 {
412 static unsigned int nr_pushes;
413 unsigned i, j;
414 int ret;
415 uint32_t next_bucket_idx;
416 struct rte_hash_bucket *next_bkt[RTE_HASH_BUCKET_ENTRIES];
417
418 /*
419 * Push existing item (search for bucket with space in
420 * alternative locations) to its alternative location
421 */
422 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
423 /* Search for space in alternative locations */
424 next_bucket_idx = bkt->signatures[i].alt & h->bucket_bitmask;
425 next_bkt[i] = &h->buckets[next_bucket_idx];
426 for (j = 0; j < RTE_HASH_BUCKET_ENTRIES; j++) {
427 if (next_bkt[i]->signatures[j].sig == NULL_SIGNATURE)
428 break;
429 }
430
431 if (j != RTE_HASH_BUCKET_ENTRIES)
432 break;
433 }
435 /* Alternative location has spare room (end of recursive function) */
436 if (i != RTE_HASH_BUCKET_ENTRIES) {
437 next_bkt[i]->signatures[j].alt = bkt->signatures[i].current;
438 next_bkt[i]->signatures[j].current = bkt->signatures[i].alt;
439 next_bkt[i]->key_idx[j] = bkt->key_idx[i];
440 return i;
441 }
442
443 /* Pick entry that has not been pushed yet */
444 for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++)
445 if (bkt->flag[i] == 0)
446 break;
447
448 /* All entries have been pushed, so entry cannot be added */
449 if (i == RTE_HASH_BUCKET_ENTRIES || nr_pushes > RTE_HASH_MAX_PUSHES)
450 return -ENOSPC;
451
452 /* Set flag to indicate that this entry is going to be pushed */
453 bkt->flag[i] = 1;
454
455 nr_pushes++;
456 /* Need room in alternative bucket to insert the pushed entry */
457 ret = make_space_bucket(h, next_bkt[i]);
458 /*
459 * After recursive function.
460 * Clear flags and insert the pushed entry
461 * in its alternative location if successful,
462 * or return error
463 */
464 bkt->flag[i] = 0;
465 nr_pushes = 0;
466 if (ret >= 0) {
467 next_bkt[i]->signatures[ret].alt = bkt->signatures[i].current;
468 next_bkt[i]->signatures[ret].current = bkt->signatures[i].alt;
469 next_bkt[i]->key_idx[ret] = bkt->key_idx[i];
470 return i;
471 } else
472 return ret;
473
474 }
如果不能在primary location上插入的话,则尝试调用make_space_bucket在secondary location上进行首次插入(b);这个会引起refresh操作,即secondary location被占用了,然后相应的数据要被重新hash等,故会形成递归调用;以上实现大致是:422〜433行对该桶上的元数(a)的alt重新求索引到哪个桶,有空位置了break,为要插入的元素(b)让出位置;435〜441行更新(a)[由primary变成secondary,则secondary变成primary如此反复,即第一次插入是主,被踢一次变成备,再被踢一次变成主...],然后返回并更新(b);如果没有找到则要递归,在递归前需要标示什么时候hash表满了,返回-ENOSPC,444〜455行干了这个事;如果递归返回到464行清标示[要反返回成功要么-ENOSPC],并根据返回值进行更新(a),466〜470行干了这个事;最后结束,633〜644行更新(b);
在以上过程中refresh会在一定的条件下终止:nr_pushes > RTE_HASH_MAX_PUSHES[100]或某个桶上的slot都make_space_bucket过。
还有一些接口比较简单,基本都是以上增删查过程的补充,可以对着声明和注释看下该接口实现的功能。
整个过程梳理一下,总结下hash表中有几个关键性成员变量的作用:由于实际的数据并不存在于hash表中,但会拷贝key的数据,由rte_hash_key中key表示,只是存储了该数据起始地址,和key的地址;故key_store用来存储key和value的address,但是我怎么知道rte_hash_key存在哪个地方呢?是由rte_hash_bucket结构中key_idx的索引值乘以一个偏移量key_entry_size来决定的;其实由可以看出来k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size);buckets用于表明key有没有存在,在创建的时候作用也说明了if (bkt->signatures[i].current == sig && bkt->signatures[i].sig != NULL_SIGNATURE);free_slots用于存储key_store哪些是空着的,毕竟key_store有元素的时候不一定是连续占位的。
原文链接:https://www.jianshu.com/p/b09b26c9052a
更多DPDK学习资料有需要的可以自行添加进入学习交流君 羊 793599096 免费获取,或自行报名学习,免费订阅,永久学习,关注我持续更新哦!!!
学习地址:http://ke.qq.com/course/5066203?flowToken=1043717