编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

从零开始学习DPDK:掌握这些常用库函数就够了

wxchong 2025-03-08 01:16:21 开源技术 203 ℃ 0 评论

一、概念

Intel(R) DPDK全称Intel Data Plane Development Kit,是intel提供的数据平面开发工具集,为Intel architecture(IA)处理器架构下用户空间高效的数据包处理提供库函数和驱动的支持,它不同于Linux系统以通用性设计为目的,而是专注于网络应用中数据包的高性能处理。目前已经验证可以运行在大多数Linux操作系统上。DPDK使用了BSDLicense,极大的方便了企业在其基础上来实现自己的协议栈或者应用。目前出现了很多基于 dpdk 的高性能网络框架,OVS 和 VPP 是常用的数据面框架,mTCP 和 f-stack 是常用的用户态协议栈。

需要强调的是,DPDK应用程序是运行在用户空间上利用自身提供的数据平面库来收发数据包,绕过了Linux内核协议栈对数据包处理过程。Linux内核将DPDK应用程序看作是一个普通的用户态进程,包括它的编译、连接和加载方式和普通程序没有什么两样。如下图2所示DPDK包处理流程绕过了内核直接到用户层进行处理,区别于传统的数据包先到内核最后再到用户层。

图1 传统网络包处理路径 图2 DPDK数据包处理路径

Kni(Kernel NIC Interface)内核网卡接口,是DPDK允许用户态和内核态交换报文的解决方案,例如DPDK的协议栈是专门处理DNS报文,其余的报文通过KNI接口返回给内核来处理。KNI模拟了一个虚拟的网口,提供dpdk的应用程序和linux内核之间通讯。用于DPDK和内核的交互,kni接口允许报文从用户态接收后转发到内核协议栈去。

DPDK的包全部在用户空间使用内存池管理,内核空间与用户空间的内存交互不用进行拷贝,只做控制权转移。DPDK的主要对外函数接口都以rte_作为前缀,抽象化函数接口是典型软件设计思路,rte是指runtime environment,eal是指environmentabstraction layer。下图3是kni的mbuf使用流程图,可以看出报文的流向,因为报文在代码中其实就是一个个内存指针。其中rx_q右边是用户态,左边是内核态。最后通过调用netif_rx()将报文送入linux协议栈,这其中需要将dpdk的mbuf转换成skb_buf。当linux向kni端口发送报文时,调用回调函数kni_net_tx(),然后报文经过转换之后发送到端口上。rte_pktmbut_free()把内存重新释放到mbuf内存池中。

图3 数据包通过KNI的流程

二、DPDK的Helloworld代码示例

HelloWorld是最基础的入门程序,代码简短,功能也不复杂。它建立了一个多核(线程)运行的基础环境,每个线程会打印“hello from core #”,core # 是由操作系统管理的。

int  main(int    argc, char **argv)
{
   int ret;
   unsigned lcore_id; 
   ret = rte_eal_init(argc, argv);
 
   )
          rte_panic("Cannot init    EAL\n");
 
   /* call lcore_hello() on every slave    lcore */
   RTE_LCORE_FOREACH_SLAVE(lcore_id)    {
          rte_eal_remote_launch(lcore_hello,    NULL, lcore_id);
   } 
 
   /* call it on master lcore too */
   lcore_hello(NULL); 
   rte_eal_mp_wait_lcore(); 
   ;
}

对于HelloWorld这个实例,最需要的参数是“-c ”,线程掩码(coremask)指定了需要参与运行的线程(核)集合。rte_eal_init本身所完成的工作是复杂的,它读取入口参数,解析并保存作为DPDK运行的系统信息,依赖这些信息,构建一个针对包处理设计的运行环境。主要动作分解为:配置初始化-->内存初始化-->内存池初始化-->队列初始化-->告警初始化-->中断初始化-->PCI初始化-->定时器初始化-->检测内存本地化(NUMA)-->插件初始化-->主线程初始化-->轮询设备初始化-->建立主从线程通道-->将从线程设置在等待模式-->PCI设备的探测与初始化...对于DPDK库的使用者,这些操作已经被EAL封装起来,接口清晰。如果需要对DPDK进行深度定制,二次开发,需要仔细研究内部操作,详见官方网站
https://doc.dpdk.org/guides/prog_guide/。

还不熟悉的朋友,这里可以先领取一份dpdk新手学习资料包(入坑不亏):

DPDK技术交流群(1106675687):正在跳转

DPDK面向多核设计,程序会试图独占运行在逻辑核(lcore)上。Main函数里重要部分是启动多核运行环境,RTE_LCORE_FOREACH_SLAVE(lcore_id)如名所示,遍历所有EAL指定可以使用的lcore,然后通过rte_eal_remote_launch在每个lcore上,启动被指定的线程。

int rte_eal_remote_launch(int (*f)(void *),
void *arg, unsignedslave_id);

第一个参数是从线程,是被征召的线程,第二个参数是传给从线程的参数,第三个参数是指定的逻辑核,从线程会执行在这个core上。具体来说,int rte_eal_remote_launch(lcore_hello,NULL, lcore_id);参数lcore_id指定了从线程ID,运行入口函数lcore_hello.
运行函数lcore_hello,它读取自己的逻辑核编号(lcore_id), 打印出“hellofrom core #”

static    int
lcore_hello(__attribute__((unused))    void *arg)
{
 
   unsigned lcore_id;
 
   lcore_id = rte_lcore_id();
 
   printf("hello from core    %u\n", lcore_id);
 
   ;
 
}

以上仅是个简单示例,在真实的DPDK处理场景中,该处理函数会是一个循环运行的处理过程。

DPDK也有自身的劣势,对于低负荷的场景不建议使用DPDK:

  • 内核栈转移至用户层增加了开发成本.
  • 低负荷服务器不实用,会造成内核空转.

三、DPDK库函数

3.1 EAL库

EAL环境适配层

环境抽象层(EAL)负责获得对底层资源(如硬件和内存空间)的访问。对于应用程序和其他库来说,使用这个通用接口可以不用关系具体操作系统的环境细节。rte_eal_init初始化例程负责决定如何分配操作系统的这些资源(即内存空间、设备、定时器、控制台等等)。

EAL(Environment Abstraction Layer,环境抽象层)对 DPDK 的运行环境(e.g. Linux 操作系统)进行初始化,并为上层应用(用户态 DPDK App)提供了一个通用接口,隐藏了与底层库与设备打交道的相关细节。EAL 主要实现了 DPDK 运行的初始化工作,包括:HugePage 内存分配、NUMA 亲和性、CPU 绑定、Memory 划分、Buffer 划分、Ring 队列分配、原子性无锁操作等,并通过 UIO 或 VFIO 技术将 PCI/PCIe 设备地址映射到用户空间,方便了用户态的 DPDK App 调用。

  • DPDK App 的加载和启动:将 DPDK App 和 DPDK Lib 链接成一个独立的进程,并以指定的方式加载。
  • NUMA 亲和性与 CPU 绑定:将 DPDK App 的执行单元(进程、线程)绑定到特定的 Core 上。
  • 内存分配:EAL 实现了不同区域的内存分配,例如:为 PCI 设备接口提供了物理内存。
  • PCI 地址(BAR)抽象:EAL 提供了对 PCI 地址空间的访问接口。
  • 跟踪调试功能(Debug):日志信息,堆栈打印、异常挂起等等。
  • 通用功能:提供了标准 libc,不提供的自旋锁、原子计数器等。
  • CPU Feature 标识功能:用于决定 CPU 运行时的一些特殊功能,决定当前 CPU 所支持的特性,以便编译对应的二进制文件。
  • 中断处理:提供接口作为中断注册/解注册的回调函数。
  • 告警功能:提供接口用于设置特定环境下的告警。

Linux环境下的EAL

在 Linux 用户空间中,DPDK App 通过 pthread 库作为一个用户态的进程运行。PCI 设备的信息和 BAR 地址空间通过 /sys 内核接口及内核模块,例如:uio_pci_generic 或 igb_uio 来发现并注册的。

正如 Linux 内核文档中对 UIO 的描述,PCI 设备的存储器空间信息是通过 mmap 重新映射到用户态的,相对于传统的 read/write 调用少了一次数据拷贝(内核缓存 => 用户态缓冲)的过程。EAL 通过对 hugetlb 使用 mmap 接口来实现 hugetlbfs 文件系统的空间映射到用户进程的虚拟内存地址空间。这部分内存暴露给 DPDK 服务层,如:Mempool Library。最后,DPDK 通过设置 CPU 绑定和 NUMA 亲和性调用,将每个执行单元(进程、线程)分配给特定的逻辑核,以 User-level 等级运行。

另外,DPDK 的定时器是通过 CPU 的 TSC(时间戳计数器)或者通过 mmap 调用内核的 HPET 系统接口来实现的。

DPDK App 的初始化和运行

DPDK App 的初始化从 glibc 的 start() 开始执行,检查也在初始化过程中被执行,用于保证配置文件所选择的处理器架构宏定义是当前 CPU 所支持的,然后才开始调用 DPDK App 的 main()。Master/logic Core 的初始化和运行时在 rte_eal_init() 上执行的,包括:对 pthread 库的调用。初始化流程如下图所示:

  • rte_eal_init() 的初始化,包括:内存区间、Ring、内存池、lPM 表或 HASH 表等,必须作为整个 DPDK App 初始化的一部分,在 Master Core 上完成。创建和初始化这些对象的函数不是多线程安全的,但是,一旦初始化完成后,这些对象本身是线程安全的。
  • 多进程支持:Linux 上运行的 DPDK App 支持多进程运行模式。
  • 内存分配:连续的物理内存分配是通过 hugetlbfs 内核文件系统来实现的。 EAL 提供了相应的接口(函数)用于申请指定名字的、连续的内存空间。 这个接口同时会将这段连续空间的地址返回给用户程序。内存申请是使用 rte_malloc() 接口来完成的,是 hugetlbfs 文件系统支持的调用。
  • PCI 设备访问:EAL 使用了 Linux 内核提供的文件系统 /sys/bus/pci 来扫描 PCI Bus(总线)上的内容。通过 uio_pci_generic(Linux 内核提供的原生 UIO 模块)或 igb_uio(DPDK 提供的 UIO 内核模块)提供的 /dev/uioX 设备文件,以及 /sys 下对应的资源文件(resource0…N)用于访问 PCI 设备。
  • 逻辑核与共享内存(变量):逻辑核就是处理器的逻辑单元,即:线程(Thread),线程之间的交互默认通过共享内存(变量)来完成。共享内存是 Linux 上的一种 IPC(Inter-Process Communication,进程间通信)技术,提供了进程间通信的方法。实际上是通过线程局部存储技术 TLS 来实现的,它提供了每个线程访问本地存储的功能。
  • 日志:EAL 提供了日志信息接口。 默认的,Linux 的应用程序(包括 DPDK App)的日志信息被发送到 syslog 中或打印到 concole 上。DPDK App 也支持用户可以通过使用不同的日志机制来代替上述方式。
  • 跟踪与调试功能(DEBUG):Glibc 提供了一些调试函数用于打印应用程序的堆栈信息。rte_panic() 可以产生一个 SIG_ABORT 信号,这个信号可以触发产生用于 GBD 调试的 core 文件,所以,我们可以通过 gdb 指令来加载并调试 DPDK App。
  • CPU Feature 识别:EAL 提供了 rte_cpu_get_feature() 接口来查询 CPU 的状态信息,包括 CPU 的特征信息,以此来决定 DPDK App 是否可以在该 CPU 上运行。
  • PCI 设备的黑白名单:EAL 的 PCI 设备黑名单功能用于让 DPDK 忽略指定的 NIC 端口,使用 PCI 设备的地址描述符(Domain:Bus:Device:Function)来对端口进行标记。
  • MISC 功能:包括锁和原子操作(x86 架构)。

内存分配

在 Linux 中,所有的物理内存都通过一个内存描述符表进行管理,且每个描述符指向一块连续的物理内存。通常,物理内存区块之间很可能是不连续的,所以 DPDK 内存区块分配器的作用就是保证分配到一块连续的物理内存。内存分配可以从指定的地址开始,也使用使用对齐的方式来进行分配(默认是 Cache Line 大小对齐),对齐一般是以 2 的次幂来进行的,并且不小于 64 字节对齐。

实际上,连续的物理内存分配是通过 hugetlbfs 内核文件系统来实现的。 EAL 提供了相应的接口(函数)用于申请指定名字的、连续的内存空间。 这个接口同时会将这段连续空间的地址返回给用户程序。内存申请是使用 rte_malloc() 接口来完成的,是 hugetlbfs 文件系统支持的调用。所以,内存区块可以是 2M 或是 1G 大小的内存页。

这些内存区块会使用一个名称进行唯一标识,通过名字访问一个内存区块会返回对应内存区块的描述符。rte_memzone 描述符也存在 DPDK 的配置结构体中,通过 rte_eal_get_configuration() 接口来获取。

注意,通常的,rte_malloc() 内存分配不应该在数据面处理逻辑中进行,因为相对于基于池(Mempool 库)的分配速度要慢,并且在分配和释放的过程中也使用了锁操作。所以 rte_malloc() 内存分配通常在控制逻辑的配置代码中使用。

rte_malloc() 可以传入一个对齐参数,数值必须是 2 的次幂,表示分配对齐参数乘以倍数的内存区域。在 NUMA 多处理器系统中,默认的,对 rte_malloc() 的调用会从调用该函数的 Core 所在的 Socket 上分配内存。此外,DPDK 也提供了另一组 API,允许在指定的 NUMA node 上显式的分配内存。

Malloc 库内部使用了两种数据结构类型:

  • struct malloc_heap:用于在每个 CPU Socket 上跟踪可用内存空间。
  • struct malloc_elem:Malloc 库内部用于追踪分配和释放空间的基本要素。

Structure: malloc_heap

  • 数据结构 malloc_heap 用于管理每个 Socket 上的可用内存空间。
  • NOTE:malloc_heap 并不会跟踪已使用的内存块。

在 Malloc 库内部,每个 NUMA node 都有一个堆结构,这允许我们在线程运行的 NUMA node 上为线程分配内存,但这也只是一种具有优先级的 “弱限制” 而已。

malloc_heap 结构及其关键字段和功能描述如下:

  • lock:需要使用锁(Lock)来同步对堆的访问。例如:当使用链表来跟踪堆中的可用空间,我们就需要一个锁来防止多个线程同时处理该链表。
  • free_head:指向这个堆的空闲结点链表中的第一个元素。

Structure: malloc_elem

数据结构 malloc_elem 用作各种内存块的通用头结构。它以三种不同的使用方式:

  • 作为一个释放/申请内存的头部(正常使用)
  • 作为内存的内部填充(Pad)头部
  • 作为内存的结尾标记

结构中重要的字段和使用方法如下所述:

heap:这个指针指向了该内存块从哪个堆申请。它被用于正常的内存块,当他们被释放时,将新释放的块添加到堆的空闲列表中。

prev:这个指针用于指向紧跟这当前 memseg 的头元素。当释放一个内存块时,该指针用于引用上一个内存块,检查上一个块是否也是空闲。如果空闲,则将两个空闲块合并成一个大块。

next_free:这个指针用于将空闲块列表连接在一起。它用于正常的内存块,在 malloc() 接口中用于找到一个合适的空闲块申请出来,在 free() 函数中用于将内存块添加到空闲链表。

state:该字段可以有三个可能值:FREE、BUSY 或 PAD。前两个是指示正常内存块的分配状态,后者用于指示元素结构是在块开始填充结束时的虚拟结构,即,由于对齐限制,块内的数据开始的地方不在块本身的开始处。在这种情况下,Pad 头用于定位块的实际 malloc 元素头。对于结尾的结构,这个字段总是 BUSY,它确保没有元素在释放之后搜索超过 memseg 的结尾以供其它块合并到更大的空闲块。

pad:这个字段为块开始处的填充长度。在正常块头部情况下,它被添加到头结构的结尾,以给出数据区的开始地址,即在 malloc 上传回的地址。在填充虚拟头部时,存储相同的值,并从虚拟头部的地址中减去实际块头部的地址。

size:数据块的大小,包括头部本身。对于结尾结构,这个大小需要指定为 0,虽然从未使用。对于正在释放的正常内存块,使用此大小值替代 next 指针,以标识下一个块的存储位置,在 FREE 情况下,可以合并两个空闲块。

申请内存

在 EAL 初始化时,所有 memseg 都被设置为 malloc_heap 的一部分。此设置包括在 BUSY 状态的末尾放置一个结构体,如果启用了 CONFIG_RTE_MALLOC_DEBUG,则该结构体可能包含一个 sentinel 成员,并为每个 memseg 在开始处放置一个具有 FREE 的 malloc_elem 头部。FREE 元素被添加到 malloc_heap 的空闲列表中。

当 DPDK App 调用 rte_malloc 时,rte_malloc 首先为调用线程索引 lcore_config 结构,并确定该线程的 NUMA node。NUMA node 将作为参数传给 heap_alloc(),用于索引 malloc_heap 数组。参与索引参数还有:大小、类型、对齐方式和边界参数等。

函数 heap_alloc() 将扫描堆的空闲链表,尝试找到一个适用于所请求的大小、对齐方式和边界约束的内存块。当已经识别出合适的空闲元素时,将计算要返回给用户的指针,并且在该指针之前的内存的高速缓存行填充一个 malloc_elem 头部。由于对齐和边界约束,在元素的开头和结尾可能会有空闲的空间,这将导致了下列行为:

检查尾随空间。如果尾部空间足够大,例如:>128 字节,那么空闲元素将被分割。否则,仅仅忽略它(浪费空间)。

检查元素开始处的空间。如果起始处的空间很小,例如:<=128 字节,那么使用填充头,这部分空间被浪费。但是,如果空间很大,那么空闲元素将被分割。

从现有元素的末尾分配内存的优点是:不需要调整空闲链表,空闲链表中现有元素仅调整大小指针,并且后面的元素使用 prev 指针重定向到新创建的元素位置。

释放内存

释放内存,将指向数据区开始的指针传递给 free() 函数。从该指针中减去 malloc_elem 结构的大小,以获得内存块的元素头部。如果这个头部类型是 PAD,那么进一步减去 pad 头部的长度,以获得整个块的正确元素头。

从这个元素头中,我们获得指向块所分配的堆的指针及必须被释放的位置,以及指向前一个元素的指针, 并且通过 size 字段,可以计算下一个元素的指针。这意味着我们永远不会有两个相邻的 FREE 内存块,因为他们总是会被合并成一个大的块。

多线程支持

在 DPDK 的术语中,lcore 描述 EAL thread,本质是一个 Linux/FreeBSD pthread。EAL pthreads 由 EAL 创建和管理,用于执行 rte_eal_remote_launch() 回调的任务(Task)函数。EAL pthread 从逻辑上又可以分为 Master、Slave 两种类型,前者做管理相关的,而后者是真正处理业务的线程。

每个 EAL pthread 都有一个 _lcore_id 作为其 TLS(Thread Local Storage,线程本地存储)的唯一标识,并且由于 EAL pthreads 通常和 CPU 是 1:1 的绑定关系,所以 lcore_id 通常就是 CPU 的 ID。但是,当使用多个 pthread 时,EAL pthread 和物理 CPU 之间的绑定关系就未必总是 1:1 的了。EAL pthread 也有可能与一个 CPU 集合关联,这是的 lcore_id 将与 CPU ID 不同。

我们可以通过 DPDK App 的命令行参数 --lcores 进行设定:

-–lcores=’[@cpu_set][,[@cpu_set],...]

DPDK 为线程操作提供了两个接口 rte_thread_set_affinity() 和 rte_pthread_get_affinity()。当他们在线程上下文中被调用时,将获取或设置线程 TLS,包括 _cpuset 和 _socket_id:

  • _cpuset:存储了与线程相关联的 CPU 位图(BitMap)。
  • _socket_id:存储了 CPU set 所在的 NUMA node。如果 CPU set 中的 CPU 属于不同的 NUMA 节点,_socket_id 将设置为 SOCKET_ID_ANY。

DPDK App 通常会进行 CPU 绑核以避免切换开销。这显然是有利于性能提升的,但同时也会缺乏灵活性。

从性能的角度出发,我们应该从操作系统层面为 DPDK App 隔离出专用的 CPU,这是基于 Linux 的 cgroup 来实现的。以下是 cgroup 的简单示例:在同一个 CPU 上两个线程 t0、t1 用于执行数据包 I/O,并期望只有 50% 的 CPU 消耗在数据包 IO 操作上。

mkdir /sys/fs/cgroup/cpu/pkt_io
mkdir /sys/fs/cgroup/cpuset/pkt_io

echo $cpu > /sys/fs/cgroup/cpuset/cpuset.cpus

echo $t0 > /sys/fs/cgroup/cpu/pkt_io/tasks
echo $t0 > /sys/fs/cgroup/cpuset/pkt_io/tasks

echo $t1 > /sys/fs/cgroup/cpu/pkt_io/tasks
echo $t1 > /sys/fs/cgroup/cpuset/pkt_io/tasks

cd /sys/fs/cgroup/cpu/pkt_io
echo 100000 > pkt_io/cpu.cfs_period_us
echo  50000 > pkt_io/cpu.cfs_quota_us

同时,也可以为 DPDK App 的每个线程或进程绑定一个核心。甚至还可以通过对 BIOS 的电源管理进行设置,让 CPU 处于高性能的工作效率,不过这也会带来 CPU 的损耗以及电费成本的提高。所以用户应该根据自己的实际需求来对 DPDK App 的性能进行优化。

从灵活性的角度出发,用户可以设置 DPDK App 的线程 CPU 亲和性指向一个 CPU 集合而不是绑定到某个单一的 CPU 了。例如:

用户态中断处理

主线程的用户态中断和警告处理:EAL 会创建一个主线程(Master Core)用于轮询 UIO 设备的描述文件以检测中断事件,通过 EAL 提供的函数可以为特定的中断事件注册/解注册一个回调函数(中断处理函数),回调函数在主线程中被异步调用。当然,EAL 也支持像物理网卡中断那样定时的调用回调函数。

需要注意的是,DPDK 实现了基于轮询方式的 PMD(Poll Mode Drivers)网卡驱动,内核态的 UIO Driver(e.g. igb_uio)屏蔽了网卡发出的中断信号,然后由用户态的 PMD Driver 采用主动轮询的方式。所以 DPDK App 的用户态中断处理区别于传统物理网卡的软硬中断。在 DPDK 的 PMD 中,主线程只会对链路状态的改变触发的中断进行处理,例如:网卡的打开和关闭。除了链路状态通知仍必须采用中断方式以外,均使用无中断方式直接操作 PCI 网卡设备的接收和发送队列。这与传统的内核协议栈每接受一个数据包都要触发一次中断完全不同(先抛开 NAPI 不谈)。

RX 中断事件:PMD 提供的报文收发程序并不只限制于单存的轮询机制。为了缓解小吞吐量场景中轮询模式对 CPU 资源的浪费,所以,PMD 还实现了 “暂停轮询并等待唤醒事件” 的设计,即 Interrupt DPDK(中断 DPDK)模式。

Interrupt DPDK 的原理和 NAPI 很像,就是 PMD 在没数据包需要处理时自动进入睡眠,改为中断通知,接收到收包中断信号后,激活主动轮询。这就是所谓的链路状态中断通知。并且 Interrupt DPDK 还可以和其他进程共享一个 CPU Core,但 DPDK 进程仍具有更高的调度优先级。

EAL 提供了这种事件驱动模式相关的 rte_eth_dev_rx_intr_* 接口,来实现控制、使能、关闭。当 PMD 不支持 RX 中断时,这些 API 会返回失败。Intr_conf.rxq 标识用于打开每个设备的 RX 中断。

以 Linux 上运行的 DPDK App 为例,其实现依赖于 epoll 技术。每个线程可以监控一个 epoll 实例,而在实例中可以添加所有需要的 wake-up 事件的文件描述符。事件文件描述符创建并根据 UIO/VFIO 的规范来映射到指定的中断向量上。EAL 初始化过程中,完成了中断向量和事件文件描述符之间的映射关系,同时为每个 PCI 设备初始化中断向量和队列之间的映射关系。这样一来,EAL 实际上并不知道在指定向量上发生的中断,这是由设备驱动来负责执行后面的映射的。

3.2 Ring库

环形缓冲区支持队列管理。rte_ring并不是具有无限大小的链表,它具有如下属性:

  • 先进先出(FIFO)
  • 最大大小固定,指针存储在表中

无锁实现

  • 多消费者或单消费者出队操作
  • 多生产者或单生产者入队操作
  • 批量出队 - 如果成功,将指定数量的元素出队,否则什么也不做
  • 批量入队 - 如果成功,将指定数量的元素入队,否则什么也不做
  • 突发出队 - 如果指定的数目出队失败,则将最大可用数目对象出队
  • 突发入队 - 如果指定的数目入队失败,则将最大可入队数目对象入队
  • 单生产者入队:

单消费者出队

rte_ring_tailq保存rte_ring链表

创建ring后会将其插入共享内存链表rte_ring_tailq,以便主从进程都可以访问。

//定义队列头结构 struct rte_tailq_elem_head
TAILQ_HEAD(rte_tailq_elem_head, rte_tailq_elem);

//声明全局变量rte_tailq_elem_head,类型为struct rte_tailq_elem_head,
//相当于是链表头,用来保存本进程注册的队列
/* local tailq list */
static struct rte_tailq_elem_head rte_tailq_elem_head =
    TAILQ_HEAD_INITIALIZER(rte_tailq_elem_head);

//调用EAL_REGISTER_TAILQ在main函数前注册rte_ring_tailq到全局变量rte_tailq_elem_head。
#define RTE_TAILQ_RING_NAME "RTE_RING"
static struct rte_tailq_elem rte_ring_tailq = {
    .name = RTE_TAILQ_RING_NAME,
};
EAL_REGISTER_TAILQ(rte_ring_tailq)

调用rte_eal_tailq_update遍历链表rte_tailq_elem_head上的节点,将节点中的head指向 struct rte_mem_ring->tailq_head[]数组中的一个tailq_head,此head又作为另一个链表头。比如注册的rte_ring_tailq节点,其head专门用来保存创建的rte_ring(将rte_ring作为struct rte_tailq_entry的data,将struct rte_tailq_entry插入head)。前面说过struct rte_mem_ring->tailq_head存放在共享内存中,主从进程都可以访问,这样对于rte_ring来说,主从进程都可以创建/访问ring。 相关的数据结构如下图所示:

创建ring

调用函数rte_ring_create创建ring,它会申请一块memzone的内存,大小为struct rte_ring结构加上count个void类型指针,内存结构如下:

然后将ring中生产者和消费者的头尾指向0,最后将ring作为struct rte_tailq_entry的data插入共享内存链表,这样主从进程都可以访问此ring。

/**
 * An RTE ring structure.
 *
 * The producer and the consumer have a head and a tail index. The particularity
 * of these index is that they are not between 0 and size(ring). These indexes
 * are between 0 and 2^32, and we mask their value when we access the ring[]
 * field. Thanks to this assumption, we can do subtractions between 2 index
 * values in a modulo-32bit base: that's why the overflow of the indexes is not
 * a problem.
 */
struct rte_ring {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
     * next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
    //flags有如下三个值:
    //RING_F_SP_ENQ创建单生产者,
    //RING_F_SC_DEQ创建单消费者,
    //RING_F_EXACT_SZ
    int flags;               /**< Flags supplied at creation. */
    //memzone内存管理的底层结构,用来分配内存
    const struct rte_memzone *memzone;
            /**< Memzone, if any, containing the rte_ring */
    //size为ring大小,值和RING_F_EXACT_SZ有关,如果指定了flag     
    //RING_F_EXACT_SZ,则size为rte_ring_create的参数count的
    //向上取2次方,比如count为15,则size就为16。如果没有指定
    //flag,则count必须是2的次方,此时size等于count
    uint32_t size;           /**< Size of ring. */
    //mask值为size-1
    uint32_t mask;           /**< Mask (size-1) of ring. */
    //capacity的值也和RING_F_EXACT_SZ有关,如果指定了,
    //则capacity为rte_ring_create的参数count,如果没指定,
    //则capacity为size-1
    uint32_t capacity;       /**< Usable size of ring */

    //生产者位置,包含head和tail,head代表着下一次生产时的起
    //始位置。tail代表消费者可以消费的位置界限,到达tail后就无  
    //法继续消费,通常情况下生产完成后tail = head,意味着刚生
    //产的元素皆可以被消费
    /** Ring producer status. */
    struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);

    // 消费者位置,也包含head和tail,head代表着下一次消费时的
    //起始位置。tail代表生产者可以生产的位置界限,到达tail后就
    //无法继续生产,通常情况下消费完成后,tail =head,意味着
    //刚消费的位置皆可以被生产
    /** Ring consumer status. */
    struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN);
};

下面看一下在函数rte_ring_create中ring是如何被创建的:

/* create the ring */
struct rte_ring *
rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags)
{
    char mz_name[RTE_MEMZONE_NAMESIZE];
    struct rte_ring *r;
    struct rte_tailq_entry *te;
    const struct rte_memzone *mz;
    ssize_t ring_size;
    int mz_flags = 0;
    struct rte_ring_list* ring_list = NULL;
    const unsigned int requested_count = count;
    int ret;

    //(tailq_entry)->tailq_head 的类型应该是 struct rte_tailq_entry_head,
    //但是返回的却是 struct rte_ring_list,因为 rte_tailq_entry_head 和 rte_ring_list 定义都是一样的,
    //可以认为是等同的。
    #define RTE_TAILQ_CAST(tailq_entry, struct_name) \
        (struct struct_name *)&(tailq_entry)->tailq_head
    ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);

    /* for an exact size ring, round up from count to a power of two */
    if (flags & RING_F_EXACT_SZ)
        count = rte_align32pow2(count + 1);

    //获取需要的内存大小,包括结构体 struct rte_ring 和 count 个指针
    ring_size = rte_ring_get_memsize(count);
        ssize_t sz;
        sz = sizeof(struct rte_ring) + count * sizeof(void *);
        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
    
    #define RTE_RING_MZ_PREFIX "RG_"
    snprintf(mz_name, sizeof(mz_name), "%s%s", RTE_RING_MZ_PREFIX, name);

    //分配 struct rte_tailq_entry,用来将申请的ring挂到共享链表ring_list中
    te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);

    rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);

    //申请memzone,
    /* reserve a memory zone for this ring. If we can't get rte_config or
     * we are secondary process, the memzone_reserve function will set
     * rte_errno for us appropriately - hence no check in this this function */
    mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id, mz_flags, __alignof__(*r));
    if (mz != NULL) {
        //memzone的的addr指向分配的内存,ring也从此内存开始
        r = mz->addr;
        /* no need to check return value here, we already checked the
         * arguments above */
        rte_ring_init(r, name, requested_count, flags);

        //将ring保存到链表entry中
        te->data = (void *) r;
        r->memzone = mz;

        //将链表entry插入链表ring_list
        TAILQ_INSERT_TAIL(ring_list, te, next);
    } else {
        r = NULL;
        RTE_LOG(ERR, RING, "Cannot reserve memory\n");
        rte_free(te);
    }
    rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

    return r;
}

int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
    unsigned flags)
{
    int ret;

    /* compilation-time checks */
    RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
              RTE_CACHE_LINE_MASK) != 0);
    RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
              RTE_CACHE_LINE_MASK) != 0);
    RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
              RTE_CACHE_LINE_MASK) != 0);

    /* init the ring structure */
    memset(r, 0, sizeof(*r));
    ret = snprintf(r->name, sizeof(r->name), "%s", name);
    if (ret < 0 ret>= (int)sizeof(r->name))
        return -ENAMETOOLONG;
    r->flags = flags;
    r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
    r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;

    if (flags & RING_F_EXACT_SZ) {
        r->size = rte_align32pow2(count + 1);
        r->mask = r->size - 1;
        r->capacity = count;
    } else {
        if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
            RTE_LOG(ERR, RING,
                "Requested size is invalid, must be power of 2, and not exceed the size limit %u\n",
                RTE_RING_SZ_MASK);
            return -EINVAL;
        }
        r->size = count;
        r->mask = count - 1;
        r->capacity = r->mask;
    }
    //初始时,生产者和消费者的首尾都为0
    r->prod.head = r->cons.head = 0;
    r->prod.tail = r->cons.tail = 0;

    return 0;
}

入队操作

DPDK提供了如下几个api用来执行入队操作,它们最终都会调用__rte_ring_do_enqueue来实现,所以重点分析函数__rte_ring_do_enqueue。

//多生产者批量入队。入队个数n必须全部成功,否则入队失败。调用者明确知道是多生产者
rte_ring_mp_enqueue_bulk
//单生产者批量入队。入队个数n必须全部成功,否则入队失败。调用者明确知道是单生产者
rte_ring_sp_enqueue_bulk
//批量入队。入队个数n必须全部成功,否则入队失败。调用者不用关心是不是单生产者
rte_ring_enqueue_bulk
//多生产者批量入队。入队个数n不一定全部成功。调用者明确知道是多生产者
rte_ring_mp_enqueue_burst
//单生产者批量入队。入队个数n不一定全部成功。调用者明确知道是单生产者
rte_ring_sp_enqueue_burst
//批量入队。入队个数n不一定全部成功。调用者不用关心是不是单生产者
rte_ring_enqueue_burst

__rte_ring_do_enqueue主要做了三个事情:

  • a. 移动生产者head,此处在多生产者下可能会有冲突,需要使用cas操作循环检测,只有自己能移动head时才行。
  • b. 执行入队操作,将obj插入ring,从老的head开始,直到新head结束。
  • c. 更新生产者tail,只有这样消费者才能看到最新的消费对象。

其参数r指定了目标ring。 参数obj_table指定了入队对象。 参数n指定了入队对象个数。 参数behavior指定了入队行为,有两个值RTE_RING_QUEUE_FIXED和RTE_RING_QUEUE_VARIABLE,前者表示入队对象必须一次性全部成功,后者表示尽可能多的入队。 参数is_sp指定了是否为单生产者模式,默认为多生产者模式。

static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         int is_sp, unsigned int *free_space)
{
    uint32_t prod_head, prod_next;
    uint32_t free_entries;

    //先移动生产者的头指针,prod_head保存移动前的head,prod_next保存移动后的head
    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
            &prod_head, &prod_next, &free_entries);
    if (n == 0)
        goto end;

    //&r[1]指向存放对象的内存。
    //从prod_head开始,将n个对象obj_table插入ring的prod_head位置
    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
    rte_smp_wmb();

    //更新生产者tail
    update_tail(&r->prod, prod_head, prod_next, is_sp);
end:
    if (free_space != NULL)
        *free_space = free_entries - n;
    return n;
}

__rte_ring_move_prod_head用来使用cas操作更新生产者head。

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *free_entries)
{
    const uint32_t capacity = r->capacity;
    unsigned int max = n;
    int success;

    do {
        /* Reset n to the initial burst count */
        n = max;

        //获取生产者当前的head位置
        *old_head = r->prod.head;

        /* add rmb barrier to avoid load/load reorder in weak
         * memory model. It is noop on x86
         */
        rte_smp_rmb();

        const uint32_t cons_tail = r->cons.tail;
        /*
         *  The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * *old_head > cons_tail). So 'free_entries' is always between 0
         * and capacity (which is < size. entry free_entries='(capacity' cons_tail - old_head entry0 entry check that we have enough room in ring if unlikelyn> *free_entries))
            n = (behavior == RTE_RING_QUEUE_FIXED) ?
                    0 : *free_entries;

        if (n == 0)
            return 0;

        //当前head位置加上入队对象个数获取新的生产者head
        *new_head = *old_head + n;
        //如果是单生产者,直接更新生产者head,并返回1
        if (is_sp)
            r->prod.head = *new_head, success = 1;
        else //如果是多生产者,需要借助函数rte_atomic32_cmpset,比较old_head和r->prod.head是否相同,
             //如果相同,则将r->prod.head更新为new_head,并返回1,退出循环,
             //如果不相同说明有其他生产者更新head了,返回0,继续循环。
            success = rte_atomic32_cmpset(&r->prod.head,
                    *old_head, *new_head);
    } while (unlikely(success == 0));
    return n;
}

ENQUEUE_PTRS定义了入队操作。

/* the actual enqueue of pointers on the ring.
 * Placed here since identical code needed in both
 * single and multi producer enqueue functions */
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
    unsigned int i; \
    const uint32_t size = (r)->size; \
    uint32_t idx = prod_head & (r)->mask; \
    obj_type *ring = (obj_type *)ring_start; \
    //idx+n 大于 size,说明入队n个对象后,ring还没满,还没翻转
    if (likely(idx + n < size)) { \
        //一次循环入队四个对象
        for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
            ring[idx] = obj_table[i]; \
            ring[idx+1] = obj_table[i+1]; \
            ring[idx+2] = obj_table[i+2]; \
            ring[idx+3] = obj_table[i+3]; \
        } \
        //还有剩余不满四个对象,则在switch里入队
        switch (n & 0x3) { \
        case 3: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 2: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 1: \
            ring[idx++] = obj_table[i++]; \
        } \
    } else { \
        //入队n个对象,会导致ring满,发生翻转,
        //则先入队idx到size的位置,
        for (i = 0; idx < size; i++, idx++)\
            ring[idx] = obj_table[i]; \
        //再翻转回到ring起始位置,入队剩余的对象
        for (idx = 0; i < n; i++, idx++) \
            ring[idx] = obj_table[i]; \
    } \
} while (0)

最后更新生产者tail。

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
        uint32_t single)
{
    /*
     * If there are other enqueues/dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    if (!single)
        //多生产者时,必须等到其他生产者入队成功,再更新自己的tail
        while (unlikely(ht->tail != old_val))
            rte_pause();

    ht->tail = new_val;
}

出队操作

DPDK提供了如下几个api用来执行出队操作,它们最终都会调用__rte_ring_do_dequeue来实现,所以重点分析函数__rte_ring_do_dequeue。

//多消费者批量出队。出队个数n必须全部成功,否则出队失败。调用者明确知道是多消费者
rte_ring_mc_dequeue_bulk
//单消费者批量出队。出队个数n必须全部成功,否则出队失败。调用者明确知道是单消费者
rte_ring_sc_dequeue_bulk
//批量出队。出队个数n必须全部成功,否则出队失败。调用者不用关心是不是单消费者
rte_ring_dequeue_bulk
//多消费者批量出队。出队个数n不一定全部成功。调用者明确知道是多消费者
rte_ring_mc_dequeue_burst
//单消费者批量出队。出队个数n不一定全部成功。调用者明确知道是单消费者
rte_ring_sc_dequeue_burst
//批量出队。出队个数n不一定全部成功。调用者不用关心是不是单消费者
rte_ring_dequeue_burst

__rte_ring_do_dequeue主要做了三个事情:

a. 移动消费者head,此处在多消费者下可能会有冲突,需要使用cas操作循环检测,只有自己能移动head时才行。 b. 执行出队操作,将ring中的obj插入obj_table,从老的head开始,直到新head结束。 c. 更新消费者tail,只有这样生成者才能进行生产。

其参数r指定了目标ring。 参数obj_table指定了出队对象出队后存放位置。 参数n指定了入队对象个数。 参数behavior指定了出队行为,有两个值RTE_RING_QUEUE_FIXED和RTE_RING_QUEUE_VARIABLE,前者表示出队对象必须一次性全部成功,后者表示尽可能多的出队。 参数is_sp指定了是否为单消费者模式,默认为多消费者模式。

static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         int is_sc, unsigned int *available)
{
    uint32_t cons_head, cons_next;
    uint32_t entries;

    //先移动消费者head,成功后,cons_head为老的head,cons_next为新的head,
    //两者之间的部分为此次可消费的对象
    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
            &cons_head, &cons_next, &entries);
    if (n == 0)
        goto end;

    //执行出队操作,从老的cons_head开始出队n个对象
    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
    rte_smp_rmb();

    //更新消费者tail,和前面更新生产者head代码相同
    update_tail(&r->cons, cons_head, cons_next, is_sc);

end:
    if (available != NULL)
        *available = entries - n;
    return n;
}

__rte_ring_move_cons_head用来使用cas操作更新消费者head。

static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *entries)
{
    unsigned int max = n;
    int success;

    /* move cons.head atomically */
    do {
        /* Restore n as it may change every loop */
        n = max;

        //取出当前head位置
        *old_head = r->cons.head;

        /* add rmb barrier to avoid load/load reorder in weak
         * memory model. It is noop on x86
         */
        rte_smp_rmb();

        //生产者tail减去消费者head为可消费的对象个数。
        //因为head和tail都是无符号32位类型,即使生产者tail比消费者head
        //小,也能正确得出结果,不用担心溢出。
        const uint32_t prod_tail = r->prod.tail;
        /* The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * cons_head > prod_tail). So 'entries' is always between 0
         * and size(ring)-1. */
        *entries = (prod_tail - *old_head);

        //要求出队对象个数大于实际可消费对象个数
        /* Set the actual entries for dequeue */
        if (n > *entries)
            //此时如果behavior为RTE_RING_QUEUE_FIXED,表示必须满足n,满足不了就一个都不出队,返回0,
            //如果不为RTE_RING_QUEUE_FIXED,则尽可能多的出队
            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

        if (unlikely(n == 0))
            return 0;
        
        //当前head加上n即为新的消费者head
        *new_head = *old_head + n;
        if (is_sc)
            //如果单消费者,直接更新head即可,返回1
            r->cons.head = *new_head, success = 1;
        else
            //多消费者,需要借用rte_atomic32_cmpset更新head
            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
                    *new_head);
    } while (unlikely(success == 0));
    return n;
}

ring是否满或者是否为空

函数rte_ring_full用来判断ring是否满
static inline int
rte_ring_full(const struct rte_ring *r)
{
    return rte_ring_free_count(r) == 0;
}

static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
    return r->capacity - rte_ring_count(r);
}

函数rte_ring_empty用来判断ring是否为空

static inline int
rte_ring_empty(const struct rte_ring *r)
{
    return rte_ring_count(r) == 0;
}

判断ring是否为空或者是否满都需要调用rte_ring_count获取当前ring中已使用的个数。

static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
    uint32_t prod_tail = r->prod.tail;
    uint32_t cons_tail = r->cons.tail;
    uint32_t count = (prod_tail - cons_tail) & r->mask;
    return (count > r->capacity) ? r->capacity : count;
}

3.3 Mempool库

DPDK提供了内存池机制,使得内存的管理的使用更加简单安全。在设计大的数据结构时,都可以使用mempool分配内存,同时,mempool也提供了内存的获取和释放等操作接口。对于数据包mempool甚至提供了更加详细的接口-rte_pktmbuf_pool_create()。

Mempool是固定大小的对象分配器。 在DPDK中,它由名称唯一标识,并且使用mempool操作来存储空闲对象。Mempool的组织是通过三个部分实现的:

  • mempool对象节点:mempool的对象挂接在 static struct rte_tailq_elem rte_mempool_tailq 全局队列中,可以通过名字进行唯一标识符;此队列只是mempool的一个对象指示结构,并不是实际的内存区;
  • mempool实际内存区: struct rte_memzone 是实际分配的连续内存空间,存储所创建的mempool对象;
  • ring无锁队列:作为一个无锁环形队列 struct rte_ring ,存储着mempool对象的指针,提供了方便存取使用mempool的空间的办法。

一般结构

如图所示,mempool的对象通过与ring无锁队列建立关联方便存取;同时,为了减少多核访问造成的冲突,引入了local_cache对象缓冲区。该local_cache非硬件上的cache,而是为了减少多核访问ring造成的临界区访问,

coreX app会优先访问该local_cache上的对象。入队的时候优先入local_cache中,出队的时候优先从local_cache中出队。

mempool的创建和使用

先注意一下 rte_mempool_create 的参数中的两个 mp_init 和 obj_init ,前者负责初始化mempool中配置的私有参数,如在数据包中加入的我们自己的私有结构;后者负责初始化每个mempool对象。我们然后按照mempool的3个关键部分展开说明。

(1)mempool头结构的创建

mempool头结构包含3个部分: struct rte_mempool , struct rte_mempool_cache 和mempool private。创建是在 rte_mempool_create_empty() 中完成的,看这个函数,先进行了对齐的检查:

 RTE_BUILD_BUG_ON((sizeof(struct rte_mempool) &
              RTE_CACHE_LINE_MASK) != 0);
    RTE_BUILD_BUG_ON((sizeof(struct rte_mempool_cache) &
              RTE_CACHE_LINE_MASK) != 0);

然后从mempool队列中取出头节点,我们创建的mempool结构填充好,就挂接在这个节点上。接下来做一些检查工作和创建flag的设置。

rte_mempool_calc_obj_size() 计算了每个obj的大小,这个obj又是由三个部分组成的,header_size、elt_size、trailer_size,即头,数据区,尾。在没有开启RTE_LIBRTE_MEMPOOL_DEBUG调试时,没有尾部分;头部分的结构为: struct rte_mempool_objhdr ,

通过这个头部,mempool中的obj都是链接到队列中的,所以,提供了遍历obj的方式(尽管很少这么用)。函数返回最后计算对齐后的obj的大小,为后面分配空间提供依据。然后分配了一个mempool队列条目,为后面挂接在队列做准备。

 /* try to allocate tailq entry */
    te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
    if (te == NULL) {
        RTE_LOG(ERR, MEMPOOL, "Cannot allocate tailq entry!\n");
        goto exit_unlock;
    }

接下来,就是计算整个mempool头结构多大。

 mempool_size = MEMPOOL_HEADER_SIZE(mp, cache_size);
    mempool_size += private_data_size;
    mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);

这里指的是计算mempool的头结构的大小。而不是内存池实际的大小。在这里可以清晰的看出这个mempool头结构是由三部分组成的。cache计算的是所有核上的cache之和。

然后,使用 rte_memzone_reserve() 分配这个mempool头结构大小的空间,填充mempool结构体,并把mempool头结构中的cache地址分配给mempool。初始化这部分cache。

最后就是挂接mempool结构。 TAILQ_INSERT_TAIL(mempool_list, te, next); (这里上了锁?)。

(2)mempool实际空间的创建

这部分的创建是在函数
rte_mempool_populate_default(struct rte_mempool *mp) 中完成的。

首先计算为这些元素需要分配多大的空间,
rte_mempool_ops_calc_mem_size()

接着
rte_memzone_reserve_aligned() 分配空间。把元素添加到mempool,实际上就是把申请的空间分给每个元素。

(3)ring的创建

先看到的是这么一段代码:

static int
mempool_ops_alloc_once(struct rte_mempool *mp)
{
    int ret;

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }
    return 0;
}

这就是创建ring的过程咯,其中的函数rte_mempool_ops_alloc()就是实现。那么,对应的ops->alloc()在哪注册的呢?

/*
     * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
     * set the correct index into the table of ops structs.
     */
    if ((flags & MEMPOOL_F_SP_PUT) && (flags & MEMPOOL_F_SC_GET))
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
    else if (flags & MEMPOOL_F_SP_PUT)
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
    else if (flags & MEMPOOL_F_SC_GET)
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
    else
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);

就是根据ring的类型,来注册对应的操作函数,如默认的就是ring_mp_mc,多生产者多消费者模型,其操作函数不难找到:

static const struct rte_mempool_ops ops_mp_mc = {
    .name = "ring_mp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

接下来,又分配了一个 struct rte_mempool_memhdr *memhdr; 结构的变量,就是这个变量管理着mempool的实际内存区,它记录着mempool实际地址区的物理地址,虚拟地址,长度等信息。

再然后,就是把每个元素对应到mempool池中了: mempool_add_elem() 。在其中,把每个元素都挂在了elt_list中,可以遍历每个元素。最后
rte_mempool_ops_enqueue_bulk(struct rte_mempool *mp, void * const *obj_table, ,最终,把元素对应的地址入队,这样,mempool中的每个元素都放入了ring中。

四:mempool的使用

mempool的常见使用是获取元素空间和释放空间。

  • rte_mempool_get可以获得池中的元素,其实就是从ring取出可用元素的地址。
  • rte_mempool_put可以释放元素到池中。
  • rte_mempool_in_use_count查看池中已经使用的元素个数
  • rte_mempool_avail_count 查看池中可以使用的元素个数

3.4 mbuf库

报文缓冲区库(Mbuf)提供了申请和释放缓冲区的功能,DPDK应用程序使用这些buffer存储消息缓冲。消息缓冲存储在mempool中,使用内存池库 。
数据结构rte_mbuf可以承载网络数据包buffer或者通用控制消息buffer(由CTRL_MBUF_FLAG指示)。也可以扩展到其他类型。rte_mbuf头部结构尽可能小,目前只使用两个缓存行,最常用的字段位于第一个缓存行中。

报文缓冲区设计

为了存储数据包数据(包括协议头部),考虑了两种方法:

  1. 在单个存储buffer中嵌入metadata,后面跟着数据包数据固定大小区域
  2. 为metadata和报文数据分别使用独立的存储buffer。

第一种方法的优点是他只需要一个操作来分配/释放数据包的整个存储表示。但是,第二种方法更加灵活,并允许将元数据的分配与报文数据缓冲区的分配完全分离。
DPDK选择了第一种方法。Metadata包含诸如消息类型,长度,到数据开头的偏移量等控制信息,以及允许缓冲链接的附加mbuf结构指针。

用于承载网络数据包buffer的消息缓冲可以处理需要多个缓冲区来保存完整数据包的情况。许多通过下一个字段链接在一起的mbuf组成的jumbo帧,就是这种情况。

对于新分配的mbuf,数据开始的区域是buffer之后 RTE_PKTMBUF_HEADROOM 字节的位置,这是缓存对齐的。 Message buffers可以在系统中的不同实体中携带控制信息,报文,事件等。 Message buffers也可以使用起buffer指针来指向其他消息缓冲的数据字段或其他数据结构。

Buffer Manager实现了一组相当标准的buffer访问操作来操纵网络数据包。

存储在内存池中的缓冲区

Buffer Manager使用内存池库来申请buffer。因此确保了数据包头部均衡分布到信道上,有利于L3处理。mbuf中包含一个字段,用于表示它从哪个池中申请出来。当调用 rte_ctrlmbuf_free(m) 或 rte_pktmbuf_free(m),mbuf被释放到原来的池中。

构造函数

Packet及control mbuf构造函数由API提供。接口rte_pktmbuf_init()及rte_ctrlmbuf_init()初始化mbuf结构中的某些字段,这些字段一旦创建将不会被用户修改(如mbuf类型、源池、缓冲区起始地址等)。此函数在池创建时作为rte_mempool_create()函数的回掉函数给出。

缓冲区申请及释放

分配一个新mbuf需要用户指定从哪个池中申请。对于任意新分配的mbuf,它包含一个段,长度为0。 缓冲区到数据的偏移量被初始化,以便使得buffer具有一些字节(RTE_PKTMBUF_HEADROOM)的headroom。
释放mbuf意味着将其返回到原始的mempool。当mbuf的内容存储在一个池中(作为一个空闲的mbuf)时,mbuf的内容不会被修改。由构造函数初始化的字段不需要在mbuf分配时重新初始化。
当释放包含多个段的数据包mbuf时,他们都被释放,并返回到原始mempool。

缓冲区操作

这个库提供了一些操作数据包mbuf中的数据的功能。例如:

  • 获取数据长度
  • 获取指向数据开始位置的指针
  • 数据前插入数据
  • 数据之后添加数据
  • 删除缓冲区开头的数据(rte_pktmbuf_adj())
  • 删除缓冲区末尾的数据(rte_pktmbuf_trim())详细信息请参阅 DPDK API Reference

元数据信息

数据包的一些信息由网络驱动程序检索并存储在mbuf中使得处理更简单。例如,VLAN、RSS哈希结果(参见 Poll Mode Driver)及校验和由硬件计算的标志等。
一个报文缓冲区中还包含数据源端口和报文链中mbuf数目。对于链接的mbuf,只有链的第一个mbuf存储这个元信息。

例如,对于IEEE1588数据包,RX侧就是这种情况,时间戳机制,VLAN标记和IP校验和计算。
在TX端,应用程序还可以将一些处理委托给硬件。 例如,PKT_TX_IP_CKSUM标志允许卸载IPv4校验和的计算。
以下示例说明如何在vxlan封装的tcp数据包上配置不同的TX offloads:

out_eth/out_ip/out_udp/vxlan/in_eth/in_ip/in_tcp/payload

计算out_ip的校验和:

mb->l2_len = len(out_eth)
mb->l3_len = len(out_ip)
mb->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CSUM
set out_ip checksum to 0 in the packet
  • 配置DEV_TX_OFFLOAD_IPV4_CKSUM支持在硬件计算。

计算out_ip 和 out_udp的校验和:

mb->l2_len = len(out_eth)
mb->l3_len = len(out_ip)
mb->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CSUM | PKT_TX_UDP_CKSUM
set out_ip checksum to 0 in the packet
set out_udp checksum to pseudo header using rte_ipv4_phdr_cksum()

配置DEV_TX_OFFLOAD_IPV4_CKSUM 和 DEV_TX_OFFLOAD_UDP_CKSUM支持在硬件上计算。

计算in_ip的校验和:

mb->l2_len = len(out_eth + out_ip + out_udp + vxlan + in_eth)
mb->l3_len = len(in_ip)
mb->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CSUM
set in_ip checksum to 0 in the packet

这以情况1类似,但是l2_len不同。 配置DEV_TX_OFFLOAD_IPV4_CKSUM支持硬件计算。 注意,只有外部L4校验和为0时才可以工作。

计算in_ip 和 in_tcp的校验和:

mb->l2_len = len(out_eth + out_ip + out_udp + vxlan + in_eth)
mb->l3_len = len(in_ip)
mb->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CSUM | PKT_TX_TCP_CKSUM
在报文中设置in_ip校验和为0
使用rte_ipv4_phdr_cksum()将in_tcp校验和设置为伪头

这与情况2类似,但是l2_len不同。 配置DEV_TX_OFFLOAD_IPV4_CKSUM 和 DEV_TX_OFFLOAD_TCP_CKSUM支持硬件实现。 注意,只有外部L4校验和为0才能工作。

segment inner TCP:

mb->l2_len = len(out_eth + out_ip + out_udp + vxlan + in_eth)
mb->l3_len = len(in_ip)
mb->l4_len = len(in_tcp)
mb->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM | PKT_TX_TCP_CKSUM | PKT_TX_TCP_SEG;
在报文中设置in_ip校验和为0
将in_tcp校验和设置为伪头部,而不使用IP载荷长度
配置DEV_TX_OFFLOAD_TCP_TSO支持硬件实现。 注意,只有L4校验和为0时才能工作。

计算out_ip, in_ip, in_tcp的校验和:

mb->outer_l2_len = len(out_eth)
mb->outer_l3_len = len(out_ip)
mb->l2_len = len(out_udp + vxlan + in_eth)
mb->l3_len = len(in_ip)
mb->ol_flags|=PKT_TX_OUTER_IPV4|PKT_TX_OUTER_IP_CKSUM | PKT_TX_IP_CKSUM |  PKT_TX_TCP_CKSUM;
设置 out_ip 校验和为0
设置 in_ip 校验和为0
使用rte_ipv4_phdr_cksum()设置in_tcp校验和为伪头部

配置DEV_TX_OFFLOAD_IPV4_CKSUM、DEV_TX_OFFLOAD_UDP_CKSUM、
DEV_TX_OFFLOAD_OUTER_IPV4_CKSUM支持硬件实现。

Flage标记的意义在mbuf API文档(rte_mbuf.h)中有详细描述。 更多详细信息还可以参阅testpmd 源码(特别是csumonly.c)。

直接缓冲区和间接缓冲区

直接缓冲区是指缓冲区完全独立。间接缓冲区的行为类似于直接缓冲区,但缓冲区的指针和数据偏移量指的是另一个直接缓冲区的数据。这在数据包需要复制或分段的情况下是很有用的,因为间接缓冲区提供跨越多个缓冲区重用相同数据包数据的手段。

当使用接口 rte_pktmbuf_attach() 函数将缓冲区附加到直接缓冲区时,该缓冲区变成间接缓冲区。每个缓冲区有一个引用计数器字段,每当直接缓冲区附加一个间接缓冲区时,直接缓冲区上的引用计数器递增。类似的,每当间接缓冲区被分裂时,直接缓冲区上的引用计数器递减。如果生成的引用计数器为0,则直接缓冲区将被释放,因为它不再使用。

处理间接缓冲区时需要注意几件事情。首先,间接缓冲区从不附加到另一个间接缓冲区。尝试将缓冲区A附加到间接缓冲区B(且B附加到C上了),将使得rte_pktmbuf_attach() 自动将A附加到C上。其次,为了使缓冲区变成间接缓冲区,其引用计数必须等于1,也就是说它不能被另一个间接缓冲区引用。最后,不可能将间接缓冲区重新链接到直接缓冲区(除非它已经被分离了)。

虽然可以使用推荐的rte_pktmbuf_attach()和rte_pktmbuf_detach()函数直接调用附加/分离操作,但建议使用更高级的rte_pktmbuf_clone()函数,该函数负责间接缓冲区的正确初始化,并可以克隆具有多个段的缓冲区。
由于间接缓冲区不应该实际保存任何数据,间接缓冲区的内存池应配置为指示减少的内存消耗。可以在几个示例应用程序中找到用于间接缓冲区的内存池(以及间接缓冲区的用例示例)的初始化示例,例如IPv4组播示例应用程序。

调试

在调试模式(CONFIG_RTE_MBUF_DEBUG使能)下,mbuf库的函数在任何操作之前执行完整性检查(如缓冲区检查、类型错误等)。

用例

所有网络应用程序都应该使用mbufs来传输网络数据包。

3.5 PMD驱动

为了支持Userspace IO,DPDK可以选择如下三种类型的驱动:

  • uio_pci_generic
  • uio + igb_uio
  • vfio-pci

uio_pci_generic是内核原生的一种uio驱动,该驱动提供了uio功能,直接使用如下命令加载:

sudo modprobe uio_pci_generic

这个原生驱动是不支持VF设备创建的,因此DPDK也提供了另外一种uio驱动igb_uio,它是可以用于宿主机上来创建VF设备的。适用性比内核原生的uio_pci_generic更强一些,其中igb_uio.ko是由dpdk代码库编译出来的:

sudo modprobe uio
sudo insmod kmod/igb_uio.ko

从DPDK release 1.7开始,DPDK对VFIO进行了支持,因此VFIO Driver成了新的可选项:

sudo modprobe vfio-pci

当DPDK使用vfio来实现网络功能时可以直接加载该驱动。

特别注意:

对于使用VFIO驱动来使用DPDK的场景,必须保证:

  • 1.硬件上支持支持VT-x、VT-d,BIOS中需要打开相关特性
  • 2.对于物理机的内核中需要支持IOMMU特性(在启动参数添加 iommu=pt, intel_iommu=on)

物理机上使用DPDK

在物理机上使用DPDK,需要内核中加载DPDK PMD Driver,那么需要使用如下命令加载DPDK的驱动:

modprobe uio
insmod igb_uio
usertools/dpdk-devbind.py --bind=igb_uio bb:ss.f

当然这里我们也可以使用上面介绍过的其他类型的内核模块:uio_pci_generic或者vfio-pci

虚拟机中使用DPDK

对于支持SR-IOV的网卡来说,比如Intel的X710/XL710网卡,在虚拟化的环境中使用,网卡可以进行透传,本文以透传的方式来进行实践介绍,对于支持SR-IOV的网卡来说,它分为PF和VF模块,在宿主机中需要加载对应的PF Driver和VF Driver来驱动这两个子模块。

宿主机

在宿主机上可以直接使用Linux kernel官方的intel PF驱动,比如i40e,也可以使用DPDK专用的 PMD PF驱动。如果使用了DPDK PMD PF 驱动,那么这个宿主机网络的管理权就完全交给DPDK了。

方案一:i40e驱动

rmmod i40e (To remove the i40e module)
insmod i40e.ko max_vfs=2,2 (To enable two Virtual Functions per port)

通过重新加载intel提供的i40e驱动,并指定max_vfs参数来创建VF功能,对于该网卡的VF功能内核默认使用的驱动为i40evf,因此在使用dpdk之前,还需要在Host上将VF与i40evf驱动解绑,重新绑定到vfio-pci驱动上:

modprobe vfio-pci

宿主机中需要使用vfio_pci这个内核模块来对需要分配给客户机的设备进行隐藏, 从而让宿主机和未被分配该设备的客户机都无法使用该设备, 达到隔离和安全使用的目的。而在客户机不需要使用该设备后, 让宿主机使用该设备, 则需要将其恢复到使用原本的驱动。

这里也可以利用DPDK提供的脚本dpdk_bind_nic.py来设置:

usertools/dpdk-devbind.py --bind=vfio-pci  bb:ss.f

此时该VF已经由vfio驱动接管,对于上一章介绍的,如果在宿主机上使用VFIO DPDK,那么此时就已经满足了条件,但是我们此处是为了演示虚拟机中使用DPDK,所以此时不能启动DPDK去使用该VF,而需要在虚拟机中透传该设备来使用。

方案二:DPDK PMD PF驱动

需要内核启动参数中使能iommu=pt, intel_iommu=on,然后启动后加载DPDK驱动:

modprobe uio
insmod kmod/igb_uio.ko
usertools/dpdk-devbind.py --bind=igb_uio bb:ss.f
echo 2 > /sys/bus/pci/devices/0000\:bb\:ss.f/max_vfs (To enable two VFs on a specific PCI device)

虚拟机

对于虚拟机来说,透传过来的VFIO网卡对于虚拟机来说就相当于是一个常规的物理网卡,默认就会使用该物理网卡对应的驱动,比如i40e driver,那么如果要在虚拟机中使用DPDK,就需要把虚拟网卡重新绑定到igb_uio驱动,这样就可以在虚拟机中使用DPDK了。实际上操作还是与宿主机中一样:

modprobe uio
insmod kmod/igb_uio.ko
usertools/dpdk-devbind.py --bind=igb_uio bb:ss.f

3.6 IVSHMEM库

3.7 Timer库

初始化dpdk定时器库

1.void rte_timer_subsystem_init (   void        )

功能:初始化dpdk定时器库,初始化内部变量(链表和锁)

初始化timer定时器

void rte_timer_init (   struct rte_timer *  tim )
参数tim:带初始化的timer

启动定时器

int rte_timer_reset (   struct rte_timer *  tim,
                    uint64_t    ticks,
                    enum rte_timer_type     type,
                    unsigned    tim_lcore,
                    rte_timer_cb_t  fct,
                    void *  arg 
)   
功能:启动或者重置定时器,当定时器经过一定时间间隔超时后,会在tim_lcore指定的core上调用fct函数,函数参数是arg。

如果timer当前处于运行状态(Running),函数会调用失败,所以应检查函数返回值查看timer是否处于运行状态

如果timer在其他core上被设置,即(CONFIG 状态),函数也返回失败。

参数time:timer句柄

参数ticks:超时时间,参考rte_get_hpet_hz()的使用

参数type:取值PERIODICAL或SINGLE
    PERIODICAL:定时器触发,并执行后自动加载
    SINGLE:定时器仅仅触发一次。执行后进入STOPPED 状态

参数tim_lcore:指定这个定时器回调函数在哪个core上面运行;,如果tim_lcore 值为LCORE_ID_ANY,则以轮询方式在不同的核上执行回调函数。

参数fct:定时器回调函数

参数arg:回调函数的参数

定时器调度和管理

 rte_timer_manage();  
 管理定时器链表,执行定时器回调函数
 这个函数必须在EAL core的main_loop()函数中调用。它浏览挂起的timer,然后执行已经超时的定时器。

定时器的精度取决于这个函数的调用次序。

停止定时器

rte_timer_stop停止定时器

x timer例子代码分析:

/* timer0 callback */
static void
timer0_cb(__attribute__((unused)) struct rte_timer *tim,
      __attribute__((unused)) void *arg)
{
    static unsigned counter = 0;
    unsigned lcore_id = rte_lcore_id();

    printf("%s() on lcore %u\n", __func__, lcore_id);

    /* this timer is automatically reloaded until we decide to
     * stop it, when counter reaches 20. */
    if ((counter ++) == 20)
        rte_timer_stop(tim);
}

timer1定时器,我们传递了一个参数SINGLE,就是指让这个定时器在某个core上执行一次,然后停止;
当前启动调用的回调函数是timer1_cb()
在timer1_cb函数中再次执行一次rte_timer_reset就可以启动定时器,

/* timer1 callback */
static void
timer1_cb(__attribute__((unused)) struct rte_timer *tim,
      __attribute__((unused)) void *arg)
{
    unsigned lcore_id = rte_lcore_id();
    uint64_t hz;

    printf("%s() on lcore %u\n", __func__, lcore_id);

    /* reload it on another lcore */
    hz = rte_get_timer_hz();
    lcore_id = rte_get_next_lcore(lcore_id, 0, 1);
    rte_timer_reset(tim, hz/3, SINGLE, lcore_id, timer1_cb, NULL);
}

static __attribute__((noreturn)) int
lcore_mainloop(__attribute__((unused)) void *arg)
{
    uint64_t prev_tsc = 0, cur_tsc, diff_tsc;
    unsigned lcore_id;

    lcore_id = rte_lcore_id();
    printf("Starting mainloop on core %u\n", lcore_id);

    while (1) {
        /*
         * Call the timer handler on each core: as we don't
         * need a very precise timer, so only call
         * rte_timer_manage() every ~10ms (at 2Ghz). In a real
         * application, this will enhance performances as
         * reading the HPET timer is not efficient.
         */
        //函数中会每10ms执行一次调度,让rte_timer_manage()管理当前core上的定时器,rte_timer_manage函数必须调用,否则timer定时器不会运行
        cur_tsc = rte_rdtsc();
        diff_tsc = cur_tsc - prev_tsc;
        if (diff_tsc > TIMER_RESOLUTION_CYCLES) {
            rte_timer_manage();
            prev_tsc = cur_tsc;
        }
    }
}


int
main(int argc, char **argv)
{
    int ret;
    uint64_t hz;
    unsigned lcore_id;

    /* init EAL */
    ret = rte_eal_init(argc, argv);
    if (ret < 0)
        rte_panic("Cannot init EAL\n");

    /* init RTE timer library */
    rte_timer_subsystem_init();

    /* init timer structures */
    rte_timer_init(&timer0);
    rte_timer_init(&timer1);

    /* load timer0, every second, on master lcore, reloaded automatically */
    hz = rte_get_timer_hz();
    lcore_id = rte_lcore_id();
    rte_timer_reset(&timer0, hz, PERIODICAL, lcore_id, timer0_cb, NULL);

    /* load timer1, every second/3, on next lcore, reloaded manually */
    //初始化的另一个定时器timer1;这用hz/3  是指定时只有1秒的三分之一;SINGLE是指只执行一次,如想让它再运行就需要从新初始化它;
    lcore_id = rte_get_next_lcore(lcore_id, 0, 1);
    rte_timer_reset(&timer1, hz/3, SINGLE, lcore_id, timer1_cb, NULL);

    /* call lcore_mainloop() on every slave lcore */
    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
        rte_eal_remote_launch(lcore_mainloop, NULL, lcore_id);
    }

    /* call it on master lcore too */
    (void) lcore_mainloop(NULL);

    return 0;
}

注意事项:

  • 1.不管是slave core还是master core,rte_timer_manage函数必须调用,否则timer定时器不会运行
  • 2.一个core上不能同时运行两个以上,PERIODICAL类型的定时器
  • 3.一个core能同时运行两个,SINGLE类型的单次定时器
  • 4.可以在一个core上,启动另一个core上的定时器

3.8 LPM库

DPDK LPM库组件实现了32位Key的最长前缀匹配(LPM)表搜索方法,该方法通常用于在IP转发应用程序中找到最佳路由。

LPM API概述

LPM组件实例的主要配置参数是要支持的最大数量的规则。LPM前缀由一对参数(32位Key,深度)表示,深度范围为1到32。LPM规则由LPM前缀和与前缀相关联的一些用户数据表示。该前缀作为LPM规则的唯一标识符。在该实现中,用户数据为1字节长,被称为下一跳,与其在路由表条目中存储下一跳的ID的主要用途相关。

LPM组件导出的主要方法有:

  • 添加LPM规则:LPM规则作为输入参数。如果表中没有存在相同前缀的规则,则将新规则添加到LPM表中。如果表中已经存在具有相同前缀的规则,则会更新规则的下一跳。当没有可用的规则空间时,返回错误。
  • 删除LPM规则:LPM规则的前缀作为输入参数。如果具有指定前缀的规则存在于LPM表中,则会被删除。
  • LPM规则查找:32位Key作为输入参数。该算法用于选择给定Key的最佳匹配的LPM规则,并返回该规则的下一跳。在LPM表中具有多个相同32位Key的规则的情况下,算法将最高深度的规则选为最佳匹配规则(最长前缀匹配),这意味着该规则Key和输入的Key之间具有最高有效位的匹配。

实现细节

目前的实现使用DIR-24-8算法的变体,可以改善内存使用量,以提高LPM查找速度。该算法允许以典型的单个存储器读访问来执行查找操作。在统计上看,即便是不常出现的情况,当即最佳匹配规则的深度大于24时,查找操作也仅需要两次内存读取访问。因此,特定存储器位置是否存在于处理器高速缓存中将很大程度上影响LPM查找操作的性能。

主要数据结构使用以下元素构建:

  • 一个2^24个条目的表。
  • 多个表(RTE_LPM_TBL8_NUM_GROUPS),每个表有2 ^ 8个条目。

第一个表,称为tbl24,使用要查找的IP地址的前24位进行索引;而第二个表,称为tbl8使用IP地址的最后8位进行索引。这意味着根据输入数据包的IP地址与存储在tbl24中的规则进行匹配的结果,我们可能需要在第二级继续查找过程。

由于tbl24的每个条目都可以指向tbl8,理想情况下,我们将具有2 ^ 24 tbl8,这与具有2 ^ 32个条目的单个表占用空间相同。因为资源限制,这显然是不可行的。相反,这种组织方法就是利用了超过24位的规则是非常罕见的这一特定。通过将这个过程分为两个不同的表/级别并限制tbl8的数量,我们可以大大降低内存消耗,同时保持非常好的查找速度(大部分时间仅一个内存访问)。

tbl24中的条目包含以下字段:

  • 下一跳,或者下一级查找表tbl8的索引值。
  • 有效标志。
  • 外部条目标志。
  • 规则深度。

第一个字段可以包含指示查找过程应该继续的tbl8的数字,或者如果已经找到最长的前缀匹配,则可以包含下一跳本身。两个标志字段用于确定条目是否有效,以及搜索过程是否分别完成。规则的深度或长度是存储在特定条目中的规则的位数。

tbl8中的条目包含以下字段:

  • 下一跳。
  • 有效标志。
  • 有效组。
  • 深度。

下一跳和深度包含与tbl24中相同的信息。两个标志字段显示条目和表分别是否有效。其他主要数据结构是包含有关规则(IP和下一跳)的主要信息的表。这是一个更高级别的表,用于不同的东西:

  • 在添加或删除之前,检查规则是否已经存在,而无需实际执行查找。
  • 删除时,检查是否存在包含要删除的规则。这很重要,因为主数据结构必须相应更新。

添加

添加规则时,存在不同的可能性。如果规则的深度恰好是24位,那么:

  • 使用规则(IP地址)作为tbl24的索引。
  • 如果条目无效(即它不包含规则),则将其下一跳设置为其值,将有效标志设置为1(表示此条目正在使用中),并将外部条目标志设置为0(表示查找 此过程结束,因为这是匹配的最长的前缀)。

如果规则的深度正好是32位,那么:

  • 使用规则的前24位作为tbl24的索引。
  • 如果条目无效(即它不包含规则),则查找一个空闲的tbl8,将该值的tbl8的索引设置为该值,将有效标志设置为1(表示此条目正在使用中),并将外部 条目标志为1(意味着查找过程必须继续,因为规则尚未被完全探测)。

如果规则的深度是任何其他值,则必须执行前缀扩展。这意味着规则被复制到所有下一级条目(只要它们不被使用),这也将导致匹配。
作为一个简单的例子,我们假设深度是20位。这意味着有可能导致匹配的IP地址的前24位的2 ^(24 - 20)= 16种不同的组合。因此,在这种情况下,我们将完全相同的条目复制到由这些组合索引的每个位置。

通过这样做,我们确保在查找过程中,如果存在与IP地址匹配的规则,则可以在一个或两个内存访问中找到,具体取决于是否需要移动到下一个表。前缀扩展是该算法的关键之一,因为它通过添加冗余来显着提高速度。

查询

查找过程要简单得多,速度更快。在这种情况下:

  • 使用IP地址的前24位作为tbl24的索引。如果该条目未被使用,那么这意味着我们没有匹配此IP的规则。如果它有效并且外部条目标志设置为0,则返回下一跳。
    如果它是有效的并且外部条目标志被设置为1,那么我们使用tbl8索引来找出要检查的tbl8,并且将该IP地址的最后8位作为该表的索引。类似地,如果条目未被使用,那么我们没有与该IP地址匹配的规则。如果它有效,则返回下一跳。

规则数目的限制

规则数量受到诸多不同因素的限制。第一个是规则的最大数量,这是通过API传递的参数。一旦达到这个数字,就不可能再添加任何更多的规则到路由表,除非有一个或多个删除。

第二个因素是算法的内在限制。如前所述,为了避免高内存消耗,tbl8的数量在编译时间有限(此值默认为256)。如果我们耗尽tbl8,我们将无法再添加任何规则。特定路由表中需要多少路由表是很难提前确定的。

只要我们有一个深度大于24的新规则,并且该规则的前24位与先前添加的规则的前24位不同,就会消耗tbl8。如果相同,那么新规则将与前一个规则共享相同的tbl8,因为两个规则之间的唯一区别是在最后一个字节内。

默认值为256情况下,我们最多可以有256个规则,长度超过24位,且前三个字节都不同。由于长度超过24位的路由不太可能,因此在大多数设置中不应该是一个问题。即便如此,tbl8的数量也可以通过设置更改。

用例:IPv4转发

LPM算法用于实现IPv4转发的路由器所使用的无类别域间路由(CIDR)策略。

3.9 Hash库

Hash表和算法是高性能流量处理中常用技术,在DPDK中也提供了相应的高性能实现用于支持流表、转发规则表等设施的开发。DPDK18的Hash库实现方式与其早期版本(1.6)时期的实现有了根本性的差异,本文介绍的实现基于18.05版本代码。

Hash算法原理——Cuckoo Hash

DPDK中的Hash库采用的hash算法和数据结构基于CuckooHash哈希表中的节点采用一个连续数组保存,其原理是每个节点key可以对应两个哈希散列位置(主、从)。查找和删除key时需要查找两个位置来确定key是否存在;插入时将key插入到主位,如果主位已经被key2占用,则将占用主位的key2移动到其另一位置存储,如果key2的另一位置也已经被key3占用,则再将key3移动到其另一位置存储......直到最后一次移动不再冲突为止,如果冲突次数达到一定阈值,则判定哈希表已满。

具体实现方式

在DPDK的实现中,每个节点的主从存储位置被实现为两个桶数组。哈希结构由两部分构成:

一部分是一系列的哈希桶,每个桶中可以保存8个节点的主次signature和key_index;

另一部分则是保存节点key和内容的数组,通过key_index下标进行索引,其中节点内容部分共8字节,可以直接保存节点信息,也可以保存一个指针来指向节点的数据结构。

在查找一个key时,流程如下:

首先通过散列算法获得两个4字节的signature和对应的两个桶。主signature散列算法可以是CRC、JHash或自定义算法;次signature是对主signature经过一个变换产生的。

在桶中查找signature匹配的节点信息

根据key_index索引到完整的key

比对key是否一致,如果一致则查找成功

可见,一次成功的查找至少要访问两个数据结构各一次,分别比对signature和key。一次失败的查找则要访问主从桶各一次,比对signature。

代码分析——数据结构

/** A hash table structure. */
struct rte_hash {
	char name[RTE_HASH_NAMESIZE];   /**< Name of the hash. */
	uint32_t entries;               /**< Total table entries. */
	uint32_t num_buckets;           /**< Number of buckets in table. */
 
	struct rte_ring *free_slots;
	/**< Ring that stores all indexes of the free slots in the key table */
	uint8_t hw_trans_mem_support;
	/**< Hardware transactional memory support */
	struct lcore_cache *local_free_slots;
	/**< Local cache per lcore, storing some indexes of the free slots */
	enum add_key_case add_key; /**< Multi-writer hash add behavior */
 
	rte_spinlock_t *multiwriter_lock; /**< Multi-writer spinlock for w/o TM */
 
	/* Fields used in lookup */
 
	uint32_t key_len __rte_cache_aligned;
	/**< Length of hash key. */
	rte_hash_function hash_func;    /**< Function used to calculate hash. */
	uint32_t hash_func_init_val;    /**< Init value used by hash_func. */
	rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
	/**< Custom function used to compare keys. */
	enum cmp_jump_table_case cmp_jump_table_idx;
	/**< Indicates which compare function to use. */
	enum rte_hash_sig_compare_function sig_cmp_fn;
	/**< Indicates which signature compare function to use. */
	uint32_t bucket_bitmask;
	/**< Bitmask for getting bucket index from hash signature. */
	uint32_t key_entry_size;         /**< Size of each key entry. */
 
	void *key_store;                /**< Table storing all keys and data */
	struct rte_hash_bucket *buckets;
	/**< Table with buckets storing all the	hash values and key indexes
	 * to the key table.
	 */
} __rte_cache_aligned;

一个哈希表的数据结构如上,其中包括节点最大数量、哈希桶数量、哈希key的长度、key的散列值算法等。该结构通过rte_hash_create创建和初始化,其中最重要的数据结构是:

  • struct rte_ring *free_slots,表示哈希节点数组中空闲的节点;
  • void *key_store,保存实际的key和节点信息的数组
  • struct rte_hash_bucket *buckets,哈希桶数组,保存key的signature和在key_store中的下标位置
  • 每个成员的意义和功能可以结合上面的注释以及rte_hash_create函数中的初始化逻辑来深入了解。

关键逻辑

哈希表中的节点查找和删除逻辑都比较简单,这里不再分析。节点插入的操作相对复杂一点,关键逻辑在于发现节点对应的主次桶都已经填满时的已有节点移动逻辑。这个逻辑在make_space_bucket函数中实现。

make_space_bucket函数主要分为两步:

1. 桶中是否有节点对应的次桶中还有空间?如果有的话就将该节点移动它的次桶中。代码如下:

	/*
	 * Push existing item (search for bucket with space in
	 * alternative locations) to its alternative location
	 */
	for (i = 0; i < rte_hash_bucket_entries i search for space in alternative locations next_bucket_idx='bkt-'>sig_alt[i] & h->bucket_bitmask;
		next_bkt[i] = &h->buckets[next_bucket_idx];
		for (j = 0; j < rte_hash_bucket_entries j if next_bkti->key_idx[j] == EMPTY_SLOT)
				break;
		}
 
		if (j != RTE_HASH_BUCKET_ENTRIES)
			break;
	}
 
	/* Alternative location has spare room (end of recursive function) */
	if (i != RTE_HASH_BUCKET_ENTRIES) {
		next_bkt[i]->sig_alt[j] = bkt->sig_current[i];
		next_bkt[i]->sig_current[j] = bkt->sig_alt[i];
		next_bkt[i]->key_idx[j] = bkt->key_idx[i];
		return i;
	}

