
DPDK study notes: mempool

鲍宁
2023-12-01

1. DPDK study notes: mempool

A memory pool is a memory allocation strategy. Calling malloc/free (or new/delete) directly involves many system calls and, when small blocks are requested frequently, can lead to memory fragmentation. Common ways to mitigate fragmentation include placement new and memory pools.

A memory pool, by contrast, allocates a certain number of memory blocks, usually of equal size, before the memory is actually used, and keeps them in reserve. When a new memory request arrives, a block is handed out from the pool; if the blocks run out, additional memory is requested.

Memory pools are one kind of pooling technique; others include process pools, thread pools, object pools, connection pools and coroutine pools. The principle is always the same: acquire the resources up front (from the system, etc.) and then manage them yourself.

The concrete approach is: fixed size, allocated up front, reused. Because allocating and freeing memory is inefficient, we allocate one large chunk only at startup (and allocate a second time only when that chunk runs out), then hand out pieces of it on demand and mark each piece as in use; on release we merely mark the piece as free. Releasing does not actually return the memory to the operating system; it is returned to the OS only when the whole large chunk is idle.

Moreover, most calls to the allocator request the same size, so the pool can simply hand out fixed-size blocks every time, which removes the possibility of fragmentation. A minimal sketch of such a fixed-size pool follows.
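
To make the idea concrete, here is a minimal sketch of such a fixed-size pool in plain C. This is a hypothetical illustration, not DPDK code; the names fixed_pool, fixed_pool_get and fixed_pool_put are made up, and alignment handling is omitted.

#include <stdlib.h>
#include <stddef.h>

/* Minimal fixed-size pool sketch: one big malloc() carved into equal blocks,
 * with free blocks linked through their own first bytes (an embedded free list). */
struct fixed_pool {
	void  *base;      /* the single large allocation */
	void  *free_head; /* head of the free-block list */
	size_t blk_size;  /* size of each block, at least sizeof(void *) */
};

static int fixed_pool_init(struct fixed_pool *p, size_t blk_size, size_t nb_blks)
{
	size_t i;
	char *blk;

	if (blk_size < sizeof(void *))
		blk_size = sizeof(void *);
	p->base = malloc(blk_size * nb_blks);
	if (p->base == NULL)
		return -1;
	p->blk_size = blk_size;
	p->free_head = NULL;
	for (i = 0, blk = p->base; i < nb_blks; i++, blk += blk_size) {
		*(void **)blk = p->free_head;   /* thread each block onto the free list */
		p->free_head = blk;
	}
	return 0;
}

static void *fixed_pool_get(struct fixed_pool *p)
{
	void *blk = p->free_head;

	if (blk != NULL)
		p->free_head = *(void **)blk;   /* pop from the free list */
	return blk;                             /* NULL means the pool is exhausted */
}

static void fixed_pool_put(struct fixed_pool *p, void *blk)
{
	*(void **)blk = p->free_head;           /* push back; nothing is returned to the OS */
	p->free_head = blk;
}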

Let's first look at the relevant part of the project's directory layout.

--lib
----mempool      this directory contains the mempool declarations and generic logic
--drivers
----mempool      this directory contains the low-level mempool operation implementations

Now let's start with the declaration.

/**
 * The RTE mempool structure.
 */
struct rte_mempool {
	char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of the mempool. */
	RTE_STD_C11
	union {
		void *pool_data;         /**< Where the objects are actually stored (the ring). */
		uint64_t pool_id;        
	};
	void *pool_config;               /**< optional args for ops alloc. */
	const struct rte_memzone *mz;    /**< Memzone of the mempool. */
	unsigned int flags;              /**< Mempool flags. */
	int socket_id;                   /**< Socket id passed at create. */
	uint32_t size;                   /**< Max size (number of elements) of the mempool. */
	uint32_t cache_size;             /**< Size of per-lcore default local cache. */

	uint32_t elt_size;               /**< Size of an element. */
	uint32_t header_size;            /**< Size of the header (before the element). */
	uint32_t trailer_size;           /**< Size of the trailer (after the element); from the code it is used for the debug cookies. */

	unsigned private_data_size;      /**< Size of the private data. */

	int32_t ops_index; // Index into the ops table: the actual ring operations used to store/fetch objects

	struct rte_mempool_cache *local_cache; /**< Per-lcore cache used to speed up get/put. */

	// The fields below still need further analysis
	uint32_t populated_size;         /**< Number of populated objects. */
	struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
	uint32_t nb_mem_chunks;          /**< Number of memory chunks */
	struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
	/** Per-lcore statistics. */
	struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
}  __rte_cache_aligned;
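
To make header_size, elt_size and trailer_size concrete, here is a rough per-object layout as I read the field comments above (a sketch, not an authoritative diagram):

/*
 * One object inside the pool memory:
 *
 *   | object header (header_size) | element (elt_size) | trailer (trailer_size) |
 *
 * The header links the object into elt_list; the trailer appears to be used
 * for the debug cookies when RTE_LIBRTE_MEMPOOL_DEBUG is enabled.
 */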

1.1 Creating a mempool

The overall flow of creating a mempool is as follows.

/* create the mempool */
struct rte_mempool *
rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
	unsigned cache_size, unsigned private_data_size,
	rte_mempool_ctor_t *mp_init, void *mp_init_arg,
	rte_mempool_obj_cb_t *obj_init, void *obj_init_arg,
	int socket_id, unsigned flags)
{
	int ret;
	struct rte_mempool *mp;

	// Create the memzone and the per-lcore caches, and link into the global rte_mempool_tailq
	mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
		private_data_size, socket_id, flags);
	if (mp == NULL)
		return NULL;

	/*
	 * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
	 * set the correct index into the table of ops structs.
	 */
	// Select the concrete ring (queue) ops according to the flags
	if ((flags & RTE_MEMPOOL_F_SP_PUT) && (flags & RTE_MEMPOOL_F_SC_GET))
		ret = rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
	else if (flags & RTE_MEMPOOL_F_SP_PUT)
		ret = rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
	else if (flags & RTE_MEMPOOL_F_SC_GET)
		ret = rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
	else
		ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);

	if (ret)
		goto fail;

	/* call the mempool priv initializer */
	// Mempool init callback
	if (mp_init)
		mp_init(mp, mp_init_arg);

	// This initializes the ring and populates the pool, among other things
	if (rte_mempool_populate_default(mp) < 0)
		goto fail;

	/* call the object initializers */
	// Call the per-object init callback
	if (obj_init)
		rte_mempool_obj_iter(mp, obj_init, obj_init_arg);

	rte_mempool_trace_create(name, n, elt_size, cache_size,
		private_data_size, mp_init, mp_init_arg, obj_init,
		obj_init_arg, flags, mp);
	return mp;

 fail:
	rte_mempool_free(mp);
	return NULL;
}
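
Before walking through rte_mempool_create_empty, a minimal usage sketch of the function above may help. The pool name, element size and counts are made-up example values, all callbacks are left NULL, and flags = 0 picks the default multi-producer/multi-consumer ring ops (the else branch above).

#include <stdio.h>
#include <rte_mempool.h>
#include <rte_lcore.h>
#include <rte_errno.h>

/* Hypothetical example: 4096 objects of 2048 bytes each, a 256-object
 * per-lcore cache, no private data and no init callbacks. */
static struct rte_mempool *
create_example_pool(void)
{
	struct rte_mempool *mp;

	mp = rte_mempool_create("example_pool",
				4096,           /* n: number of elements */
				2048,           /* elt_size */
				256,            /* cache_size */
				0,              /* private_data_size */
				NULL, NULL,     /* mp_init, mp_init_arg */
				NULL, NULL,     /* obj_init, obj_init_arg */
				rte_socket_id(),
				0);             /* flags: default MP/MC ring ops */
	if (mp == NULL)
		printf("mempool creation failed: %s\n", rte_strerror(rte_errno));
	return mp;
}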

rte_mempool_create_empty requests memory from DPDK's memory management subsystem.

/* create an empty mempool */
struct rte_mempool *
rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
	unsigned cache_size, unsigned private_data_size,
	int socket_id, unsigned flags)
{
	char mz_name[RTE_MEMZONE_NAMESIZE];
	struct rte_mempool_list *mempool_list;
	struct rte_mempool *mp = NULL;
	struct rte_tailq_entry *te = NULL;
	const struct rte_memzone *mz = NULL;
	size_t mempool_size;
	unsigned int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
	struct rte_mempool_objsz objsz;
	unsigned lcore_id;
	int ret;

	// The global list that this mempool will be linked into (rte_mempool_tailq)
	mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);

	/* asked for zero items */
	if (n == 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	/* asked cache too big */
	// Sanity check
	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
	    CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
		rte_errno = EINVAL;
		return NULL;
	}

	/* enforce only user flags are passed by the application */
	if ((flags & ~RTE_MEMPOOL_VALID_USER_FLAGS) != 0) {
		rte_errno = EINVAL;
		return NULL;
	}

	/*
	 * No objects in the pool can be used for IO until it's populated
	 * with at least some objects with valid IOVA.
	 */
	flags |= RTE_MEMPOOL_F_NON_IO;

	/* "no cache align" imply "no spread" */
	if (flags & RTE_MEMPOOL_F_NO_CACHE_ALIGN)
		flags |= RTE_MEMPOOL_F_NO_SPREAD;

	/* calculate mempool object sizes. */
	// Compute the object sizes (header / element / trailer)
	if (!rte_mempool_calc_obj_size(elt_size, flags, &objsz)) {
		rte_errno = EINVAL;
		return NULL;
	}

	rte_mcfg_mempool_write_lock();

	/*
	 * reserve a memory zone for this mempool: private data is
	 * cache-aligned
	 */
	// Round the private data size up to the mempool alignment
	private_data_size = (private_data_size +
			     RTE_MEMPOOL_ALIGN_MASK) & (~RTE_MEMPOOL_ALIGN_MASK);


	/* try to allocate tailq entry */
	// Allocate the entry used to link this mempool into rte_mempool_tailq
	te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
	if (te == NULL) {
		RTE_LOG(ERR, MEMPOOL, "Cannot allocate tailq entry!\n");
		goto exit_unlock;
	}

	// Size of the mempool header (struct + caches + private data)
	mempool_size = RTE_MEMPOOL_HEADER_SIZE(mp, cache_size);
	mempool_size += private_data_size;
	mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);

	ret = snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
	if (ret < 0 || ret >= (int)sizeof(mz_name)) {
		rte_errno = ENAMETOOLONG;
		goto exit_unlock;
	}

	// Reserve a memzone of that size
	mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
	if (mz == NULL)
		goto exit_unlock;

	/* init the mempool structure */
	// The first part of the memzone holds the struct rte_mempool itself.
	mp = mz->addr;
	memset(mp, 0, RTE_MEMPOOL_HEADER_SIZE(mp, cache_size));
	ret = strlcpy(mp->name, name, sizeof(mp->name));
	if (ret < 0 || ret >= (int)sizeof(mp->name)) {
		rte_errno = ENAMETOOLONG;
		goto exit_unlock;
	}
	mp->mz = mz;
	mp->size = n;
	mp->flags = flags;
	mp->socket_id = socket_id;
	mp->elt_size = objsz.elt_size;
	mp->header_size = objsz.header_size;
	mp->trailer_size = objsz.trailer_size;
	/* Size of default caches, zero means disabled. */
	mp->cache_size = cache_size;
	mp->private_data_size = private_data_size;
	STAILQ_INIT(&mp->elt_list);
	STAILQ_INIT(&mp->mem_list);

	/*
	 * local_cache pointer is set even if cache_size is zero.
	 * The local_cache points to just past the elt_pa[] array.
	 */
	// Pointer offset: the per-lcore caches live right after the rte_mempool header
	mp->local_cache = (struct rte_mempool_cache *)
		RTE_PTR_ADD(mp, RTE_MEMPOOL_HEADER_SIZE(mp, 0));

	/* Init all default caches. */
	if (cache_size != 0) {
		// Initialize each per-lcore cache
		for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
			mempool_cache_init(&mp->local_cache[lcore_id],
					   cache_size);
	}

	// Link it in
	te->data = mp;

	rte_mcfg_tailq_write_lock();
	// Insert into rte_mempool_tailq
	TAILQ_INSERT_TAIL(mempool_list, te, next);
	rte_mcfg_tailq_write_unlock();
	rte_mcfg_mempool_write_unlock();

	rte_mempool_trace_create_empty(name, n, elt_size, cache_size,
		private_data_size, flags, mp);
	return mp;

exit_unlock:
	rte_mcfg_mempool_write_unlock();
	rte_free(te);
	rte_mempool_free(mp);
	return NULL;
}
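
Putting the size calculation and the local_cache pointer arithmetic together, the header memzone is laid out roughly as follows (a sketch inferred from the code above, not an official diagram):

/*
 * mz->addr --> struct rte_mempool                        (the header itself)
 *              per-lcore caches (RTE_MAX_LCORE entries)  (local_cache points here,
 *                                                         i.e. mp + RTE_MEMPOOL_HEADER_SIZE(mp, 0))
 *              private data                              (private_data_size, cache-aligned)
 *
 * mempool_size = RTE_MEMPOOL_HEADER_SIZE(mp, cache_size) + private_data_size,
 * rounded up to RTE_MEMPOOL_ALIGN, which is exactly what rte_memzone_reserve() reserves above.
 */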

rte_mempool_populate_default: this function initializes the ring and populates the pool with objects.

int
rte_mempool_populate_default(struct rte_mempool *mp)
{
	unsigned int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;
	ssize_t mem_size;
	size_t align, pg_sz, pg_shift = 0;
	rte_iova_t iova;
	unsigned mz_id, n;
	int ret;
	bool need_iova_contig_obj;
	size_t max_alloc_size = SIZE_MAX;

	// Look up the ops table registered via constructor attributes and call its alloc() to create the ring (not expanded here); pool_data then points to the ring
	ret = mempool_ops_alloc_once(mp);
	if (ret != 0)
		return ret;

	/* mempool must not be populated */
	if (mp->nb_mem_chunks != 0)
		return -EEXIST;

	/*
	 * the following section calculates page shift and page size values.
	 *
	 * these values impact the result of calc_mem_size operation, which
	 * returns the amount of memory that should be allocated to store the
	 * desired number of objects. when not zero, it allocates more memory
	 * for the padding between objects, to ensure that an object does not
	 * cross a page boundary. in other words, page size/shift are to be set
	 * to zero if mempool elements won't care about page boundaries.
	 * there are several considerations for page size and page shift here.
	 *
	 * if we don't need our mempools to have physically contiguous objects,
	 * then just set page shift and page size to 0, because the user has
	 * indicated that there's no need to care about anything.
	 *
	 * if we do need contiguous objects (if a mempool driver has its
	 * own calc_size() method returning min_chunk_size = mem_size),
	 * there is also an option to reserve the entire mempool memory
	 * as one contiguous block of memory.
	 *
	 * if we require contiguous objects, but not necessarily the entire
	 * mempool reserved space to be contiguous, pg_sz will be != 0,
	 * and the default ops->populate() will take care of not placing
	 * objects across pages.
	 *
	 * if our IO addresses are physical, we may get memory from bigger
	 * pages, or we might get memory from smaller pages, and how much of it
	 * we require depends on whether we want bigger or smaller pages.
	 * However, requesting each and every memory size is too much work, so
	 * what we'll do instead is walk through the page sizes available, pick
	 * the smallest one and set up page shift to match that one. We will be
	 * wasting some space this way, but it's much nicer than looping around
	 * trying to reserve each and every page size.
	 *
	 * If we fail to get enough contiguous memory, then we'll go and
	 * reserve space in smaller chunks.
	 */
	

	need_iova_contig_obj = !(mp->flags & RTE_MEMPOOL_F_NO_IOVA_CONTIG);
	// Get the page size
	ret = rte_mempool_get_page_size(mp, &pg_sz);
	if (ret < 0)
		return ret;

	if (pg_sz != 0)
		pg_shift = rte_bsf32(pg_sz);

	for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
		size_t min_chunk_size;

		// As with mempool_ops_alloc_once, go through the ops table: ask the ring ops how much memory this chunk needs
		mem_size = rte_mempool_ops_calc_mem_size(
			mp, n, pg_shift, &min_chunk_size, &align);

		if (mem_size < 0) {
			ret = mem_size;
			goto fail;
		}

		ret = snprintf(mz_name, sizeof(mz_name),
			RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
		if (ret < 0 || ret >= (int)sizeof(mz_name)) {
			ret = -ENAMETOOLONG;
			goto fail;
		}

		/* if we're trying to reserve contiguous memory, add appropriate
		 * memzone flag.
		 */
		if (min_chunk_size == (size_t)mem_size)
			mz_flags |= RTE_MEMZONE_IOVA_CONTIG;

		/* Allocate a memzone, retrying with a smaller area on ENOMEM */
		do {
			mz = rte_memzone_reserve_aligned(mz_name,
				RTE_MIN((size_t)mem_size, max_alloc_size),
				mp->socket_id, mz_flags, align);

			if (mz != NULL || rte_errno != ENOMEM)
				break;

			max_alloc_size = RTE_MIN(max_alloc_size,
						(size_t)mem_size) / 2;
		} while (mz == NULL && max_alloc_size >= min_chunk_size);

		if (mz == NULL) {
			ret = -rte_errno;
			goto fail;
		}

		if (need_iova_contig_obj)
			iova = mz->iova;
		else
			iova = RTE_BAD_IOVA;

		if (pg_sz == 0 || (mz_flags & RTE_MEMZONE_IOVA_CONTIG))
			ret = rte_mempool_populate_iova(mp, mz->addr,
				iova, mz->len,
				rte_mempool_memchunk_mz_free,
				(void *)(uintptr_t)mz);
		else
			ret = rte_mempool_populate_virt(mp, mz->addr,
				mz->len, pg_sz,
				rte_mempool_memchunk_mz_free,
				(void *)(uintptr_t)mz);
		if (ret == 0) /* should not happen */
			ret = -ENOBUFS;
		if (ret < 0) {
			rte_memzone_free(mz);
			goto fail;
		}
	}

	rte_mempool_trace_populate_default(mp);
	return mp->size;

 fail:
	rte_mempool_free_memchunks(mp);
	return ret;
}
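
rte_mempool_populate_iova / rte_mempool_populate_virt are not expanded here. Conceptually, they carve the reserved memzone into header + element + trailer sized slots and enqueue each element into the ring. Below is a simplified sketch of that inner loop based on my reading of the code; the real functions also record the chunk in mem_list, fill each object header, link it into elt_list, and respect page boundaries.

#include <rte_mempool.h>

/* Simplified sketch (not the literal DPDK code): split one virtually
 * contiguous chunk into objects and hand every element to the ring. */
static void populate_chunk_sketch(struct rte_mempool *mp, char *vaddr, size_t len)
{
	size_t total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
	char *obj;

	for (obj = vaddr; obj + total_elt_sz <= vaddr + len; obj += total_elt_sz) {
		void *elt = obj + mp->header_size;              /* skip the per-object header */
		(void)rte_mempool_ops_enqueue_bulk(mp, &elt, 1); /* make it available for get() */
		mp->populated_size++;
	}
}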

1.2 mempool put

static __rte_always_inline void
rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
		     unsigned int n)
{
	// Per-lcore cache for acceleration
	struct rte_mempool_cache *cache;
	cache = rte_mempool_default_cache(mp, rte_lcore_id());
	rte_mempool_trace_put_bulk(mp, obj_table, n, cache);
	// Actually put the objects back; rte_mempool_generic_put ends up in rte_mempool_do_generic_put below
	rte_mempool_generic_put(mp, obj_table, n, cache);
}
static __rte_always_inline void
rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
			   unsigned int n, struct rte_mempool_cache *cache)
{
	void **cache_objs;

	/* increment stat now, adding in mempool always success */
	RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);

	/* No cache provided or if put would overflow mem allocated for cache */
	// Fast path: operate on the per-lcore cache
	if (unlikely(cache == NULL || n > RTE_MEMPOOL_CACHE_MAX_SIZE))
		goto ring_enqueue;

	cache_objs = &cache->objs[cache->len];

	/*
	 * The cache follows the following algorithm
	 *   1. Add the objects to the cache
	 *   2. Anything greater than the cache min value (if it crosses the
	 *   cache flush threshold) is flushed to the ring.
	 */

	/* Add elements back into the cache */
	rte_memcpy(&cache_objs[0], obj_table, sizeof(void *) * n);

	cache->len += n;

	if (cache->len >= cache->flushthresh) {
		// Flush the excess objects (cache->len - cache->size) from the cache to the ring
		rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache->size],
				cache->len - cache->size);
		cache->len = cache->size;
	}

	return;

ring_enqueue:

	/* push remaining objects in ring */
#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
	if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
		rte_panic("cannot put objects in mempool\n");
#else
	// No usable cache: call through the ops table and enqueue directly into the ring
	rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
#endif
}
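
As a concrete illustration of the flush logic (the exact threshold multiplier is an implementation detail, so treat the numbers as an assumption): with cache_size = 256, a put that pushes cache->len past cache->flushthresh causes everything beyond the first 256 objects to be flushed to the ring in a single rte_mempool_ops_enqueue_bulk call, after which cache->len falls back to 256. Until the threshold is crossed again, puts touch only the per-lcore cache and never the ring.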

1.3 mempool get

This mirrors the put path, so it is not analyzed line by line; a short usage sketch follows the code below.

static __rte_always_inline int
rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
			   unsigned int n, struct rte_mempool_cache *cache)
{
	int ret;
	uint32_t index, len;
	void **cache_objs;

	/* No cache provided or cannot be satisfied from cache */
	if (unlikely(cache == NULL || n >= cache->size))
		goto ring_dequeue;

	cache_objs = cache->objs;

	/* Can this be satisfied from the cache? */
	if (cache->len < n) {
		/* No. Backfill the cache first, and then fill from it */
		uint32_t req = n + (cache->size - cache->len);

		/* How many do we require i.e. number to fill the cache + the request */
		ret = rte_mempool_ops_dequeue_bulk(mp,
			&cache->objs[cache->len], req);
		if (unlikely(ret < 0)) {
			/*
			 * In the off chance that we are buffer constrained,
			 * where we are not able to allocate cache + n, go to
			 * the ring directly. If that fails, we are truly out of
			 * buffers.
			 */
			goto ring_dequeue;
		}

		cache->len += req;
	}

	/* Now fill in the response ... */
	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
		*obj_table = cache_objs[len];

	cache->len -= n;

	RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);

	return 0;

ring_dequeue:

	/* get remaining objects from ring */
	ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);

	if (ret < 0) {
		RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
		RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
	} else {
		RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
		RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);
	}

	return ret;
}
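
Finally, a minimal fast-path usage sketch; the burst size and the processing loop are made up for illustration. Both calls go through the per-lcore cache first and fall back to the ring only when the cache cannot satisfy the request.

#include <rte_mempool.h>

#define BURST 32

/* Hypothetical fast-path loop: grab a burst of objects from the pool,
 * use them, and hand them back. */
static void use_pool_once(struct rte_mempool *mp)
{
	void *objs[BURST];
	unsigned int i;

	if (rte_mempool_get_bulk(mp, objs, BURST) < 0)
		return;                 /* pool (cache + ring) could not satisfy the burst */

	for (i = 0; i < BURST; i++) {
		/* ... do something with objs[i] ... */
	}

	rte_mempool_put_bulk(mp, objs, BURST);
}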