当前位置: 首页 > 工具软件 > VPP > 使用案例 >

VPP接口队列设置

阎辰钊
2023-12-01

可通过VPP启动配置文件/etc/vpp/startup.conf的dpdk段来配置接口的队列参数。

dpdk {
   dev default {
      num-rx-desc 512
      num-tx-desc 512
	  num-tx-queues 2
   }

   dev 0000:02:00.1 {
      num-rx-queues 2
      name eth0
   }
}

dpdk段配置的解析函数dpdk_device_config如下,将设备的队列配置读进devconf结构的相应成员变量中。

static clib_error_t *
dpdk_device_config (dpdk_config_main_t *conf, void *addr,
            dpdk_device_addr_type_t addr_type, unformat_input_t *input, u8 is_default)
{
  unformat_skip_white_space (input);
  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "num-rx-queues %u", &devconf->num_rx_queues))
    ;
      else if (unformat (input, "num-tx-queues %u", &devconf->num_tx_queues))
    ;
      else if (unformat (input, "num-rx-desc %u", &devconf->num_rx_desc))
    ;
      else if (unformat (input, "num-tx-desc %u", &devconf->num_tx_desc))
    ;
      else if (unformat (input, "name %s", &devconf->name))
    ;
      else if (unformat (input, "workers %U", unformat_bitmap_list,
             &devconf->workers))
    ;

      else if (unformat (input, "rss-queues %U",
             unformat_bitmap_list, &devconf->rss_queues))
    ;

通过workers可以指定对接口进行处理的线程,在指定workers的情况下,如果接收队列的数量为零,那么将接收队列的数量设置为workers的数量。反之,如果workers的数量和配置的接收队列的数量不相等,返回错误,需要保障每个workers有一个接收队列。

  if (devconf->workers && devconf->num_rx_queues == 0)
    devconf->num_rx_queues = clib_bitmap_count_set_bits (devconf->workers);
  else if (devconf->workers &&
       clib_bitmap_count_set_bits (devconf->workers) !=
       devconf->num_rx_queues)
    error = clib_error_return (0,
                   "%U: number of worker threads must be "
                   "equal to number of rx queues",
                   format_vlib_pci_addr, addr);
