1. Introduction to NAPI
Its core idea is not to rely on interrupts alone to read data: an interrupt is used only to wake up the packet-receiving service routine, which then POLLs for data. NAPI is a technique that combines the interrupt approach with the polling approach.
The advantage of interrupts is prompt response, and when traffic is light they do not consume much CPU time. The drawback is that under heavy traffic too many interrupts are generated, and each interrupt costs a fair amount of CPU time, so the overall efficiency ends up worse than polling.
Polling is the opposite: it is better suited to heavy traffic, since one poll pass can handle many packets without per-packet interrupt overhead. Its drawback is that CPU time is consumed even when little or no data is arriving.
NAPI combines the two: interrupts are used when traffic is low and polling when traffic is high. When data arrives, the interrupt handler runs, disables the RX interrupt and starts processing. Packets that arrive in the meantime do not need to raise further interrupts, because the handler keeps polling for data and only re-enables the interrupt once no new data is left. Clearly, at very low and at very high traffic NAPI gets the benefit of interrupts and of polling respectively and performs well. If the traffic is unstable, neither clearly high nor clearly low, NAPI spends a noticeable amount of time switching between the two modes and ends up somewhat less efficient. A minimal sketch of this cycle follows below.
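To make the cycle concrete, here is a minimal sketch of the pattern for a hypothetical NAPI driver. struct my_dev, my_poll() and the my_hw_*() helpers are placeholders, not real kernel APIs; the MediaTek driver code quoted later in this article follows the same shape with its own names.

struct my_dev {                                 /* hypothetical driver private data */
    struct net_device *netdev;
    struct napi_struct napi;                    /* embedded NAPI instance */
    /* ... hardware state ... */
};

static irqreturn_t my_irq_handler(int irq, void *data)
{
    struct my_dev *priv = data;

    if (napi_schedule_prep(&priv->napi)) {      /* atomically mark the instance as scheduled */
        my_hw_disable_rx_irq(priv);             /* placeholder: mask the RX interrupt in hardware */
        __napi_schedule(&priv->napi);           /* queue the napi and raise NET_RX_SOFTIRQ */
    }
    return IRQ_HANDLED;
}

static int my_poll(struct napi_struct *napi, int budget)
{
    struct my_dev *priv = container_of(napi, struct my_dev, napi);
    int done = my_hw_rx(priv, budget);          /* placeholder: drain up to budget packets from the ring */

    if (done < budget && napi_complete_done(napi, done))
        my_hw_enable_rx_irq(priv);              /* ring is empty: switch back to interrupt mode */
    return done;
}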
2. Implementation differences between non-NAPI and NAPI:
NAPI requires the driver to provide a poll method.
The non-NAPI entry point into the kernel is netif_rx(); the NAPI entry point is napi_schedule().
Non-NAPI queues packets on the shared per-CPU queue softnet_data->input_pkt_queue, while NAPI leaves them in device memory or in the driver's receive ring until the poll method pulls them out (a sketch of the legacy path is given below).
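For contrast, a minimal sketch of the legacy (non-NAPI) path, reusing the hypothetical struct my_dev from the sketch above; my_hw_read_frame() and MY_FRAME_LEN are placeholders. The driver builds an skb inside the interrupt handler and hands it to netif_rx(), which enqueues it on the current CPU's softnet_data->input_pkt_queue and lets the per-CPU backlog NAPI (process_backlog) drain it later.

static irqreturn_t my_legacy_irq_handler(int irq, void *data)
{
    struct my_dev *priv = data;
    struct sk_buff *skb;

    /* one interrupt per frame: the frame is copied out of the hardware right here */
    skb = netdev_alloc_skb(priv->netdev, MY_FRAME_LEN);
    if (!skb)
        return IRQ_HANDLED;
    my_hw_read_frame(priv, skb);                /* placeholder: copy the frame into the skb */
    skb->protocol = eth_type_trans(skb, priv->netdev);

    netif_rx(skb);                              /* enqueue on softnet_data->input_pkt_queue and raise NET_RX_SOFTIRQ */
    return IRQ_HANDLED;
}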
/*
 * Structure for NAPI scheduling similar to tasklet but with weighting
 */
struct napi_struct {
    /* The poll_list must only be managed by the entity which
     * changes the state of the NAPI_STATE_SCHED bit. This means
     * whoever atomically sets that bit can add this napi_struct
     * to the per-CPU poll_list, and whoever clears that bit
     * can remove from the list right before clearing the bit.
     */
    struct list_head    poll_list;      /* links the device into the per-CPU list of devices being polled */

    unsigned long       state;          /* state of this NAPI instance (NAPI_STATE_* bits) */
    int                 weight;         /* max packets handled per poll call; non-NAPI uses a default */
    unsigned long       gro_bitmask;
    int                 (*poll)(struct napi_struct *, int);  /* poll method registered by the driver; process_backlog() for non-NAPI */
#ifdef CONFIG_NETPOLL
    int                 poll_owner;
#endif
    struct net_device   *dev;
    struct gro_list     gro_hash[GRO_HASH_BUCKETS];
    struct sk_buff      *skb;
    struct list_head    rx_list;        /* Pending GRO_NORMAL skbs */
    int                 rx_count;       /* length of rx_list */
    struct hrtimer      timer;
    struct list_head    dev_list;
    struct hlist_node   napi_hash_node;
    unsigned int        napi_id;
    struct task_struct  *thread;
};
The packet reception flow in NAPI mode is as follows:
1. The driver calls netif_napi_add() to register its NAPI instance.
// netif_napi_add(&eth->dummy_dev, &eth->tx_napi, mtk_napi_tx, MTK_NAPI_WEIGHT);
// netif_napi_add(&eth->dummy_dev, &eth->rx_napi[0].napi, mtk_napi_rx, MTK_NAPI_WEIGHT);
void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
                    int (*poll)(struct napi_struct *, int), int weight)
{
    INIT_LIST_HEAD(&napi->poll_list);
    hrtimer_init(&napi->timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL_PINNED);
    napi->timer.function = napi_watchdog;
    init_gro_hash(napi);
    napi->skb = NULL;
    INIT_LIST_HEAD(&napi->rx_list);
    napi->rx_count = 0;
    napi->poll = poll;
    if (weight > NAPI_POLL_WEIGHT)
        netdev_err_once(dev, "%s() called with weight %d\n", __func__,
                        weight);
    napi->weight = weight;
    napi->dev = dev;
#ifdef CONFIG_NETPOLL
    napi->poll_owner = -1;
#endif
    set_bit(NAPI_STATE_SCHED, &napi->state);
    set_bit(NAPI_STATE_NPSVC, &napi->state);
    list_add_rcu(&napi->dev_list, &dev->napi_list);
    napi_hash_add(napi);
    /* Create kthread for this napi if dev->threaded is set.
     * Clear dev->threaded if kthread creation failed so that
     * threaded mode will not be enabled in napi_enable().
     */
    if (dev->threaded && napi_kthread_create(napi))
        dev->threaded = 0;
}
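A minimal sketch of where these calls usually live in a driver; my_probe(), my_open(), struct my_dev and my_hw_enable_rx_irq() are hypothetical placeholders (the MediaTek driver does the equivalent in its probe and open paths). Note that netif_napi_add() leaves NAPI_STATE_SCHED set, and it is only cleared by napi_enable(); this point is picked up again at the end of the article.

static int my_probe(struct platform_device *pdev)
{
    struct net_device *ndev = devm_alloc_etherdev(&pdev->dev, sizeof(struct my_dev));
    struct my_dev *priv;

    if (!ndev)
        return -ENOMEM;
    priv = netdev_priv(ndev);
    priv->netdev = ndev;
    /* ... MMIO/IRQ setup elided ... */

    /* register the poll method; NAPI_STATE_SCHED is left set here */
    netif_napi_add(ndev, &priv->napi, my_poll, NAPI_POLL_WEIGHT);
    return register_netdev(ndev);
}

static int my_open(struct net_device *ndev)
{
    struct my_dev *priv = netdev_priv(ndev);

    napi_enable(&priv->napi);   /* clears NAPI_STATE_SCHED: the instance may now be scheduled */
    my_hw_enable_rx_irq(priv);  /* placeholder: unmask the RX interrupt in hardware */
    return 0;
}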
2. In NAPI mode, the hard-interrupt handler calls napi_schedule() to start pulling in packets.
static irqreturn_t mtk_handle_irq_rx(int irq, void *priv)
{
    struct mtk_napi *rx_napi = priv;
    struct mtk_eth *eth = rx_napi->eth;
    struct mtk_rx_ring *ring = rx_napi->rx_ring;

    if (likely(napi_schedule_prep(&rx_napi->napi))) {
        mtk_rx_irq_disable(eth, MTK_RX_DONE_INT(ring->ring_no));
        __napi_schedule(&rx_napi->napi);
    }

    return IRQ_HANDLED;
}

/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
                                     struct napi_struct *napi)
{
    struct task_struct *thread;

    if (test_bit(NAPI_STATE_THREADED, &napi->state)) {
        /* Paired with smp_mb__before_atomic() in
         * napi_enable()/dev_set_threaded().
         * Use READ_ONCE() to guarantee a complete
         * read on napi->thread. Only call
         * wake_up_process() when it's not NULL.
         */
        thread = READ_ONCE(napi->thread);
        if (thread) {
            if (thread->state != TASK_INTERRUPTIBLE)
                set_bit(NAPI_STATE_SCHED_THREADED, &napi->state);
            wake_up_process(thread);
            return;
        }
    }

    list_add_tail(&napi->poll_list, &sd->poll_list);  /* append napi->poll_list to the tail of sd->poll_list */
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);           /* raise the softirq; its handler calls back the registered poll method */
}
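Note that mtk_handle_irq_rx() calls napi_schedule_prep() and __napi_schedule() separately instead of the plain napi_schedule() wrapper, so that it can mask the hardware RX interrupt between the two steps. For reference, the wrappers that connect a plain napi_schedule() call to ____napi_schedule() look roughly like this in the kernel version quoted here:

static inline void napi_schedule(struct napi_struct *n)
{
    if (napi_schedule_prep(n))          /* atomically test-and-set NAPI_STATE_SCHED */
        __napi_schedule(n);
}

void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    local_irq_save(flags);
    ____napi_schedule(this_cpu_ptr(&softnet_data), n);
    local_irq_restore(flags);
}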
3. The softirq handler, net_rx_action():
static __latent_entropy void net_rx_action(struct softirq_action *h)
{
    /* Two limits are set here: budget and time_limit. The former caps the total
     * number of packets processed in this run, the latter caps the total time.
     * Processing continues only while both still have headroom.
     */
    struct softnet_data *sd = this_cpu_ptr(&softnet_data);
    unsigned long time_limit = jiffies +
        usecs_to_jiffies(netdev_budget_usecs);  /* max time one softirq run is allowed (about 2 jiffies with the defaults) */
    int budget = netdev_budget;
    LIST_HEAD(list);
    LIST_HEAD(repoll);

    local_irq_disable();
    list_splice_init(&sd->poll_list, &list);
    local_irq_enable();

    for (;;) {
        struct napi_struct *n;

        if (list_empty(&list)) {
            if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                goto out;
            break;
        }

        /* Take one napi from the head of the list. Even if a hard interrupt
         * preempts the softirq right now, it only appends a napi to the tail
         * of poll_list, and the softirq only removes from the head, so there
         * is no critical section between the two.
         */
        n = list_first_entry(&list, struct napi_struct, poll_list);
        budget -= napi_poll(n, &repoll);        /* subtract the number of packets polled */

        /* Stop if the budget is used up or the time limit (about 2 jiffies) has passed */
        if (unlikely(budget <= 0 ||
                     time_after_eq(jiffies, time_limit))) {
            sd->time_squeeze++;
            break;
        }
    }

    /* The code below splices unfinished NAPIs back onto the tail of softnet_data,
     * which races with the hard interrupt, so local interrupts are disabled here
     * (the CPU cannot respond to hardware interrupts in this window).
     */
    local_irq_disable();

    list_splice_tail_init(&sd->poll_list, &list);
    list_splice_tail(&repoll, &list);
    list_splice(&list, &sd->poll_list);
    if (!list_empty(&sd->poll_list))            /* there is still work left: re-raise the softirq */
        __raise_softirq_irqoff(NET_RX_SOFTIRQ); /* set the softirq bit so the handler runs again */

    net_rps_action_and_irq_enable(sd);          /* re-enables local interrupts internally */
out:
    __kfree_skb_flush();
}

static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
    bool do_repoll = false;
    void *have;
    int work;

    list_del_init(&n->poll_list);

    have = netpoll_poll_lock(n);

    work = __napi_poll(n, &do_repoll);

    if (do_repoll)
        list_add_tail(&n->poll_list, repoll);

    netpoll_poll_unlock(have);

    return work;
}

static int __napi_poll(struct napi_struct *n, bool *repoll)
{
    int work, weight;

    weight = n->weight;

    /* This NAPI_STATE_SCHED test is for avoiding a race
     * with netpoll's poll_napi(). Only the entity which
     * obtains the lock and sees NAPI_STATE_SCHED set will
     * actually make the ->poll() call. Therefore we avoid
     * accidentally calling ->poll() when NAPI is not scheduled.
     */
    work = 0;
    if (test_bit(NAPI_STATE_SCHED, &n->state)) {
        /* For NAPI drivers the napi_struct is allocated by the driver and the
         * poll hook on it is the driver's own. For non-NAPI, the napi_struct
         * is the default per-CPU softnet_data->backlog, whose poll hook is
         * process_backlog, registered in net_dev_init().
         */
        work = n->poll(n, weight);
        trace_napi_poll(n, work, weight);
    }

    WARN_ON_ONCE(work > weight);

    if (likely(work < weight))
        return work;

    /* Drivers must not modify the NAPI state if they
     * consume the entire weight. In such cases this code
     * still "owns" the NAPI instance and therefore can
     * move the instance around on the list at-will.
     */
    if (unlikely(napi_disable_pending(n))) {
        napi_complete(n);
        return work;
    }

    if (n->gro_bitmask) {
        /* flush too old packets
         * If HZ < 1000, flush all packets.
         */
        napi_gro_flush(n, HZ >= 1000);
    }

    gro_normal_list(n);

    /* Some drivers may have called napi_schedule
     * prior to exhausting their budget.
     */
    if (unlikely(!list_empty(&n->poll_list))) {
        pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
                     n->dev ? n->dev->name : "backlog");
        return work;
    }

    *repoll = true;

    return work;
}
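As the comment inside __napi_poll() notes, the non-NAPI path reuses a per-CPU napi_struct, softnet_data->backlog, whose poll hook is process_backlog(). For reference, it is wired up once at boot in net_dev_init(); in the kernel version quoted here the relevant lines look roughly like this (abridged):

/* net_dev_init(), abridged: per-CPU backlog NAPI used by the non-NAPI (netif_rx) path */
for_each_possible_cpu(i) {
    struct softnet_data *sd = &per_cpu(softnet_data, i);

    /* ... queue and IPI initialisation elided ... */
    init_gro_hash(&sd->backlog);
    sd->backlog.poll = process_backlog;     /* poll hook invoked by __napi_poll() for non-NAPI */
    sd->backlog.weight = weight_p;          /* default weight, the net.core.dev_weight sysctl */
}

4. The driver's poll method, mtk_napi_rx():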
static int mtk_napi_rx(struct napi_struct *napi, int budget)
{
    struct mtk_napi *rx_napi = container_of(napi, struct mtk_napi, napi);
    struct mtk_eth *eth = rx_napi->eth;
    struct mtk_rx_ring *ring = rx_napi->rx_ring;
    u32 status, mask;
    int rx_done = 0;
    int remain_budget = budget;

    mtk_handle_status_irq(eth);

poll_again:
    mtk_w32(eth, MTK_RX_DONE_INT(ring->ring_no), MTK_PDMA_INT_STATUS);
    rx_done = mtk_poll_rx(napi, remain_budget, eth);

    if (unlikely(netif_msg_intr(eth))) {
        status = mtk_r32(eth, MTK_PDMA_INT_STATUS);
        mask = mtk_r32(eth, MTK_PDMA_INT_MASK);
        dev_info(eth->dev,
                 "done rx %d, intr 0x%08x/0x%x\n",
                 rx_done, status, mask);
    }
    if (rx_done == remain_budget)
        return budget;

    status = mtk_r32(eth, MTK_PDMA_INT_STATUS);
    if (status & MTK_RX_DONE_INT(ring->ring_no)) {
        remain_budget -= rx_done;
        goto poll_again;
    }

    if (napi_complete(napi))
        mtk_rx_irq_enable(eth, MTK_RX_DONE_INT(ring->ring_no));

    return rx_done + budget - remain_budget;
}
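A note on the structure of mtk_napi_rx(): after each mtk_poll_rx() pass it re-reads MTK_PDMA_INT_STATUS, and if the RX_DONE bit is set again while budget remains it jumps back to poll_again, so frames that arrived during processing are not missed. If the whole budget was consumed it simply returns budget, which makes __napi_poll() keep the instance on the repoll list for the next softirq round. Only when the ring is drained does it call napi_complete(), and only if that call succeeds does mtk_rx_irq_enable() re-enable the RX interrupt, switching the device back to interrupt mode.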
The NAPI state bit NAPI_STATE_SCHED:
At initialization, netif_napi_add() sets napi->state to the scheduled state. // set_bit(NAPI_STATE_SCHED, &napi->state);
When the interface is opened, napi_enable() is called, which clears the flag, i.e. puts the instance back into the non-scheduled state. // clear_bit(NAPI_STATE_SCHED, &n->state);
Later, net_rx_action() processes the packets; once all packets have been handled, napi_complete() is called, which again sets the non-scheduled state and re-enables the NIC interrupt.
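For reference, a rough sketch of napi_enable() as it appears in the kernel version quoted in this article (later kernels restructure this function, so treat it as illustrative):

void napi_enable(struct napi_struct *n)
{
    BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
    smp_mb__before_atomic();
    clear_bit(NAPI_STATE_SCHED, &n->state);     /* drop the scheduled state set by netif_napi_add() */
    clear_bit(NAPI_STATE_NPSVC, &n->state);
    if (n->dev->threaded && n->thread)
        set_bit(NAPI_STATE_THREADED, &n->state);
}

From this point on, NAPI_STATE_SCHED is set each time napi_schedule_prep() succeeds in the hard-interrupt handler and cleared each time napi_complete()/napi_complete_done() succeeds in the poll method, so exactly one path owns the NAPI instance at any moment.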