当前位置: 首页 > 工具软件 > DPDK > 使用案例 >

DPDK插件接收队列

鲁永福
2023-12-01

VPP插件DPDK在注册以太网接口之后,设定处理此接口的节点索引,即由当前的节点dpdk_input_node来处理此接口。

static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
  dpdk_device_t *xd;
  
      xd->hw_if_index = vnet_eth_register_interface (vnm, &eir);
      /* assign worker threads */
      vnet_hw_if_set_input_node (vnm, xd->hw_if_index, dpdk_input_node.index);

将dpdk_input_node节点的索引赋值给接口结构的成员input_node_index。

void   
vnet_hw_if_set_input_node (vnet_main_t *vnm, u32 hw_if_index, u32 node_index)
{      
  vlib_main_t *vm = vlib_get_main ();
  vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);
  hi->input_node_index = node_index;
  log_debug ("set_input_node: node %U for interface %v", format_vlib_node_name,
         vm, node_index, hi->name);
} 

如果配置了worker进程(配置参数corelist-workers),遍历worker位图,首先确保配置的接收队列(num-rx-queues)不能少于worker的数量,否则触发ASSERT。为每个worker注册接收队列,将接口的队列与worker绑定。

否则,如果没有配置corelist-workers,则根据配置的接收队列数量,进行注册,最后的线程索引使用VNET_HW_IF_RXQ_THREAD_ANY。返回的队列索引赋值与DPDK接收队列成员queue_index。

static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
      if (devconf->workers)
    {
      int j;
      q = 0;
      clib_bitmap_foreach (j, devconf->workers)
        {
          dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
          rxq->queue_index = vnet_hw_if_register_rx_queue (
        vnm, xd->hw_if_index, q++, vdm->first_worker_thread_index + j);
        } 
    }
      else   
    for (q = 0; q < xd->conf.n_rx_queues; q++)
      {
        dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
        rxq->queue_index = vnet_hw_if_register_rx_queue (
          vnm, xd->hw_if_index, q, VNET_HW_IF_RXQ_THREAD_ANY);
      }

首先检查是否已经注册过此接口的接收队列,重复注册将导致panic。如果thread_index非法,函数next_thread_index将分配一个合法的线程索引值。最后,由hw_if_rx_queues池中分配vnet接口接收队列结构vnet_hw_if_rx_queue_t,并初始化其中的接口索引、队列接口、线程索引等。

u32
vnet_hw_if_register_rx_queue (vnet_main_t *vnm, u32 hw_if_index, u32 queue_id,
                  u32 thread_index)
{
  vnet_interface_main_t *im = &vnm->interface_main;
  vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);
  vnet_hw_if_rx_queue_t *rxq;
  u64 key = rx_queue_key (hw_if_index, queue_id);

  if (hash_get_mem (im->rxq_index_by_hw_if_index_and_queue_id, &key))
    clib_panic ("Trying to register already registered queue id (%u) in the "
        "interface %v\n",
        queue_id, hi->name);

  thread_index = next_thread_index (vnm, thread_index);

  pool_get_zero (im->hw_if_rx_queues, rxq);
  queue_index = rxq - im->hw_if_rx_queues;
  vec_add1 (hi->rx_queue_indices, queue_index);
  hash_set_mem_alloc (&im->rxq_index_by_hw_if_index_and_queue_id, &key,
              queue_index);
  rxq->hw_if_index = hw_if_index;
  rxq->dev_instance = hi->dev_instance;
  rxq->queue_id = queue_id;
  rxq->thread_index = thread_index;
  rxq->mode = VNET_HW_IF_RX_MODE_POLLING;
  rxq->file_index = ~0;

节点运行数据

函数vnet_hw_if_update_runtime_data将绑定接口接收队列与节点运行数据。如下,遍历接口接收队列结构,将其中的dev_instance和队列索引queue_id赋值到线程对应的vnet_hw_if_rxq_poll_vector_t结构,如下d[ti]表示的向量。

void     
vnet_hw_if_update_runtime_data (vnet_main_t *vnm, u32 hw_if_index)
{
  vnet_interface_main_t *im = &vnm->interface_main;
  vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);
  u32 node_index = hi->input_node_index;
  vnet_hw_if_rx_queue_t *rxq;
  vnet_hw_if_rxq_poll_vector_t *pv, **d = 0, **a = 0;

  /* construct per-thread polling vectors */
  pool_foreach (rxq, im->hw_if_rx_queues)
    {
      u32 ti = rxq->thread_index;

      if (per_thread_node_state[ti] != VLIB_NODE_STATE_POLLING)
    continue;

      vec_add2_aligned (d[ti], pv, 1, CLIB_CACHE_LINE_BYTES);
      pv->dev_instance = rxq->dev_instance;
      pv->queue_id = rxq->queue_id;
    }

这里,将线程对应的接收队列向量d[i],绑定到每个线程相关的运行数据中的成员rxq_vector_int,每个线程拥有独立的runtime数据。

  if (something_changed_on_rx || something_changed_on_tx)
    {
      for (int i = 0; i < n_threads; i++)
        {
          vlib_main_t *vm = vlib_get_main_by_index (i);
          vnet_hw_if_rx_node_runtime_t *rt;
          rt = vlib_node_get_runtime_data (vm, node_index);
          pv = rt->rxq_vector_int;
          rt->rxq_vector_int = d[i];
          d[i] = pv;

DPDK接收节点

在节点dpdk_input_node,由函数vnet_hw_if_get_rxq_poll_vector获得当前进程要处理的接收队列向量。,由函数dpdk_device_input依次处理其中的队列。

VLIB_NODE_FN (dpdk_input_node) (vlib_main_t * vm, vlib_node_runtime_t * node,
                vlib_frame_t * f) 
{             
  dpdk_main_t *dm = &dpdk_main;
  dpdk_device_t *xd; 
  vnet_hw_if_rxq_poll_vector_t *pv;
  u32 thread_index = vm->thread_index;
  
  /* Poll all devices on this cpu for input/interrupts.
   */  
  pv = vnet_hw_if_get_rxq_poll_vector (vm, node);

  for (int i = 0; i < vec_len (pv); i++)
    {
      xd = vec_elt_at_index (dm->devices, pv[i].dev_instance);
      n_rx_packets +=
    dpdk_device_input (vm, dm, xd, node, thread_index, pv[i].queue_id);
    }
  return n_rx_packets; 

对于VLIB_NODE_STATE_POLLING(DPDK节点默认),返回的是运行数据中的rxq_vector_int接收队列向量。

static_always_inline vnet_hw_if_rxq_poll_vector_t *
vnet_hw_if_get_rxq_poll_vector (vlib_main_t *vm, vlib_node_runtime_t *node)
{
  vnet_hw_if_rx_node_runtime_t *rt = (void *) node->runtime_data;
  vnet_hw_if_rxq_poll_vector_t *pv = rt->rxq_vector_int;
      
  if (PREDICT_FALSE (node->state == VLIB_NODE_STATE_INTERRUPT))
    pv = vnet_hw_if_generate_rxq_int_poll_vector (vm, node);
  else if (node->flags & VLIB_NODE_FLAG_ADAPTIVE_MODE)
    pv = rt->rxq_vector_poll;

  return pv;
 类似资料: