l2fwd应用程序为RX_PORT上接收的每个数据包执行二层转发。 目标端口是启用的端口掩码的相邻端口,即,如果启用前四个端口(端口掩码0xf,每个端口用一个比特位表示,启动4个就是4个比特位置1),端口1和2相互转发,端口3和4相互转发。 此外,如果启用了MAC地址更新,则MAC地址按照如下方式更新:
数据包的源mac会变成发送端口的mac地址
数据包的目的mac会变成02:00:00:00:00:发送端口的ID
也就是说在不开启mac地址更新的情况下,仅仅对数据包进行紧邻端口的转发不对数据包的mac地址进行修改,如果开启的话,会对需要转发的数据包的mac地址按照如上的规则进行更新。
参照官网给出的图,转发的模式如下
也就是说端口0和端口1是相邻的两个端口,2和3是相邻的两个端口,0口进的数据包会从1口发出,1口进的数据包会从0口发出,同理2和3两个端口的转发逻辑也是一样的。
现在对源码做一下跟读,从main函数开始
1. 命令行参数解析
命令行函数由两部分构成:EAL参数和应用程序参数分别由rte_eal_init函数和l2fwd_parse_args函数来解析
//1. 解析命令行参数
/* init EAL */
ret = rte_eal_init(argc, argv); // 解析命令行EAL参数 环境初始化
if (ret < 0)
rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
argc -= ret;
argv += ret;// 跳过EAL参数 参数的数量减小 同时指针后移
force_quit = false; // 终止main loop中while循环的变量
signal(SIGINT, signal_handler); // 利用signal函数来处理接收到的信号
signal(SIGTERM, signal_handler);
/* parse application arguments (after the EAL ones) */
ret = l2fwd_parse_args(argc, argv); // 解析应用参数
if (ret < 0)
rte_exit(EXIT_FAILURE, "Invalid L2FWD arguments\n");
rte_eal_init不仅有解析命令行参数的作用还会进行复杂的环境初始化,l2fwd_parse_args函数则主要是用来解析应用程序的参数,这个函数的主体是一个switch,这个switch有三个case,也就是对命令行里输入的-p,-q,-T的解析
switch (opt) {
/* portmask */
case 'p': // 解析-p参数 要配置的端口的掩码
l2fwd_enabled_port_mask = l2fwd_parse_portmask(optarg);
if (l2fwd_enabled_port_mask == 0) {
printf("invalid portmask\n");
l2fwd_usage(prgname);
return -1;
}
break;
/* nqueue */
case 'q': // 每一个逻辑核处理多少个端口
l2fwd_rx_queue_per_lcore = l2fwd_parse_nqueue(optarg);
if (l2fwd_rx_queue_per_lcore == 0) {
printf("invalid queue number\n");
l2fwd_usage(prgname);
return -1;
}
break;
/* timer period */
case 'T': // 设置打印端口数据的周期
timer_secs = l2fwd_parse_timer_period(optarg);
if (timer_secs < 0) {
printf("invalid timer period\n");
l2fwd_usage(prgname);
return -1;
}
timer_period = timer_secs;
break;
/* long options */
case 0:
break;
default:
l2fwd_usage(prgname);
return -1;
}
可以看的出来-p会对l2fwd_enabled_port_mask变量赋值,这个值表示的就是端口掩码,也就是一个比特位表示一个端口,如果这个端口开启,就把对应的比特位置1,-q会对l2fwd_rx_queue_per_lcore变量赋值,表示的是每个逻辑核处理多少个端口。-T会对timer_period变量赋值,也就是打印端口数据的周期
2. 创建mbuf池
命令行参数解析完以后,会创建mbuf池,mbuf pool里边包含很多mbuf对象,mbuf对象用来存储网络数据包
//2. 创建mnuf池
/* create the mbuf pool */
l2fwd_pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", NB_MBUF, // 从已分配的大页中创建内存池
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
rte_socket_id());
if (l2fwd_pktmbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n");
nb_ports = rte_eth_dev_count(); // 获取全部开启的端口的数量
if (nb_ports == 0)
rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
其中l2fwd_pktmbuf_pool是rte_mempool结构体,他是用于处理对象池的通用结构,这里创建的mbuf的数量是NB_MBUF,各mbuf的大小是RTE_MBUF_DEFAULT_BUF_SIZE。
3. 设置每个端口的目的端口
这里用数组l2fwd_dst_ports来表示每一个端口的目的端口,即将端口号作为数组的索引,数组的值就是这个端口的目的端口
//3. 设置每个端口转发的目的端口
nb_ports = rte_eth_dev_count(); // 获取全部开启的端口的数量
if (nb_ports == 0)
rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++)
l2fwd_dst_ports[portid] = 0; // 让每一个端口的目的端口初始化为0
last_port = 0;
for (portid = 0; portid < nb_ports; portid++) {
/* skip ports that are not enabled */
if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) // 跳过没有开启的端口 端口掩码是命令行参数配置的 如果要使用这个端口的话 就把对应的比特位置1
continue;
if (nb_ports_in_mask % 2) {//nb_ports_in_mask 初始值为0
l2fwd_dst_ports[portid] = last_port;
l2fwd_dst_ports[last_port] = portid;// 两两一组 互相为目的端口
}
else
last_port = portid;
nb_ports_in_mask++;
//获取端口的名字、发送队列、接收队列等信息,主要就是填充每个端口的dev_info结构体
rte_eth_dev_info_get(portid, &dev_info);
}
if (nb_ports_in_mask % 2) { // 如果最后还剩下一个端口 也就是端口数量是奇数
printf("Notice: odd number of ports in portmask.\n");
l2fwd_dst_ports[last_port] = last_port; // 最后一个端口的目的端口就是自己
}
l2fwd_enabled_port_mask就是命令行参数里边接收到的端口掩码,可以看到利用nb_ports_in_mask % 2作为条件判断来实现两两端口为一组互为目的端口的逻辑,最后如果端口数为奇数,最后一个端口的目的端口就是它自己
4. 为每一个端口分配逻辑核
该应用用一个逻辑核来轮询一个或者多个端口,这个是由命令行参数里的-q参数来决定的,例如-q 4表示一个lcore轮询4个端口,如果有16个端口,并且端口掩码是-p ffff(16位全部置1),那么就需要4个lcore来轮询。
// 4. 为每一个端口分配逻辑核
rx_lcore_id = 0;
qconf = NULL;
/* Initialize the port/queue configuration of each logical core */
for (portid = 0; portid < nb_ports; portid++) {
/* skip ports that are not enabled */
if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) // 根据比特位跳过没有开启的端口
continue;
/* get the lcore_id for this port */
while (rte_lcore_is_enabled(rx_lcore_id) == 0 || // 检测这个lcore是否enable
lcore_queue_conf[rx_lcore_id].n_rx_port == // 结构体变量访问成员用. 结构体指针访问成员用->
l2fwd_rx_queue_per_lcore) { // 命令行里的-q参数的值
rx_lcore_id++; // 寻找一个可用的内核
if (rx_lcore_id >= RTE_MAX_LCORE) // RTE_MAX_LCORE最大核心数的值就是128
rte_exit(EXIT_FAILURE, "Not enough cores\n");
}
if (qconf != &lcore_queue_conf[rx_lcore_id])
/* Assigned a new logical core in the loop above. */
qconf = &lcore_queue_conf[rx_lcore_id]; // qconf指向这个逻辑核的结构体
qconf->rx_port_list[qconf->n_rx_port] = portid; // 逻辑核指向这个端口
qconf->n_rx_port++;
printf("Lcore %u: RX port %u\n", rx_lcore_id, portid);
}
nb_ports_available = nb_ports; // 端口的数量
这部分主体就是遍历每一个可用的端口,while循环中有两个条件是或的关系,第一个条件是rte_lcore_is_enabled(rx_lcore_id) == 0,也就是判断这个lcore是否是可用的,如果可用的话就返回1,第二个条件是lcore_queue_conf[rx_lcore_id].n_rx_port == l2fwd_rx_queue_per_lcore),这个l2fwd_rx_queue_per_lcore就是-q参数的值,也就是每个核轮询的端口的数量,lcore_queue_conf是一个结构体,内容为
struct lcore_queue_conf {
unsigned n_rx_port;
unsigned rx_port_list[MAX_RX_QUEUE_PER_LCORE];
} __rte_cache_aligned;
n_rx_port是端口的数量,rx_port_list是端口号组成的数组,所以这里while循环的逻辑就是说找到一个可用逻辑核,并且这个逻辑核轮询的端口的数量比l2fwd_rx_queue_per_lcore的值小,否则就让rx_lcore_id++,也就是去找下一个满足条件的逻辑核,找到以后就让qconf这个临时的指针指向这个lcore的rx_port_list结构体,然后让这个结构体中的n_rx_port数量加1,并且在rx_port_list数组中添加这个端口号
5. 初始化每一个端口
这一步主要是对每一个端口进行配置,也就是将一些配置信息写入到端口的相关结构体中
nb_ports_available = nb_ports; // 端口的数量
/* Initialise each port */
for (portid = 0; portid < nb_ports; portid++) {
/* skip ports that are not enabled */
if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) {
printf("Skipping disabled port %u\n", portid);
nb_ports_available--;
continue;
}
/* init port */
printf("Initializing port %u... ", portid);
fflush(stdout);// 清除缓冲区 强迫未写入磁盘的内容立即写入
ret = rte_eth_dev_configure(portid, 1, 1, &port_conf); // 配置端口 这里一个port对应一个收队列和发队列
// 设置端口的rte_eth_dev结构体的值 检查设备支持什么类型的中断、支持的包大小等
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n",
ret, portid);
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);// 设置收发队列的descriptors的数量
if (ret < 0)
rte_exit(EXIT_FAILURE,
"Cannot adjust number of descriptors: err=%d, port=%u\n",
ret, portid);
rte_eth_macaddr_get(portid,&l2fwd_ports_eth_addr[portid]); // 获取端口的mac地址 写到最后一个参数中
/* init one RX queue */ // 初始化一个收队列 这里因为设置了一个端口只有一个收队列和发队列
fflush(stdout);
ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
rte_eth_dev_socket_id(portid), // 这个port的socket: numa node
NULL,
l2fwd_pktmbuf_pool);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n",
ret, portid);
/* init one TX queue on each port */ // 初始化一个发队列 这里因为设置了一个端口只有一个收队列和发队列
// 设置发送队列
fflush(stdout);
ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
rte_eth_dev_socket_id(portid),
NULL);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n",
ret, portid);
/* Initialize TX buffers */
// 初始化发送缓冲区
//每个端口分配接收缓冲区,根据numa架构的socket就近分配
tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
rte_eth_dev_socket_id(portid));
if (tx_buffer[portid] == NULL)
rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n",
portid);
// 初始化发送缓冲区
rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST);
// 设置缓冲区的err_callback 发生错误的回调函数 用了函数指针
ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid],
rte_eth_tx_buffer_count_callback,
&port_statistics[portid].dropped);
if (ret < 0)
rte_exit(EXIT_FAILURE,
"Cannot set error callback for tx buffer on port %u\n",
portid);
/* Start device */
// 启动端口
ret = rte_eth_dev_start(portid);
if (ret < 0)
rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n",
ret, portid);
printf("done: \n");
rte_eth_promiscuous_enable(portid);
// 打印端口的mac地址
printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
portid,
l2fwd_ports_eth_addr[portid].addr_bytes[0],
l2fwd_ports_eth_addr[portid].addr_bytes[1],
l2fwd_ports_eth_addr[portid].addr_bytes[2],
l2fwd_ports_eth_addr[portid].addr_bytes[3],
l2fwd_ports_eth_addr[portid].addr_bytes[4],
l2fwd_ports_eth_addr[portid].addr_bytes[5]);
/* initialize port stats */
// 初始化端口数据 收发包数等
memset(&port_statistics, 0, sizeof(port_statistics));
}
if (!nb_ports_available) {
rte_exit(EXIT_FAILURE,
"All available ports are disabled. Please set portmask.\n");
}
首先是rte_eth_dev_configure(portid, 1, 1, &port_conf),这个函数主要是对端口的rte_eth_dev结构体进行赋值,并为这个端口的分配和配置队列空间,这个例子中为每一个端口各分配1个收队列和1个发队列,函数主体如下
int rte_eth_dev_configure(uint16_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q,
const struct rte_eth_conf *dev_conf)
{
struct rte_eth_dev *dev; // 指向这个端口的rte_eth_dev结构体,结构体中主要都是函数指针指向PMD操作的相关函数和用户空间收发数据的相关参数,还包括了rte_eth_dev_data结构体,这个结构体中又有rte_eth_conf结构体
struct rte_eth_dev_info dev_info; // 这个端口的信息 包括收发队列的数量 最大数量 负载能力 descriptor的数量 mac地址最大数量 还有收发队列的默认配置信息等
struct rte_eth_conf local_conf = *dev_conf; // 指向传入的参数dev_conf
int diag;
RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);// 检查portid是否合法
……
RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP); // 检查函数指针是否合法
RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP); // 检查函数指针是否合法
……
// 赋值local_conf 到dev的data字段中
memcpy(&dev->data->dev_conf, &local_conf, sizeof(dev->data->dev_conf)); // 本地的local_conf 复制到dev中的
……
if (nb_rx_q > dev_info.max_rx_queues) { // 判断队列数量是否超过了这个端口所支持的最大数量
RTE_PMD_DEBUG_TRACE("ethdev port_id=%d nb_rx_queues=%d > %d\n",
port_id, nb_rx_q, dev_info.max_rx_queues);
return -EINVAL;
}
if (nb_tx_q > dev_info.max_tx_queues) {
RTE_PMD_DEBUG_TRACE("ethdev port_id=%d nb_tx_queues=%d > %d\n",
port_id, nb_tx_q, dev_info.max_tx_queues);
return -EINVAL;
}
// 判断端口支持的中断类型 对dev中的data字段判断
if ((dev_conf->intr_conf.lsc == 1) &&
(!(dev->data->dev_flags & RTE_ETH_DEV_INTR_LSC))) {
RTE_PMD_DEBUG_TRACE("driver %s does not support lsc\n",
dev->device->driver->name);
return -EINVAL;
}
if ((dev_conf->intr_conf.rmv == 1) &&
(!(dev->data->dev_flags & RTE_ETH_DEV_INTR_RMV))) {
RTE_PMD_DEBUG_TRACE("driver %s does not support rmv\n",
dev->device->driver->name);
return -EINVAL;
}
……
diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q); // 根据数据为端口分配收队列空间
diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q); // 根据数据为端口分配发队列空间
……
return 0;
}
然后是ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);这个函数是设置收发队列的descriptors的数量,主要修改的是这个端口的rte_eth_dev_info结构体中的字段,然后是rte_eth_rx_queue_setup和rte_eth_tx_queue_setup函数,分别用来初始化这个端口的收队列和发队列(前边的rte_eth_dev_configure只是分配了队列的空间)这两个函数的逻辑都是类似的,都会对queue_id的合法性检查,对mbuf的大小检查等,最后通过rte_eth_dev结构体中的函数指针rx_queue_setup或者tx_queue_setup调用对应的setup函数为队列设置numa节点,分配mbuf等,rte_eth_rx_queue_setup和rte_eth_tx_queue_setup函数执行完毕后,会为发队列分配缓存空间并开启端口,并初始化各个端口的port_statistics数据,也就是收发包数量等数据
6. 启动收发过程
最后一步,启动收发过程,这一过程同样是在第5部的循环中
check_all_ports_link_status(nb_ports, l2fwd_enabled_port_mask);
ret = 0;
/* launch per-lcore init on every lcore */
rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER); // 函数指针
RTE_LCORE_FOREACH_SLAVE(lcore_id) { // for循环的宏定义 遍历除了master core的所有的lcore
if (rte_eal_wait_lcore(lcore_id) < 0) { // 如果有一个core完成了任务 就break
ret = -1;
break;
}
}
for (portid = 0; portid < nb_ports; portid++) {
if ((l2fwd_enabled_port_mask & (1 << portid)) == 0)
continue;
printf("Closing port %d...", portid);
rte_eth_dev_stop(portid);
rte_eth_dev_close(portid);
printf(" Done\n");
}
printf("Bye...\n");
return ret;
这一步主要是调用了rte_eal_mp_remote_launch函数,这个函数主要逻辑如下
int rte_eal_mp_remote_launch(int (*f)(void *), void *arg, enum rte_rmt_call_master_t call_master){
int lcore_id;
int master = rte_get_master_lcore();
……
//slave lcore启动l2fwd_launch_one_lcore函数
/* send messages to cores */
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
rte_eal_remote_launch(f, arg, lcore_id);
}
//master lcore任务完成
if (call_master == CALL_MASTER) {
lcore_config[master].ret = f(arg);
lcore_config[master].state = FINISHED;
}
return 0;
}
可以看到master lcore在完成配置和初始化任务后就表记成完成状态,而slave lcore调用l2fwd_launch_one_lcore函数开始处理业务,而这个函数调用了l2fwd_main_loop()函数,就是slave lcore的主循环了,l2fwd_main_loop()的主体就是一个循环
//直到发生了强制退出,在这里就是ctrl+c或者kill了这个进程
while (!force_quit) {
cur_tsc = rte_rdtsc(); // 当前时间
/*
* TX burst queue drain
*/
diff_tsc = cur_tsc - prev_tsc;
//过了100us,把发送buffer里的报文发出去
if (unlikely(diff_tsc > drain_tsc)) {
// 这个lcore的所有的队列里的缓存全部发出去
for (i = 0; i < qconf->n_rx_port; i++) {
portid = l2fwd_dst_ports[qconf->rx_port_list[i]];
buffer = tx_buffer[portid];
sent = rte_eth_tx_buffer_flush(portid, 0, buffer);
if (sent)
port_statistics[portid].tx += sent;
}
/* if timer is enabled */
if (timer_period > 0) {
/* advance the timer */
timer_tsc += diff_tsc;
/* if timer has reached its timeout */
// 到了时间打印端口数据
if (unlikely(timer_tsc >= timer_period)) {
/* do this only on master core */
if (lcore_id == rte_get_master_lcore()) {
print_stats();
/* reset the timer */
timer_tsc = 0;
}
}
}
prev_tsc = cur_tsc;
}
/*
* Read packet from RX queues
* 没有到发送时间片就读接收队列里的报文
*/
for (i = 0; i < qconf->n_rx_port; i++) { // 遍历当前lcore的收队列
portid = qconf->rx_port_list[i]; // 获取portid
nb_rx = rte_eth_rx_burst(portid, 0, // 这个端口收到的包的数量
pkts_burst, MAX_PKT_BURST); // 数据包放入到pkts_burst
port_statistics[portid].rx += nb_rx; // 当前端口的收报数增加
for (j = 0; j < nb_rx; j++) { // 遍历收到的包
m = pkts_burst[j];
rte_prefetch0(rte_pktmbuf_mtod(m, void *));
l2fwd_simple_forward(m, portid);
}
}
}
首先是两个if判断,如果到了drain_tsc时间片,那么就调用rte_eth_tx_buffer_flush函数根据l2fwd_dst_ports数组确定的每个端口的目的端口将队列缓存中的数据包立即发送出去,如果到了timer_period时间片,就打印一次各个端口的数据,否则就遍历当前lcore的收队列,用rte_eth_rx_burst来读取收到的数据包,然后执行l2fwd_simple_forward函数,这个函数的逻辑也是很清晰的
unsigned dst_port;
int sent;
struct rte_eth_dev_tx_buffer *buffer;
dst_port = l2fwd_dst_ports[portid];
if (mac_updating)
l2fwd_mac_updating(m, dst_port);
buffer = tx_buffer[dst_port];
sent = rte_eth_tx_buffer(dst_port, 0, buffer, m);
if (sent)
port_statistics[dst_port].tx += sent;
就是根据l2fwd_dst_ports数组确定当前这个端口的目的端口,调用rte_eth_tx_buffer将数据放入发队列的缓存中,如果放入缓存中数据包被发送出去的话发包数就增加,没发出去的话就等待下次循环到达drain_tsc时间片,调用rte_eth_tx_buffer_flush函数将缓存中的数据发送出去,这里的l2fwd_mac_updating函数的逻辑就是开头提到的一个mac地址的更新过程
static void l2fwd_mac_updating(struct rte_mbuf *m, unsigned dest_portid) {
struct ether_hdr *eth;
void *tmp;
eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
/* 02:00:00:00:00:xx */
tmp = ð->d_addr.addr_bytes[0];
*((uint64_t *)tmp) = 0x000000000002 + ((uint64_t)dest_portid << 40);
/* src addr */
ether_addr_copy(&l2fwd_ports_eth_addr[dest_portid], ð->s_addr);
}
数据包的源mac会变成发送端口的mac地址,数据包的目的mac会变成02:00:00:00:00:发送端口的ID,slave lcore的循环是否停止是由force_quit变量决定的,main函数开头处signal(SIGINT, signal_handler)和signal(SIGTERM, signal_handler)绑定了对信号的处理函数,这个处理函数就是将force_quit值置为true
static void signal_handler(int signum)
{
if (signum == SIGINT || signum == SIGTERM) {
printf("\n\nSignal %d received, preparing to exit...\n",
signum);
force_quit = true;
}
}
最后在gns3上构建如下拓扑测试一下,注意要注释掉l2fwd_mac_updating,这样才能让主机通信
分配好ip地址后,可以看到PC1和PC2之间可以ping通,PC3和PC4之间可以ping通,但是PC3和PC2之间不能ping通,因为1,2端口是一组,3,4端口是一组