2. 如果没有节点的次桶有空间了,则只能强行移动一个节点到次桶,再在次桶中再移动一个节点。这里会递归调用make_space_bucket函数来移动次桶中的节点。如果桶中的每个节点都已经被强行移动过,或者此次操作移动的节点数达到上限,则认为哈希表已满,操作失败。代码如下:

	/* Pick entry that has not been pushed yet */
	for (i = 0; i < rte_hash_bucket_entries i if bkt->flag[i] == 0)
			break;
 
	/* All entries have been pushed, so entry cannot be added */
	if (i == RTE_HASH_BUCKET_ENTRIES || ++(*nr_pushes) > RTE_HASH_MAX_PUSHES)
		return -ENOSPC;
 
	/* Set flag to indicate that this entry is going to be pushed */
	bkt->flag[i] = 1;
 
	/* Need room in alternative bucket to insert the pushed entry */
	ret = make_space_bucket(h, next_bkt[i], nr_pushes);
	/*
	 * After recursive function.
	 * Clear flags and insert the pushed entry
	 * in its alternative location if successful,
	 * or return error
	 */
	bkt->flag[i] = 0;
	if (ret >= 0) {
		next_bkt[i]->sig_alt[ret] = bkt->sig_current[i];
		next_bkt[i]->sig_current[ret] = bkt->sig_alt[i];
		next_bkt[i]->key_idx[ret] = bkt->key_idx[i];
		return i;
	} else
		return ret;

综合上述分析,可以发现,DPDK实现的hash库的最大优点,在于其一次查找的时间是有上限的,一般情况下最多产生3次随机内存访问:主桶、从桶、key节点。同时通过CuckooHash算法,很大程度上避免了普通的基于数组和桶的哈希表在同一个桶冲突严重的情况下节点会插入失败的问题。官方实验结果显示只有当哈希表节点使用率达到95%时才会出现插入失败,在50%以下时基本可以保证绝大部分节点都保存在主桶中。此外,通过signature和key两阶段比较的方法减少了key比较的次数,在key比较长时能提升一定的性能。

但这个方法的逻辑相当复杂,在表完全为空时,一次失败的查找也要访问两次内存,还要基于ring来获取释放数组索引资源,其中实现的本核心缓存逻辑感觉也不太合理。综合来看其性能未必高于普通的基于大规模哈希数组和链表的哈希表结构。采用时需要经过具体测试评估。

3.10多进程支持

DPDK库里是支持多进程和多线程,本文主要总结多进程的相关的操作。

DPDK多进程使用的关键启动参数:

  • --proc-type:指定一个dpdk进程是主进程还是副进程(参数值就用上面的primary或是secondary,或者是auto)
  • --file-prefix:允许非合作的进程拥有不同的内存区域。主副进程默认文件路径/var/run/.rte_config,同一个处理组的主副进程使用相同的参数,
    如果想运行多个主进程,这个参数就必须指定!
  • --socket-mem:设置从hugepages分配多大的存储空间。默认会用掉所有的hugepages,所以建议指定这个参数,不管是单cpu还是在NUMA中。
    eg:单socket
    ,--socket-mem=512;在numa中,--socket-mem=512,512;多个socket间用‘,’号隔开;
  • -w : 后面跟网卡的PCI号,指定使用网卡。设置了这参数,DPDK只会使用这个参数对应的网卡,不会初始化其他的。

在Multi-process Sample Application中介绍了4种使用场景:

Basic Multi-process Example,DPDK进程间通过ring,内存池,队列,进行信息交互。
Symmetric Multi-process Example,主进程初始化所有资源,副进程直接获取资源进行数据包处理,副进程除了不初始化资源,数据包处理和主进程是一样的。每个进程获取每个端口的一个RX, TX队列。
Client-Server Multi-process Example,主进程初始化资源和接收所有收到的数据包并轮询分发给副进程处理。
Master-slave Multi-process Example,这个模式主要是介绍各进程之间存在依赖关系,主进程和副进程,副进程和副进程

[root@localhost simple_mp]#   ./build/simple_mp -l 0-1   --proc-type=primary
EAL: Detected 128 lcore(s)
EAL: Detected 4 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL: PCI device 0000:05:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic
net_hinic: Initializing pf hinic-0000:05:00.0 in primary process
net_hinic: Device 0000:05:00.0 hwif attribute:
net_hinic: func_idx:0, p2p_idx:0, pciintf_idx:0, vf_in_pf:0, ppf_idx:0, global_vf_id:15, func_type:2
net_hinic: num_aeqs:4, num_ceqs:4, num_irqs:32, dma_attr:2
net_hinic: API CMD poll status timeout
net_hinic: chain type: 0x7
net_hinic: chain hw cpld error: 0x1
net_hinic: chain hw check error: 0x0
net_hinic: chain hw current fsm: 0x0
net_hinic: chain hw current ci: 0x0
net_hinic: Chain hw current pi: 0x1
net_hinic: Send msg to mgmt failed
net_hinic: Failed to get board info, err: -110, status: 0x0, out size: 0x0
net_hinic: Check card workmode failed, dev_name: 0000:05:00.0
net_hinic: Create nic device failed, dev_name: 0000:05:00.0
net_hinic: Initialize 0000:05:00.0 in primary failed
EAL: Requested device 0000:05:00.0 cannot be used
EAL: PCI device 0000:06:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic
EAL: PCI device 0000:7d:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:a222 net_hns3
EAL: PCI device 0000:7d:00.1 on NUMA socket 0
EAL:   probe driver: 19e5:a221 net_hns3
EAL: PCI device 0000:7d:00.2 on NUMA socket 0
EAL:   probe driver: 19e5:a222 net_hns3
EAL: PCI device 0000:7d:00.3 on NUMA socket 0
EAL:   probe driver: 19e5:a221 net_hns3
APP: Finished Process Init.
Starting core 1

simple_mp > primary send hello1
Command not found

simple_mp > send hello by primary
Bad arguments

simple_mp > 
simple_mp > send hello_by_primary

simple_mp > core 1: Received 'hello_by_sencondary'


simple_mp >

./build/simple_mp -l 0-1 --proc-type=secondary

[root@localhost lib]# ps -elf | grep simple_mp
0 S root       8457 124128 13  80   0 - 8389378 wait_w 03:24 pts/1  00:00:16 ./build/simple_mp -l 0-1 --proc-type=primary
0 S root       8471   7504  6  80   0 - 8389498 wait_w 03:24 pts/2  00:00:06 ./build/simple_mp -l 0-1 --proc-type=secondary
0 S root       8564  57486  0  80   0 -  1729 pipe_w 03:26 pts/0    00:00:00 grep --color=auto simple_mp
[root@localhost lib]# ps -mo pid,tid,%cpu,psr -p 8471
   PID    TID %CPU PSR
  8471      -  6.0   -
     -   8471  0.0   0
     -   8472  0.0  33
     -   8473  0.0  10
     -   8474  6.0   1
[root@localhost lib]# ps -T  -p 8471
   PID   SPID TTY          TIME CMD
  8471   8471 pts/2    00:00:00 simple_mp
  8471   8472 pts/2    00:00:00 eal-intr-thread
  8471   8473 pts/2    00:00:00 rte_mp_handle
  8471   8474 pts/2    00:00:10 lcore-slave-1
[root@localhost lib]# ps -mo pid,tid,%cpu,psr -p 8457
   PID    TID %CPU PSR
  8457      - 10.7   -
     -   8457  5.1   0
     -   8458  0.0   9
     -   8459  0.0   9
     -   8460  5.6   1
[root@localhost lib]# ps -T  -p 8457
   PID   SPID TTY          TIME CMD
  8457   8457 pts/1    00:00:10 simple_mp
  8457   8458 pts/1    00:00:00 eal-intr-thread
  8457   8459 pts/1    00:00:00 rte_mp_handle
  8457   8460 pts/1    00:00:11 lcore-slave-1
[root@localhost lib]#

两个mp_socket,
mp_socket_8471_167e85391023是seconary进程的:

[root@localhost dpdk-stable-17.11.2]# killall simple_mp
[root@localhost dpdk-stable-17.11.2]# ps -elf | grep simple_mp
0 S root       9154  36716  0  80   0 -  1729 pipe_w 03:38 pts/3    00:00:00 grep --color=auto simple_mp
[root@localhost dpdk-stable-17.11.2]#
kill之后还存在哦[root@localhost lib]# ls /var/run/dpdk/rte/ -al
total 15680
drwx------. 2 root root   1100 Aug 28 03:24 .
drwx------. 3 root root     60 Aug 26 03:45 ..
-rw-------. 1 root root  18816 Aug 28 03:24 config
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-0
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-0_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-1
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-1_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-2
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-2_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-3
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-0-3_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-0
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-0_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-1
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-1_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-2
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-2_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-3
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-1-3_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-0
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-0_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-1
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-1_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-2
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-2_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-3
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-2-3_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-0
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-0_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-1
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-1_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-2
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-2_8471
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-3
-rw-------. 1 root root 458752 Aug 28 03:24 fbarray_memseg-2048k-3-3_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-0-0
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-0-0_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-0-1
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-0-1_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-1-0
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-1-0_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-1-1
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-1-1_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-2-0
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-2-0_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-2-1
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-2-1_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-3-0
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-3-0_8471
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-3-1
-rw-------. 1 root root  65536 Aug 28 03:24 fbarray_memseg-524288k-3-1_8471
-rw-------. 1 root root 196608 Aug 28 03:24 fbarray_memzone
-rw-------. 1 root root  16576 Aug 28 03:24 hugepage_info
srwxr-xr-x. 1 root root      0 Aug 28 03:24 mp_socket
srwxr-xr-x. 1 root root      0 Aug 28 03:24 mp_socket_8471_167e85391023
[root@localhost lib]#


[root@localhost simple_mp]#   ./build/simple_mp -l 126-127   --proc-type=primary
EAL: Detected 128 lcore(s)
EAL: Detected 4 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL: PCI device 0000:05:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic
net_hinic: Initializing pf hinic-0000:05:00.0 in primary process
net_hinic: Device 0000:05:00.0 hwif attribute:
net_hinic: func_idx:0, p2p_idx:0, pciintf_idx:0, vf_in_pf:0, ppf_idx:0, global_vf_id:15, func_type:2
net_hinic: num_aeqs:4, num_ceqs:4, num_irqs:32, dma_attr:2
net_hinic: API CMD poll status timeout
net_hinic: chain type: 0x7
net_hinic: chain hw cpld error: 0x1
net_hinic: chain hw check error: 0x0
net_hinic: chain hw current fsm: 0x0
net_hinic: chain hw current ci: 0x0
net_hinic: Chain hw current pi: 0x1
net_hinic: Send msg to mgmt failed
net_hinic: Failed to get board info, err: -110, status: 0x0, out size: 0x0
net_hinic: Check card workmode failed, dev_name: 0000:05:00.0
net_hinic: Create nic device failed, dev_name: 0000:05:00.0
net_hinic: Initialize 0000:05:00.0 in primary failed
EAL: Requested device 0000:05:00.0 cannot be used
EAL: PCI device 0000:06:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic
EAL: PCI device 0000:7d:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:a222 net_hns3
EAL: PCI device 0000:7d:00.1 on NUMA socket 0
EAL:   probe driver: 19e5:a221 net_hns3
EAL: PCI device 0000:7d:00.2 on NUMA socket 0
EAL:   probe driver: 19e5:a222 net_hns3
EAL: PCI device 0000:7d:00.3 on NUMA socket 0
EAL:   probe driver: 19e5:a221 net_hns3
APP: Finished Process Init.
Starting core 127

simple_mp >


[root@localhost simple_mp]#   ./build/simple_mp -l 120-121  --proc-type=secondary
EAL: Detected 128 lcore(s)
EAL: Detected 4 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket_9543_169b3a72effc
EAL: Selected IOVA mode 'PA'
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL: PCI device 0000:05:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic
EAL: Cannot find resource for device
EAL: PCI device 0000:06:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic
EAL: PCI device 0000:7d:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:a222 net_hns3
EAL: PCI device 0000:7d:00.1 on NUMA socket 0
EAL:   probe driver: 19e5:a221 net_hns3
EAL: PCI device 0000:7d:00.2 on NUMA socket 0
EAL:   probe driver: 19e5:a222 net_hns3
EAL: PCI device 0000:7d:00.3 on NUMA socket 0
EAL:   probe driver: 19e5:a221 net_hns3
APP: Finished Process Init.
Starting core 121

simple_mp >


[root@localhost lib]# ls /var/run/dpdk/rte/ -al
total 15680
drwx------. 2 root root   1100 Aug 28 03:45 .
drwx------. 3 root root     60 Aug 26 03:45 ..
-rw-------. 1 root root  18816 Aug 28 03:45 config
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-3_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-3_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-3_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-3_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-1_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-1_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-1_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-1_9543
-rw-------. 1 root root 196608 Aug 28 03:45 fbarray_memzone
-rw-------. 1 root root  16576 Aug 28 03:45 hugepage_info
srwxr-xr-x. 1 root root      0 Aug 28 03:45 mp_socket
srwxr-xr-x. 1 root root      0 Aug 28 03:45 mp_socket_9543_169b3a72effc
[root@localhost lib]#


[root@localhost simple_mp]#   ./build/simple_mp -l 126-127   --proc-type=primary
EAL: Detected 128 lcore(s)
EAL: Detected 4 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL: PCI device 0000:05:00.0 on NUMA socket 0
EAL:   probe driver: 19e5:200 net_hinic


simple_mp > Terminated
[root@localhost simple_mp]#   ./build/simple_mp -l 120-121  --proc-type=secondary
EAL: Detected 128 lcore(s)


[root@localhost lib]# ps -mo pid,tid,%cpu,psr -p 9530
   PID    TID %CPU PSR
  9530      -  6.9   -
     -   9530  0.2 126
     -   9531  0.0   9
     -   9532  0.0  10
     -   9533  6.6 127
[root@localhost lib]# ps -T  -p 9530
   PID   SPID TTY          TIME CMD
  9530   9530 pts/1    00:00:10 simple_mp
  9530   9531 pts/1    00:00:00 eal-intr-thread
  9530   9532 pts/1    00:00:00 rte_mp_handle
  9530   9533 pts/1    00:04:11 lcore-slave-127
[root@localhost lib]# ps -mo pid,tid,%cpu,psr -p 9543
   PID    TID %CPU PSR
  9543      -  6.6   -
     -   9543  0.0 120
     -   9544  0.0  10
     -   9545  0.0  11
     -   9546  6.6 121
[root@localhost lib]# ps -T  -p 9543
   PID   SPID TTY          TIME CMD
  9543   9543 pts/2    00:00:00 simple_mp
  9543   9544 pts/2    00:00:00 eal-intr-thread
  9543   9545 pts/2    00:00:00 rte_mp_handle
  9543   9546 pts/2    00:04:08 lcore-slave-121
[root@localhost lib]#


[root@localhost dpdk-stable-17.11.2]# ls /var/run/dpdk/rte/ -al
total 15680
drwx------. 2 root root   1100 Aug 28 03:45 .
drwx------. 3 root root     60 Aug 26 03:45 ..
-rw-------. 1 root root  18816 Aug 28 03:45 config
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-0-3_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-1-3_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-2-3_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-0
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-0_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-1
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-1_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-2
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-2_9543
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-3
-rw-------. 1 root root 458752 Aug 28 03:45 fbarray_memseg-2048k-3-3_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-0-1_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-1-1_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-2-1_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-0
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-0_9543
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-1
-rw-------. 1 root root  65536 Aug 28 03:45 fbarray_memseg-524288k-3-1_9543
-rw-------. 1 root root 196608 Aug 28 03:45 fbarray_memzone
-rw-------. 1 root root  16576 Aug 28 03:45 hugepage_info
srwxr-xr-x. 1 root root      0 Aug 28 03:45 mp_socket
srwxr-xr-x. 1 root root      0 Aug 28 03:45 mp_socket_9543_169b3a72effc
[root@localhost dpdk-stable-17.11.2]# ls

退出

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表