VPP插件DPDK在注册以太网接口之后,设定处理此接口的节点索引,即由当前的节点dpdk_input_node来处理此接口。
/* Excerpt of dpdk_lib_init: once the Ethernet interface has been registered,
   dpdk_input_node is recorded as the node that services it.
   NOTE(review): xd, vnm and eir are set up in code elided from this
   excerpt. */
static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
dpdk_device_t *xd;
/* register the Ethernet interface; the returned value is kept in the device
   as its hardware interface index */
xd->hw_if_index = vnet_eth_register_interface (vnm, &eir);
/* assign worker threads */
/* record dpdk_input_node's index as the input node of this interface */
vnet_hw_if_set_input_node (vnm, xd->hw_if_index, dpdk_input_node.index);
将dpdk_input_node节点的索引赋值给接口结构的成员input_node_index。
/* Record node_index as the input node of the hardware interface identified
   by hw_if_index, then emit a debug log line naming the node and the
   interface. */
void
vnet_hw_if_set_input_node (vnet_main_t *vnm, u32 hw_if_index, u32 node_index)
{
  vlib_main_t *vlib_main = vlib_get_main ();
  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

  /* the interface remembers which node receives its packets */
  hw->input_node_index = node_index;

  log_debug ("set_input_node: node %U for interface %v",
	     format_vlib_node_name, vlib_main, node_index, hw->name);
}
如果配置了worker线程(配置参数corelist-workers),遍历worker位图,首先确保配置的接收队列数量(num-rx-queues)不少于worker的数量,否则触发ASSERT。为每个worker注册一个接收队列,将接口的队列与worker线程绑定。
否则,如果没有配置corelist-workers,则根据配置的接收队列数量逐个进行注册,线程索引参数使用VNET_HW_IF_RXQ_THREAD_ANY。两种情况下,返回的队列索引都赋值给DPDK接收队列的成员queue_index。
/* Excerpt of dpdk_lib_init: register the device's rx queues with vnet and
   store each returned queue index in the matching dpdk_rx_queue_t.
   NOTE(review): q, xd, devconf and vdm are declared in code elided from this
   excerpt. */
static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
if (devconf->workers)
{
int j;
q = 0;
/* one rx queue per entry j of the workers bitmap; q counts the queues
   registered so far */
clib_bitmap_foreach (j, devconf->workers)
{
dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
/* queue q is registered with thread index first_worker_thread_index + j */
rxq->queue_index = vnet_hw_if_register_rx_queue (
vnm, xd->hw_if_index, q++, vdm->first_worker_thread_index + j);
}
}
else
/* no workers bitmap: register all n_rx_queues queues and pass
   VNET_HW_IF_RXQ_THREAD_ANY instead of a specific thread index */
for (q = 0; q < xd->conf.n_rx_queues; q++)
{
dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
rxq->queue_index = vnet_hw_if_register_rx_queue (
vnm, xd->hw_if_index, q, VNET_HW_IF_RXQ_THREAD_ANY);
}
首先检查是否已经注册过此接口的该接收队列,重复注册将导致panic。如果thread_index非法,函数next_thread_index将分配一个合法的线程索引值。最后,从hw_if_rx_queues池中分配vnet接口接收队列结构vnet_hw_if_rx_queue_t,并初始化其中的接口索引、队列ID、线程索引等。
/* Register rx queue queue_id of hardware interface hw_if_index and bind it
   to a thread.
   NOTE(review): this excerpt is truncated; the declaration of queue_index,
   the return statement and the closing brace are not shown. */
u32
vnet_hw_if_register_rx_queue (vnet_main_t *vnm, u32 hw_if_index, u32 queue_id,
u32 thread_index)
{
vnet_interface_main_t *im = &vnm->interface_main;
vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);
vnet_hw_if_rx_queue_t *rxq;
/* lookup key built from the (hw_if_index, queue_id) pair */
u64 key = rx_queue_key (hw_if_index, queue_id);
/* registering the same queue of the same interface twice is fatal */
if (hash_get_mem (im->rxq_index_by_hw_if_index_and_queue_id, &key))
clib_panic ("Trying to register already registered queue id (%u) in the "
"interface %v\n",
queue_id, hi->name);
/* presumably resolves the requested thread index (e.g.
   VNET_HW_IF_RXQ_THREAD_ANY) to a concrete one - TODO confirm against
   next_thread_index */
thread_index = next_thread_index (vnm, thread_index);
/* take a new entry from the rx-queue pool; its index is its offset from the
   pool base */
pool_get_zero (im->hw_if_rx_queues, rxq);
queue_index = rxq - im->hw_if_rx_queues;
/* remember the queue on the interface and in the lookup hash */
vec_add1 (hi->rx_queue_indices, queue_index);
hash_set_mem_alloc (&im->rxq_index_by_hw_if_index_and_queue_id, &key,
queue_index);
/* fill in the new entry; a queue starts in polling mode with file_index
   set to ~0 */
rxq->hw_if_index = hw_if_index;
rxq->dev_instance = hi->dev_instance;
rxq->queue_id = queue_id;
rxq->thread_index = thread_index;
rxq->mode = VNET_HW_IF_RX_MODE_POLLING;
rxq->file_index = ~0;
函数vnet_hw_if_update_runtime_data将接口接收队列与节点运行数据绑定。如下,遍历接口接收队列结构,对于所属线程的节点状态为VLIB_NODE_STATE_POLLING的队列,将其中的dev_instance和队列ID(queue_id)追加到该线程对应的vnet_hw_if_rxq_poll_vector_t向量中,即代码中的d[ti]。
/* Excerpt of vnet_hw_if_update_runtime_data: build, for each thread, the
   vector of rx queues that the interface's input node has to poll.
   NOTE(review): the sizing of d and the declaration of per_thread_node_state
   are in code elided from this excerpt. */
void
vnet_hw_if_update_runtime_data (vnet_main_t *vnm, u32 hw_if_index)
{
vnet_interface_main_t *im = &vnm->interface_main;
vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);
/* the input node previously recorded on the interface */
u32 node_index = hi->input_node_index;
vnet_hw_if_rx_queue_t *rxq;
/* d is indexed by thread index below; each d[ti] is a poll vector */
vnet_hw_if_rxq_poll_vector_t *pv, **d = 0, **a = 0;
/* construct per-thread polling vectors */
pool_foreach (rxq, im->hw_if_rx_queues)
{
u32 ti = rxq->thread_index;
/* only queues whose thread has the node in polling state are added */
if (per_thread_node_state[ti] != VLIB_NODE_STATE_POLLING)
continue;
/* append one element to this thread's vector and fill it from the queue */
vec_add2_aligned (d[ti], pv, 1, CLIB_CACHE_LINE_BYTES);
pv->dev_instance = rxq->dev_instance;
pv->queue_id = rxq->queue_id;
}
这里,将线程对应的接收队列向量d[i],绑定到每个线程相关的运行数据中的成员rxq_vector_int,每个线程拥有独立的runtime数据。
/* Excerpt (continued) of vnet_hw_if_update_runtime_data: when something
   changed, install the newly built vectors into each thread's node runtime
   data. */
if (something_changed_on_rx || something_changed_on_tx)
{
for (int i = 0; i < n_threads; i++)
{
vlib_main_t *vm = vlib_get_main_by_index (i);
vnet_hw_if_rx_node_runtime_t *rt;
/* runtime data of the input node on thread i */
rt = vlib_node_get_runtime_data (vm, node_index);
/* swap: the runtime data takes the new vector d[i], and d[i] receives the
   previous one (presumably so it can be released later - not shown here) */
pv = rt->rxq_vector_int;
rt->rxq_vector_int = d[i];
d[i] = pv;
在节点dpdk_input_node中,由函数vnet_hw_if_get_rxq_poll_vector获得当前线程要处理的接收队列向量,再由函数dpdk_device_input依次处理其中的队列。
/* Node function of dpdk_input_node: fetch the rx-queue poll vector for this
   node runtime and call dpdk_device_input once per entry, returning the
   accumulated packet count.
   NOTE(review): the declaration of n_rx_packets and the closing brace are
   elided from this excerpt. */
VLIB_NODE_FN (dpdk_input_node) (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * f)
{
dpdk_main_t *dm = &dpdk_main;
dpdk_device_t *xd;
vnet_hw_if_rxq_poll_vector_t *pv;
u32 thread_index = vm->thread_index;
/* Poll all devices on this cpu for input/interrupts.
*/
pv = vnet_hw_if_get_rxq_poll_vector (vm, node);
for (int i = 0; i < vec_len (pv); i++)
{
/* each entry names a device (dev_instance) and one of its queues */
xd = vec_elt_at_index (dm->devices, pv[i].dev_instance);
n_rx_packets +=
dpdk_device_input (vm, dm, xd, node, thread_index, pv[i].queue_id);
}
return n_rx_packets;
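对于VLIB_NODE_STATE_POLLING状态(DPDK节点默认)且未设置VLIB_NODE_FLAG_ADAPTIVE_MODE标志的节点,返回的是运行数据中的rxq_vector_int接收队列向量;中断状态下由vnet_hw_if_generate_rxq_int_poll_vector生成向量,自适应模式下返回rxq_vector_poll。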
/* Return the rx-queue poll vector the given node runtime should process.
   The closing brace is elided from this excerpt. */
static_always_inline vnet_hw_if_rxq_poll_vector_t *
vnet_hw_if_get_rxq_poll_vector (vlib_main_t *vm, vlib_node_runtime_t *node)
{
vnet_hw_if_rx_node_runtime_t *rt = (void *) node->runtime_data;
/* default result: the rxq_vector_int vector stored in the runtime data */
vnet_hw_if_rxq_poll_vector_t *pv = rt->rxq_vector_int;
/* interrupt state: the vector is generated by a helper instead */
if (PREDICT_FALSE (node->state == VLIB_NODE_STATE_INTERRUPT))
pv = vnet_hw_if_generate_rxq_int_poll_vector (vm, node);
/* not in interrupt state but adaptive mode flag set: use rxq_vector_poll */
else if (node->flags & VLIB_NODE_FLAG_ADAPTIVE_MODE)
pv = rt->rxq_vector_poll;
return pv;