设备接口层接收数据包(一)

  • Post author:
  • Post category:其他


从软件层面看,数据包最先肯定是由网络设备驱动程序收到的,但是不同的驱动(不同的硬件)有不同的方式,这个地方的逻辑并不统一。这篇笔记关注的重点是之后驱动程序如何将数据包递交给协议栈,以及协议栈在后续是如何处理的。



数据包接收模式

当前内核提供了两种数据包接收模式:非NAPI方式(老方法)和NAPI(新方法,即New API)。新老方法的接收过程分别如下图所示:

在这里插入图片描述



非NAPI模式

如上图b所示,首先驱动程序会从硬件取出网络数据包构造出新的skb,然后直接将该skb放入当前CPU的接收队列input_pkt_queue中,该队列随后会由网络接收软中断程序处理,软中断处理程序会将数据包递交给上层协议栈。

图中的“将数据包添加到输入队列”操作是驱动程序通过netif_rx()接口实现的,该接口是设备接口层提供给驱动程序的。

关于接收队列和网络接收软中断的介绍见下文。



NAPI模式

如上图a所示,这种方式的第一步并不是将数据包从硬件中读取出来,而是将网络设备对象(实际上是struct napi_struct)添加到轮询队列poll_list中,然后激活网络接收软中断,在软中断处理函数中遍历该poll_list,依次处理轮询队列上的设备,回调设备提供的数据接收函数完成数据包的接收。


注:轮询队列中挂接的是实际上是struct napi_struct对象,但是由于网络设备对象和该对象是一一对应的,这么描述也没什么毛病。



新老模式对比

非NAPI方式使得驱动程序的接收过程大部分在中断处理过程中(也有其它写法代替),在高负载场景下,可能会频繁的中断CPU,造成资源浪费。新方法背后的思想也简单:就是中断+轮询。首次数据包到达时,中断CPU,然后驱动程序关掉设备中断,将设备放入poll_list中,只要设备中一直有数据,那么就让该设备一直在poll_list中,网络接收软中断会不断的处理该poll_list,这样可以不断的从设备中读取数据,直到该设备中的数据读取完毕为止。这种模式可以尽可能的减少中断的次数,但是又不会引入太大的时延,所以内核建议新开发驱动都采用NAPI方式,当然了,作为内核框架是完全兼容这两种方式的,下文会介绍这是怎么实现的。



接收队列

从上面可以看出,非NAPI模式,接收队列就是input_pkt_queue队列;NAPI模式的接收队列就是poll_list队列。

为了保证接收效率,接收队列是PER-CPU类型的变量,这样就可以避免加锁导致效率变低,设备接口层将接收队列定义为struct softnet_data,其初始化是在设备接口层初始化过程中完成的,具体见

这里

/*
 * Incoming packets are placed on per-cpu queues so that
 * no locking is needed.
 */
struct softnet_data
{
	// 对于非NAPI方式的接收,驱动通过轮询或者硬中断(或二者结合)的方式将数据包放入该队列,然后激活
	// 接收软中断程序,软中断程序会基于流量控制的排队规则将数据包递交给上层协议栈
	struct sk_buff_head	input_pkt_queue;
	// 轮询队列。驱动程序会将需要轮询的网络设备对象的struct napi_struct成员链接到该队列并激活接收
	// 软中断程序。接收软中断程序会遍历该队列,调用驱动程序的ndo_netpoll()完成数据包接收过程
	struct list_head	poll_list;
	// 为了将软中断接收处理程序对非NAPI方式和NAPIF方式的处理统一,对于非NAPI接收,在硬中断处理
	// 后,将backlog结构加入到poll_list,然后触发软中断接收程序,具体使用见下面非NAPI方式的接收
	struct napi_struct	backlog;
};

PS:

实际上softnet_data还包括发送队列,只不过这里我们先将其忽略,聚焦接收过程



struct napi_struct

内核开始支持NAPI模式后,将NAPI和非NAPI两种情况下的接收过程进行了整合,实际上就是把非NAPI模式下的数据包接收过程抽象成了一个特殊的NAPI接收流程,这是靠接收队列中的backlog成员实现的,具体见后面非NAPI模式接收部分实现。

所以无论是NAPI和非NAPI接收模式,都离不开struct napi_struct对象。

/*
 * Structure for NAPI scheduling similar to tasklet but with weighting
 */
struct napi_struct {
	/* The poll_list must only be managed by the entity which
	 * changes the state of the NAPI_STATE_SCHED bit.  This means
	 * whoever atomically sets that bit can add this napi_struct
	 * to the per-cpu poll_list, and whoever clears that bit
	 * can remove from the list right before clearing the bit.
	 */
	// 用于将该对象链入轮询队列poll_list中
	struct list_head	poll_list;
	unsigned long		state; // 见下方
	// 接收配额,一次轮询可以接收的最大数据包数目,见net_rx_action()
	int			weight;
	// 轮询接口,由驱动程序提供
	int			(*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
	spinlock_t		poll_lock;
	int			poll_owner;
#endif
	struct net_device	*dev;
	struct list_head	dev_list;
	struct sk_buff		*gro_list;
	struct sk_buff		*skb;
};

// state取值
enum
{
    // 调度标记,设置该标记说明该对象正在轮询队列poll_list中等待被轮询
	NAPI_STATE_SCHED,	/* Poll is scheduled */
	// 禁用标记,设置该标记说明该对象当前不能被调度
	NAPI_STATE_DISABLE,	/* Disable pending */
	NAPI_STATE_NPSVC,	/* Netpoll - don't dequeue from poll_list */
};



接收软中断

从上面的介绍可以发现,无论是哪种接收模式,都需要接收软中断参与,其实现如下。

int netdev_budget __read_mostly = 300;

static void net_rx_action(struct softirq_action *h)
{
    // 获取当前CPU上的待轮询设备队列
	struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
	// 记录本次轮询的限制时间
	unsigned long time_limit = jiffies + 2;
	// 每次软中断处理过程可以接收的最大数据包数目,即配额,这是一个总配额,
	// 即所有网络设备对象的接收总数不能超过该值
	int budget = netdev_budget;
	void *have;

	// 由于驱动的中断处理函数也会操作poll_list(添加napi_struct),所以虽然接收队列
	// 是每个CPU一份,但是还是需要关闭本地CPU的中断
	local_irq_disable();
	// 遍历轮询链表,轮询接收每个设备
	while (!list_empty(list)) {
		struct napi_struct *n;
		int work, weight;

		// cond1:本次接收软中断的总接收配额已经用完了,所以停止接收,等待下次调度
		// cond2: 本次接收软中断的执行时间已经超过2个时钟嘀嗒,所以也停止接收,等待下次调度
		// 显然这种设计从接收包数和处理时长两个维度来控制软中断的执行时长,避免其长时间执行(因为关闭本地CPU中断了)
		// 进而影响整个系统的响应速度
		if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))
			goto softnet_break;
		/* Even though interrupts have been re-enabled, this
		 * access is safe because interrupts can only add new
		 * entries to the tail of this list, and only ->poll()
		 * calls can remove this head entry from the list.
		 */
		local_irq_enable();
		n = list_entry(list->next, struct napi_struct, poll_list);

		have = netpoll_poll_lock(n); // net poll相关
        // 获取该网络设备驱动自己的一次轮询对应的接收配额
		weight = n->weight;
		/* This NAPI_STATE_SCHED test is for avoiding a race
		 * with netpoll's poll_napi().  Only the entity which
		 * obtains the lock and sees NAPI_STATE_SCHED set will
		 * actually make the ->poll() call.  Therefore we avoid
		 * accidently calling ->poll() when NAPI is not scheduled.
		 */
		// 调用驱动程序提供的poll接口进行数据的接收,返回值work代表实际读取到的数据包数
		// 如果数据已经全部读取完毕,poll的实现应该将该设备从轮询队列中移除
		work = 0;
		if (test_bit(NAPI_STATE_SCHED, &n->state))
			work = n->poll(n, weight);
        // 一次接收超过了指定配额,那么属于驱动程序设计不合理,打印告警,
		WARN_ON_ONCE(work > weight);
        // 从总的配额中减去该网络设备消耗的配额
		budget -= work;

        // 下面可能会修改poll_list,所以需要重新关闭本地CPU中断
		local_irq_disable();
		/* Drivers must not modify the NAPI state if they
		 * consume the entire weight.  In such cases this code
		 * still "owns" the NAPI instance and therefore can
		 * move the instance around on the list at-will.
		 */
		// 这段代码的逻辑在下面解释
		if (unlikely(work == weight)) {
			if (unlikely(napi_disable_pending(n)))
				__napi_complete(n);
			else
				list_move_tail(&n->poll_list, list);
		}
		netpoll_poll_unlock(have);
	}
out:
	local_irq_enable();

#ifdef CONFIG_NET_DMA
	/*
	 * There may not be any more sk_buffs coming right now, so push
	 * any pending DMA copies to hardware
	 */
	dma_issue_pending_all();
#endif
	return;

softnet_break:
	// 更新统计值time_squeeze,如果该值过大,表示每次软中断都没有处理完数据包,
	// 说明网卡接收是非常忙碌的,所以这里可能是性能瓶颈
	__get_cpu_var(netdev_rx_stat).time_squeeze++;
	// 因为poll_list中还有设备需要接收数据,所以需要再次激活软中断
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
	goto out;
}


系统参数:netdev_budget和/proc/sys/net/core/netdev_budget文件对应,默认为300。



NAPI模式接收数据包

需要说明的是,驱动程序可以选择不支持这种方式,但是对于高速网络设备,实现它是必要的,如前面所讲,这种方式可以减少中断CPU的次数,对接收效率有很大的提升。NAPI方式的数据包处理方式如下图所示:

在这里插入图片描述

实现要点有两个:

  1. 中断处理程序的实现;
  2. 轮询接口poll()的实现;

下面以e100网卡为例,看这两个过程的核心代码实现。



中断处理程序

static irqreturn_t e100_intr(int irq, void *dev_id)
{
	struct net_device *netdev = dev_id;
	struct nic *nic = netdev_priv(netdev);
...
	// 需要接收数据包,检查是否可以激活,如果需要激活则关闭硬中断,然后开始调度
	if(likely(netif_rx_schedule_prep(netdev, &nic->napi))) {
		e100_disable_irq(nic);
		__netif_rx_schedule(netdev, &nic->napi);
	}
	return IRQ_HANDLED;
}



激活接收软中断

激活接收软中断之前,先得判断下是否真的需要激活,netif_rx_schedule_prep()用于检查这种必要性:

/* Test if receive needs to be scheduled but only if up */
static inline int netif_rx_schedule_prep(struct net_device *dev, struct napi_struct *napi)
{
	return napi_schedule_prep(napi);
}

static inline int napi_disable_pending(struct napi_struct *n)
{
	return test_bit(NAPI_STATE_DISABLE, &n->state);
}

/**
 *	napi_schedule_prep - check if napi can be scheduled
 *	@n: napi context
 *
 * Test if NAPI routine is already running, and if not mark
 * it as running.  This is used as a condition variable
 * insure only one NAPI poll instance runs.  We also make
 * sure there is no pending NAPI disable.
 */
static inline int napi_schedule_prep(struct napi_struct *n)
{
	// 没有被DISABLE,设备也还没有处于调度状态(NAPI_STATE_SCHED标志位置位)
	return !napi_disable_pending(n) &&
		!test_and_set_bit(NAPI_STATE_SCHED, &n->state);
}

实际上就是确定state的两个标记都没有设置,这种情况下说明满足激活条件(没有NAPI_STATE_DISABLE置位说明设备还可以工作;没有NAPI_STATE_SCHED置位说明当前设备还没有被调度),那么设置NAPI_STATE_SCHED标记。

__netif_rx_schedule()进行真正的调度处理,代码如下:

/* Add interface to tail of rx poll list. This assumes that _prep has
 * already been called and returned 1.
 */
static inline void __netif_rx_schedule(struct net_device *dev,  struct napi_struct *napi)
{
	__napi_schedule(napi);
}

/**
 * __napi_schedule - schedule for receive
 * @n: entry to schedule
 *
 * The entry's receive function will be scheduled to run
 */
void __napi_schedule(struct napi_struct *n)
{
	unsigned long flags;

	local_irq_save(flags);
	// 将设备加入到接收轮询队列中
	list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);
	// 激活接收软中断
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
	local_irq_restore(flags);
}

还提供了一个函数来同时完成上面的两步:

/* Try to reschedule poll. Called by irq handler. */
static inline void netif_rx_schedule(struct net_device *dev, struct napi_struct *napi)
{
	if (netif_rx_schedule_prep(dev, napi))
		__netif_rx_schedule(dev, napi);
}



poll()接口的实现

static int e100_poll(struct napi_struct *napi, int budget)
{
	struct nic *nic = container_of(napi, struct nic, napi);
	struct net_device *netdev = nic->netdev;
	unsigned int work_done = 0;

	// 进行数据包的接收处理,work_done保存了本次接收了多少个数据包
	e100_rx_clean(nic, &work_done, budget);
	e100_tx_clean(nic);

	// 接收的数据包量小于配额,说明硬件中的数据已经收完了,那么结束调度过程,并且重新使能硬件中断
	if (work_done < budget) {
		netif_rx_complete(netdev, napi);
		e100_enable_irq(nic);
	}
	// 返回实际接收的数据包个数
	return work_done;
}

回头再来理解下前面接收软中断对poll()回调返回值的处理。实际上这里要实现的目标很明确:

如果驱动已经接收完了所有的数据包,那么停止调度它;否则将其放入poll_list的末尾等待下次轮询。

针对返回值work和接收配额weight的大小关系,有如下三种情况:

  1. work小于weight,这种情况说明驱动肯定已经接收完了数据,这时由驱动程序负责结束调度(如上面e100_poll()中对netif_rx_complete()的调用);
  2. work等于weight,这种情况驱动是否是刚好接收完了全部数据,只有驱动程序知道,但是框架不能做接收完毕的假设,此时框架按照没有接收完处理,这时由框架负责将设备加入poll_list的末尾继续调度;
  3. work大于weight,这种情况是bug,不允许出现的,见前面的poll()返回后的BUG_ON()检查,这也是第二种情况的原因。
static void net_rx_action(struct softirq_action *h)
{
...
	/* Drivers must not modify the NAPI state if they
	 * consume the entire weight.  In such cases this code
	 * still "owns" the NAPI instance and therefore can
	 * move the instance around on the list at-will.
	 */
	if (unlikely(work == weight)) {
		// 这是一种特殊情况处理,如果设备已经被Disable了,也结束调度
		if (unlikely(napi_disable_pending(n)))
			__napi_complete(n);
		else
			list_move_tail(&n->poll_list, list);
	}
...
}



结束调度: __napi_complete()

/* Remove interface from poll list: it must be in the poll list
 * on current cpu. This primitive is called by dev->poll(), when
 * it completes the work. The device cannot be out of poll list at this
 * moment, it is BUG().
 */
static inline void netif_rx_complete(struct napi_struct *napi)
{
	napi_complete(napi);
}

// 将设备从轮询列表中删除
void __napi_complete(struct napi_struct *n)
{
	BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
	BUG_ON(n->gro_list);

	list_del(&n->poll_list);
	smp_mb__before_clear_bit();
	clear_bit(NAPI_STATE_SCHED, &n->state);
}
EXPORT_SYMBOL(__napi_complete);

void napi_complete(struct napi_struct *n)
{
	unsigned long flags;

	/*
	 * don't let napi dequeue from the cpu poll list
	 * just in case its running on a different cpu
	 */
	if (unlikely(test_bit(NAPI_STATE_NPSVC, &n->state)))
		return;

	napi_gro_flush(n);
	local_irq_save(flags);
	__napi_complete(n);
	local_irq_restore(flags);
}



非NAPI模式接收数据包

这种模式下,驱动程序在收到数据后会调用设备接口层提供的netif_rx()将数据包放入input_pkt_queue中等待接收软中断程序进一步处理。



netif_rx()

int netdev_max_backlog __read_mostly = 1000;

/**
 *	netif_rx	-	post buffer to the network code
 *	@skb: buffer to post
 *
 *	This function receives a packet from a device driver and queues it for
 *	the upper (protocol) levels to process.  It always succeeds. The buffer
 *	may be dropped during processing for congestion control or by the
 *	protocol layers.
 *
 *	return values:
 *	NET_RX_SUCCESS	(no congestion)
 *	NET_RX_DROP     (packet was dropped)
 *
 */
int netif_rx(struct sk_buff *skb)
{
	struct softnet_data *queue;
	unsigned long flags;

	// 如果被netpoll处理了,直接返回DROP,协议栈不处理。
	// 这里忽略netpoll机制,认为其没有处理
	if (netpoll_rx(skb))
		return NET_RX_DROP;
    // 如果驱动程序没有为数据包设置接收时间戳,在这里设置它
	if (!skb->tstamp.tv64)
		net_timestamp(skb);

	/*
	 * The code is rearranged so that the path is the most
	 * short when CPU is congested, but is still operating.
	 */
	// 下面要把SKB放入input_pkt_queue队列,所以要先关闭本地CPU的硬中断,理由同上面poll_list
	local_irq_save(flags);
	// 获取本地CPU的收发队列
	queue = &__get_cpu_var(softnet_data);
    // 网卡设备接收数据包个数统计值+1
	__get_cpu_var(netdev_rx_stat).total++;
	
	// 判断input_pkt_queue队列中数据包个数是否超过了系统限制netdev_max_backlog
	if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
	    // 如果队列不为空,说明软中断处理程序已经在处理该队列了,只需将数据包放入input_pkt_queue中即可
		if (queue->input_pkt_queue.qlen) {
enqueue:
			__skb_queue_tail(&queue->input_pkt_queue, skb);
			local_irq_restore(flags);
			return NET_RX_SUCCESS;
		}
        // 队列为空,作为第一个数据包需要将backlog调度到poll_list中,如果需要还要激活软中断处理程序,
		// 这里内核巧妙的将NAPI和非NAPI模式的处理流程进行了兼容
		napi_schedule(&queue->backlog);
		goto enqueue;
	}
    // 到这里说明input_pkt_queue队列中的数据包超过了系统限制,这时会统计后丢弃该数据包
	__get_cpu_var(netdev_rx_stat).dropped++;
	local_irq_restore(flags);
	kfree_skb(skb);
	return NET_RX_DROP;
}

从代码中可以看出,netif_rx()所做的工作就是将数据包放入input_pkt_queue中,如果没有激活接收软中断,那么激活它。


系统参数:netdev_max_backlog和/proc/sys/net/core/netdev_max_backlog文件对应,默认为1000。



非NAPI模式的poll()回调: process_backlog()

到这里就能看得出来内核是如何将非NAPI方式的接收处理和NAPI方式统一起来的。对于非NAPI,其轮询回调为process_backlog(),它需要做的事情就是遍历input_pkt_queue,将其中的数据包递交给上层协议。

static int process_backlog(struct napi_struct *napi, int quota)
{
	int work = 0;
	// 获取本地CPU的接收队列
	struct softnet_data *queue = &__get_cpu_var(softnet_data);
	unsigned long start_time = jiffies;

	// 读取数据量超过配额或者到达1个jiffies后结束
	napi->weight = weight_p;
	do {
		struct sk_buff *skb;

		local_irq_disable();
		// 取出一个数据包
		skb = __skb_dequeue(&queue->input_pkt_queue);
		// 如果队列已空,接收过程结束,将这个特殊的napi_struct从轮询队列中移除,结束调度
		if (!skb) {
			local_irq_enable();
			napi_complete(napi);
			goto out;
		}
		local_irq_enable();
        // 直接将数据包递交给高层协议
		napi_gro_receive(napi, skb);
	} while (++work < quota && jiffies == start_time);
	napi_gro_flush(napi);

out:
	// 返回本次实际接收的数据包数
	return work;
}



版权声明:本文为fanxiaoyu321原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。