int
rte_eth_dev_info_get(uint16_t port_id, struct rte_eth_dev_info *dev_info)
{
    int diag;

    diag = (*dev->dev_ops->dev_infos_get)(dev, dev_info);
    if (diag != 0) {
        /* Cleanup already filled in device information */
        memset(dev_info, 0, sizeof(struct rte_eth_dev_info));
        return eth_err(port_id, diag);
    }

    /* Maximum number of queues should be <= RTE_MAX_QUEUES_PER_PORT */
    dev_info->max_rx_queues = RTE_MIN(dev_info->max_rx_queues,
            RTE_MAX_QUEUES_PER_PORT);
    dev_info->max_tx_queues = RTE_MIN(dev_info->max_tx_queues,
            RTE_MAX_QUEUES_PER_PORT);

    dev_info->driver_name = dev->device->driver->name;
    dev_info->nb_rx_queues = dev->data->nb_rx_queues;
    dev_info->nb_tx_queues = dev->data->nb_tx_queues;

在DPDK的设备初始化过程中,设备的发送队列在支持的最大队列和运行的线程数量之间取较小的值。另外,如果配置的发送队列数量小于以上计算的值,使用配置的发送队列数量,但是此数量需要大于零。

static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
  dpdk_device_t *xd;

  /* *INDENT-OFF* */
  RTE_ETH_FOREACH_DEV(i)
    {

      xd->tx_q_used = clib_min (dev_info.max_tx_queues, tm->n_vlib_mains);

      if (devconf->num_tx_queues > 0
      && devconf->num_tx_queues < xd->tx_q_used)
    xd->tx_q_used = clib_min (xd->tx_q_used, devconf->num_tx_queues);

接收队列的最小值为1。如果配置的接收队列数量小于接口的最大接收队列长度,表明可使用配置值,接收队列大于1,开启RSS,如果没有指定RSS哈希算法,默认使用IP头部的源/目的地址,和UDP/TCP头部的源/目的端口计算哈希。

否则,使用指定的RSS算法,如果其中指定的某些字段不支持,进行警告。

      if (devconf->num_rx_queues > 1
      && dev_info.max_rx_queues >= devconf->num_rx_queues)
    {
      xd->rx_q_used = devconf->num_rx_queues;
      xd->port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
      if (devconf->rss_fn == 0)
        xd->port_conf.rx_adv_conf.rss_conf.rss_hf =
          ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP;
      else
        {
          u64 unsupported_bits;
          xd->port_conf.rx_adv_conf.rss_conf.rss_hf = devconf->rss_fn;
          unsupported_bits = xd->port_conf.rx_adv_conf.rss_conf.rss_hf;
          unsupported_bits &= ~dev_info.flow_type_rss_offloads;
          if (unsupported_bits)
        dpdk_log_warn ("Unsupported RSS hash functions: %U",
                   format_dpdk_rss_hf_name, unsupported_bits);
        }
      xd->port_conf.rx_adv_conf.rss_conf.rss_hf &=
        dev_info.flow_type_rss_offloads;
    }
      else
    xd->rx_q_used = 1;

如果设备的pmd驱动名称没有进行设置,例如,对于i40e驱动,VNET_DPDK_PMD_I40E。初始化好PMD相关参数,发送和接收的描述符默认都是1024。

      if (!xd->pmd)
    {

#define _(s,f) else if (dev_info.driver_name &&                 \
                        !strcmp(dev_info.driver_name, s))       \
                 xd->pmd = VNET_DPDK_PMD_##f;
      if (0)
        ;
      foreach_dpdk_pmd
#undef _
        else
        xd->pmd = VNET_DPDK_PMD_UNKNOWN;

      xd->port_type = VNET_DPDK_PORT_TYPE_UNKNOWN;
      xd->nb_rx_desc = DPDK_NB_RX_DESC_DEFAULT;
      xd->nb_tx_desc = DPDK_NB_TX_DESC_DEFAULT;

如果配置了接收描述符的数量,使用配置值。否则,对于2M大小的巨页,如果处理器没有L3级缓存cache,接收描述符设置为512。

      if (devconf->num_rx_desc)
        xd->nb_rx_desc = devconf->num_rx_desc;
          else {

            /* If num_rx_desc is not specified by VPP user, the current CPU is working
            with 2M page and has no L3 cache, default num_rx_desc is changed to 512
            from original 1024 to help reduce TLB misses.
            */
            if ((clib_mem_get_default_hugepage_size () == 2 << 20)
              && check_l3cache() == 0)
              xd->nb_rx_desc = 512;
          }

如果配置了发送描述符的数量,使用配置值。否则,对于2M大小的巨页,如果处理器没有L3级缓存cache,发送描述符设置为512,以降低TLB缺失。

      if (devconf->num_tx_desc)
        xd->nb_tx_desc = devconf->num_tx_desc;
          else {

            /* If num_tx_desc is not specified by VPP user, the current CPU is working
            with 2M page and has no L3 cache, default num_tx_desc is changed to 512
            from original 1024 to help reduce TLB misses.
            */
            if ((clib_mem_get_default_hugepage_size () == 2 << 20)
              && check_l3cache() == 0)
              xd->nb_tx_desc = 512;
      }
       }

如果配置了处理接口的workers集合,将接收队列注册给相应的worker。否则,所有接收队列依次注册给系统中的worker线程。

      if (devconf->workers)
    {
      int i;
      q = 0;
      clib_bitmap_foreach (i, devconf->workers)  {
          dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
          rxq->queue_index = vnet_hw_if_register_rx_queue (
        dm->vnet_main, xd->hw_if_index, q++,
        vdm->first_worker_thread_index + i);
      }
    }
      else
    for (q = 0; q < xd->rx_q_used; q++)
      {
        dpdk_rx_queue_t *rxq = vec_elt_at_index (xd->rx_queues, q);
        rxq->queue_index = vnet_hw_if_register_rx_queue (
          dm->vnet_main, xd->hw_if_index, q, VNET_HW_IF_RXQ_THREAD_ANY);
      }
 类似资料: