NAPI (New API) is the kernel's modern driver Rx/Tx interface: instead of handling one packet per interrupt, the driver is polled from softirq context.
netif_napi_add
An Ethernet driver registers a NAPI instance with netif_napi_add(), for example:
netif_napi_add(netdev, &rx_info->napi_rx, syn_dp_napi_poll_rx, SYN_DP_NAPI_BUDGET_RX)
The function prototype is:
void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
                    int (*poll)(struct napi_struct *, int), int weight)
The first argument is the network device, the second is the napi struct, and the third is the poll function, provided by the driver and called by the kernel to receive packets.
The fourth argument, weight, caps the number of packets handled in a single poll call.
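As a hedged sketch of how this is typically wired up in a driver's probe path (the names my_priv, my_poll and MY_NAPI_WEIGHT are illustrative, not taken from the nss dp driver; the call matches the 4-argument prototype shown above):

#include <linux/netdevice.h>

#define MY_NAPI_WEIGHT 64        /* illustrative: packets allowed per poll call */

struct my_priv {
        struct net_device *netdev;
        struct napi_struct napi;
};

static int my_poll(struct napi_struct *napi, int budget);   /* driver Rx poll, sketched further below */

static void my_napi_setup(struct my_priv *priv)
{
        /* Register the poll callback and its weight with the kernel ... */
        netif_napi_add(priv->netdev, &priv->napi, my_poll, MY_NAPI_WEIGHT);
        /* ... and allow this NAPI instance to be scheduled. */
        napi_enable(&priv->napi);
}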
Typically the Ethernet driver calls napi_schedule() from its interrupt handler to tell the kernel: packets are arriving, get ready to call my poll function.
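A hedged sketch of such an interrupt handler (reusing the illustrative my_priv from the sketch above; my_hw_disable_rx_irq() stands in for whatever register write masks the device's Rx interrupt):

#include <linux/interrupt.h>

static irqreturn_t my_rx_isr(int irq, void *data)
{
        struct my_priv *priv = data;

        /* Mask further Rx interrupts and hand the ring over to NAPI;
         * napi_schedule_prep() ensures we only schedule the instance once. */
        if (napi_schedule_prep(&priv->napi)) {
                my_hw_disable_rx_irq(priv);      /* hypothetical register write */
                __napi_schedule(&priv->napi);
        }

        return IRQ_HANDLED;
}

napi_schedule() itself just pairs napi_schedule_prep() with __napi_schedule():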
void __napi_schedule(struct napi_struct *n)
{
        unsigned long flags;

        local_irq_save(flags);
        ____napi_schedule(this_cpu_ptr(&softnet_data), n);
        local_irq_restore(flags);
}
Note that it passes in the per-CPU softnet_data variable.
static inline void ____napi_schedule(struct softnet_data *sd,
                                     struct napi_struct *napi)
{
        list_add_tail(&napi->poll_list, &sd->poll_list);
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}
It adds the napi to softnet_data's poll_list and then raises the NET_RX_SOFTIRQ softirq.
The NET_RX_SOFTIRQ handler is registered in net_dev_init() in net/core/dev.c:
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
static __latent_entropy void net_rx_action(struct softirq_action *h)
{
        struct softnet_data *sd = this_cpu_ptr(&softnet_data);
        unsigned long time_limit = jiffies +
                usecs_to_jiffies(netdev_budget_usecs);
        int budget = netdev_budget;
        LIST_HEAD(list);
        LIST_HEAD(repoll);

        local_irq_disable();
        list_splice_init(&sd->poll_list, &list);
        local_irq_enable();

        for (;;) {
                struct napi_struct *n;

                if (list_empty(&list)) {
                        if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                                return;
                        break;
                }

                n = list_first_entry(&list, struct napi_struct, poll_list);
                budget -= napi_poll(n, &repoll);

                /* If softirq window is exhausted then punt.
                 * Allow this to run for 2 jiffies since which will allow
                 * an average latency of 1.5/HZ.
                 */
                if (unlikely(budget <= 0 ||
                             time_after_eq(jiffies, time_limit))) {
                        sd->time_squeeze++;
                        break;
                }
        }

        local_irq_disable();

        list_splice_tail_init(&sd->poll_list, &list);
        list_splice_tail(&repoll, &list);
        list_splice(&list, &sd->poll_list);
        if (!list_empty(&sd->poll_list))
                __raise_softirq_irqoff(NET_RX_SOFTIRQ);

        net_rps_action_and_irq_enable(sd);
}
net_rx_action() walks every napi struct queued on the current CPU's softnet_data and runs napi_poll() on each.
A single net_rx_action() run may process at most netdev_budget packets in total, and may run no longer than netdev_budget_usecs; when either limit is hit,
sd->time_squeeze is incremented and the softirq bails out.
Both parameters are tunable via sysctl:
/ # cat /proc/sys/net/core/netdev_budget
300
/ # cat /proc/sys/net/core/netdev_budget_usecs
20000
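Both can also be changed at runtime with sysctl -w; the values below are purely illustrative, not recommendations:
/ # sysctl -w net.core.netdev_budget=600
/ # sysctl -w net.core.netdev_budget_usecs=8000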
The time_squeeze counter can be read from /proc/net/softnet_stat (one line per CPU; time_squeeze is the third hex column):
/ # cat /proc/net/softnet_stat
00000346 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000
00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000001
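For example, the per-CPU time_squeeze values can be pulled out of that third column with awk (this assumes GNU awk for strtonum(); busybox awk lacks it):
/ # awk '{ printf "cpu%d time_squeeze=%d\n", NR - 1, strtonum("0x" $3) }' /proc/net/softnet_stat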
A napi struct that has not finished its receive work is put on the repoll list and eventually spliced back onto sd's poll_list; if the poll_list is still non-empty when net_rx_action() ends,
NET_RX_SOFTIRQ is raised again.
As shown below, napi_poll() runs __napi_poll(), which returns the number of packets received and indicates whether a repoll is needed; if so, the napi is added to the repoll list.
static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
        bool do_repoll = false;
        void *have;
        int work;

        list_del_init(&n->poll_list);

        have = netpoll_poll_lock(n);

        work = __napi_poll(n, &do_repoll);

        if (do_repoll)
                list_add_tail(&n->poll_list, repoll);

        netpoll_poll_unlock(have);

        return work;
}
static int __napi_poll(struct napi_struct *n, bool *repoll)
{
        int work, weight;

        weight = n->weight;

        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi(). Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call. Therefore we avoid
         * accidentally calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
                work = n->poll(n, weight);
                trace_napi_poll(n, work, weight);
        }

        if (unlikely(work > weight))
                pr_err_once("NAPI poll function %pS returned %d, exceeding its budget of %d.\n",
                            n->poll, work, weight);

        if (likely(work < weight))
                return work;

        ...

        *repoll = true;
        return work;
}
So __napi_poll() invokes the driver's poll() with the napi struct as the first argument and the weight given at netif_napi_add() time as the second; poll() returns the number of packets it received.
If that number is less than weight, __napi_poll() returns right away; in this case the driver must have called napi_complete itself.
If the number received equals weight, repoll is set to true.
This poll() is the driver's poll function: it pulls frames from the DMA ring, builds skbs, and then has two ways to hand them to the protocol stack:
int netif_receive_skb(struct sk_buff *skb)
or
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
The nss dp driver's skb setup and hand-off to the stack:
skb_put(rx_skb, frame_length);
rx_skb->protocol = eth_type_trans(rx_skb, netdev);
#if defined(NSS_DP_ENABLE_NAPI_GRO)
napi_gro_receive(&rx_info->napi_rx, rx_skb);
#else
netif_receive_skb(rx_skb);
#endif
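Putting the pieces together, here is a hedged sketch of a driver poll function that honors the weight/napi_complete contract described above (my_hw_fetch_skb() and my_hw_enable_rx_irq() are hypothetical ring and register helpers, not nss dp code; eth_type_trans() needs linux/etherdevice.h):

static int my_poll(struct napi_struct *napi, int budget)
{
        struct my_priv *priv = container_of(napi, struct my_priv, napi);
        int work = 0;

        while (work < budget) {
                struct sk_buff *skb = my_hw_fetch_skb(priv);    /* hypothetical: next completed Rx descriptor */

                if (!skb)
                        break;          /* ring drained before the budget ran out */

                skb->protocol = eth_type_trans(skb, priv->netdev);
                napi_gro_receive(napi, skb);
                work++;
        }

        /* Less than the full budget used: tell the kernel we are done and
         * re-enable the device's Rx interrupt. Returning work == budget
         * instead makes __napi_poll() set repoll and keep us on the list. */
        if (work < budget && napi_complete_done(napi, work))
                my_hw_enable_rx_irq(priv);      /* hypothetical register write */

        return work;
}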
Generic Receive Offload (GRO)
GRO merges multiple consecutive skbs that belong to the same flow into a single larger skb before handing it to the protocol stack, so the stack is traversed once per aggregate instead of once per packet, which speeds up processing.
In a test of the IPQ50XX nss dp wired driver, turning off NSS_DP_ENABLE_NAPI_GRO dropped LAN-WAN throughput from 500 Mbps to 340 Mbps.
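When the driver hands packets up with napi_gro_receive(), GRO can also be checked and toggled at runtime through the standard ethtool feature flags (the interface name is illustrative):
/ # ethtool -k eth0 | grep generic-receive-offload
/ # ethtool -K eth0 gro off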