Linux 内核提供的四种中断下半部中 softirq(软中断)、tasklet(小任务)、workqueue(工作队列) 、request thread(中断线程)中的其中一种,其效率仅次于软中断,但远高于request thread 和 workqueue。
如网卡的 fifo 半满中断触发,被 cpu0 处理,cpu0 会在关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。若网卡的 fifo 全满中断有再次触发,就会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续相同的流程。由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。
由于 tasklet 基于 softirq 的基础实现,为了易用性考虑,同一种 tasklet 在多个 cpu 上不会并行执行,因此不存在并发问题,在使用上就可以少一些顾虑。但也正是因为不存在并发,导致了性能较之 softirq 差一些。
原因是因为前者是在软中断上下文件工作(意味着不能调用任何阻塞的接口),而后两者是在进程上下文工作(实质上是在内核线程里面执行)。
#include <linux/module.h>
#include <linux/init.h>
#include <linux/kernel.h>
#include <linux/interrupt.h>
static struct tasklet_struct my_tasklet;
static void my_tasklet_handle(unsigned long data)
{
printk("tasklet handle running...\n");
}
static irqreturn_t xxx_interrupt(int irq, void *dev_id)
{
// 调度 tasklet
tasklet_schedule(&my_tasklet);
}
static int __init demo_driver_init(void)
{
// 初始化一个 tasklet ,关联处理函数
tasklet_init(&my_tasklet, my_tasklet_handle, 0);
request_irq(xxx, xxx_interrupt, IRQF_SHARED, xxx, xxx);
return 0;
}
static void __exit demo_driver_exit(void)
{
tasklet_kill(&my_tasklet);
return ;
}
module_init(demo_driver_init);
module_exit(demo_driver_exit);
MODULE_LICENSE("GPL v2");
Linux 内核被启动后,会执行 start_kernel() 函数,tasklet 是基于 softirq 实现的,会在 softirq_init() 里面进行必要的初始化,主要是初始化 tasklet 链表和与相应的软中断号建立关联。
tasklet_hi_action 是高优先级的 tasklet,tasklet_action 是普通的 tasklet,两者实现原理都一样。
// kernel\linux-4.9\init\main.c
asmlinkage __visible void __init start_kernel(void)
{
...
softirq_init();
...
}
// kernel\linux-4.9\kernel\softirq.c
void __init softirq_init(void)
{
int cpu;
// 初始化链表
for_each_possible_cpu(cpu) {
per_cpu(tasklet_vec, cpu).tail =
&per_cpu(tasklet_vec, cpu).head;
per_cpu(tasklet_hi_vec, cpu).tail =
&per_cpu(tasklet_hi_vec, cpu).head;
}
// 建立TASKLET_SOFTIRQ、HI_SOFTIRQ软中断号的对应服务接口
open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}
当调用 tasklet_schedule() 时,如果该 tasklet 没有被设置 TASKLET_STATE_SCHED 标记时,才会加入链表内,如果已经设置了 TASKLET_STATE_SCHED 了,那么就会忽略此次的 tasklet _schedule(),这就意味着如果在极短的时间内调用 tasklet_schedule() 只会触发一次(这里可能会存在丢失中断事件的情况)。之后会通过 raise_softirq_irqoff() 启用 TASKLET_SOFTIRQ 软中断。
哪颗 cpu 受理该软中断,就将 tasklet 加入到该 cpu 的 tasklet 链表内,由于相同的软中断可以同时被其他 cpu 触发执行,因此会出现 cpu0\cpu1 的 tasklet 链表内有同一个 tasklet 的情况。
// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
// kernel\linux-4.9\kernel\softirq.c
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;
// 将指定的 tasklet 加入到链表内并设置软中断
local_irq_save(flags);
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_restore(flags);
}
在初始化的时候已经为 TASKLET_SOFTIRQ 软中断与 tasklet_action() 建立关联的关系,因此软中断触发时,就会调用 tasklet_action(),这里是精髓部分。
多次触发软中断时,当前 cpu 只能同一时间执行一次相同的软中断,但是如果有多个 cpu 的话,那么会有多个 cpu 并发执行软中断,所以下面的 tasklet_action() 要想到这一点。
1. tasklet_action() 屏蔽当前 CPU 中断,获取当前 CPU 的 tasklet 链表并清空原有的链表,在恢复中断。避免在操作链表的过程中,被硬件中断打断。
2. 开始遍历链表取出 tasklet,先 TASKLET_STATE_RUN 标记确定该 tasklet 是否已经被其它 cpu 执行,因为该 tasklet 在执行的过程中,又被加入到当前的 cpu 的 tasklet 链表内。
3. 如果没有被执行,就继续检查该 tasklet 是否被 tasklet_disable(),如果有被 disable 就清除 TASKLET_STATE_RUN 标记,这样可以重新被添加会当前 tasklet 链表内,等待再次执行。如果有被执行,即使又被设置了 TASKLET_STATE_RUN 标记,也会在第 5 步执行完成后,会清除掉该标记。
4. 如果没有被 disable,那么就清除 TASKLET_STATE_SCHED 标记,该标记一旦被清除,就意味着在 tasklet 执行期间,tasklet_schedule() 可以继续添加新的 tasklet 其他 cpu 的 tasklet 链表内。如果当前 cpu 已经完成了 tasklet_action() ,新的 tasklet 也可能会重新添加到当前的 tasklet 链表。
5. 这里就会执行通过 tasklet_init() 绑定的 func,也就是示例中的 my_tasklet_handle(),执行完成后再清除 TASKLET_STATE_RUN 标记,继续下一个 tasklet。
6. 能走到这一步,会有两种情况,一种情况是即将执行 tasklet ,发现已经被 disable 掉了,另外一种情况是 tasklet 已经在其它 CPU 上执行中。无论哪种情况,都会将当前的 tasklet 重新放回到当前 cpu 的 tasklet 链表内,并调用 __raise_softirq_irqoff() 重新触发软中断(应该是启用该软中断)。
注意,以上获取 TASKLET_STATE_RUN 和 TASKLET_STATE_SCHED 标记都是位原子操作,所以不会出现因并发引发的问题。
// kernel\linux-4.9\kernel\softirq.c
// 某 CPU 要调度各个 tasklet 的实现
static __latent_entropy void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;
// 1 ------------------------------------------------------
local_irq_disable();
list = __this_cpu_read(tasklet_vec.head); // 获取 tasklet 链表
__this_cpu_write(tasklet_vec.head, NULL); // 清空 tasklet 链表
__this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
local_irq_enable();
// 如果存在 tasklet 就会进入循环
while (list)
{
struct tasklet_struct *t = list;
list = list->next;
// 2 ------------------------------------------------------
// TASKLET_STATE_SCHED: 表示该 tasklet 已经被挂接到某个 CPU 上
// TASKLET_STATE_RUN: 表示该 tasklet 正在某个 CPU 上执行
// 检查并设置 TASKLET_STATE_RUN 标记
// 返回 1: 表示 tasklet 没有被执行 返回 0: 表示 tasklet 已经被执行
if (tasklet_trylock(t))
{
// 3 ------------------------------------------------------
// 如果当前 tasklet 没有被 tasklet_disable()
if (!atomic_read(&t->count))
{
// 4 ----------------------------------------------
// 清除 TASKLET_STATE_SCHED 状态,便于该 tasklet 可以再次被触发
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
// 5 ----------------------------------------------
// 这期间,该 tasklet 可以被 tasklet_schedule(),从而引出下面第二种情况
t->func(t->data); // 如果没有执行且没有 disable 则执行
tasklet_unlock(t); // 清理 TASKLET_STATE_RUN 标记
continue; // 继续下一个 tasklet
}
// 如果当前已经被 disable 了,那就清理 TASKLET_STATE_RUN 标记
tasklet_unlock(t);
}
// 6 -----------------------------------------------------
// 有两种情况下,会将该 tasklet 再挂接回链表内,并重新触发,等待下一次执行的机会
// 1. 如果没有被执行,但是被调用 tasklet_disable() 接口 disable 了
// 2. 当前 tasklet 已经在其它 CPU 正在执行 func 这时候 tasklet 又会被挂回在
// 原来的链表中,为了满足同一种类型的 tasklet 只能在一个 CPU 上执行的设计
// 因此此次不执行 tasklet,挂入链表后等待下一次被执行的时机执行
local_irq_disable();
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
}
}
贴出判断和标记和清除 tasklet 运行的 TASKLET_STATE_RUN 标记代码
// kernel\linux-4.9\include\interrupt.h
static inline int tasklet_trylock(struct tasklet_struct *t)
{
return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}
// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_unlock(struct tasklet_struct *t)
{
smp_mb__before_atomic();
clear_bit(TASKLET_STATE_RUN, &(t)->state);
}
贴出关闭 tasklet 执行的代码
// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_disable(struct tasklet_struct *t)
{
tasklet_disable_nosync(t);
tasklet_unlock_wait(t);
smp_mb();
}
// kernel\linux-4.9\include\interrupt.h
// 关闭 tasklet(实际上应该理解为暂停调度执行)
static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
// 这里对整型原子操作 count 自增了,对应 tasklet_action() 里面的 atomic_read()
atomic_inc(&t->count);
smp_mb__after_atomic();
}
// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_enable(struct tasklet_struct *t)
{
// 这里对整型原子操作 count 自减了,对应 tasklet_action() 里面的 atomic_read()
smp_mb__before_atomic();
atomic_dec(&t->count);
}
tasklet_init() 实现与用户函数关联的接口,也是很简单
// kernel\linux-4.9\kernel\softirq.c
void tasklet_init(struct tasklet_struct *t,
void (*func)(unsigned long), unsigned long data)
{
t->next = NULL;
t->state = 0;
atomic_set(&t->count, 0);
t->func = func;
t->data = data;
}
tasklet_kill() 主要实现是尽可能快的让 tasklet 得到执行,等待执行完成后再退出。
1. 判断该 tasklet 是否已经被挂接到某个 cpu 的 tasklet 链表内,如果有挂接到,那么就立即让出 cpu,直至 tasklet 清除 TASKLET_STATE_SCHED 标记(参考 tasklet_action() 第 4 个步骤),进入运行状态。
2. 如果 tasklet 没有挂接或者一旦进入到执行状态,那么就会不停的检测 TASKLET_STATE_RUN 是否有被清除,被清除的话说明已经运行完成(参考 tasklet_action() 第5个步骤),可以放心的退出了。
// kernel\linux-4.9\kernel\softirq.c
void tasklet_kill(struct tasklet_struct *t)
{
if (in_interrupt())
pr_notice("Attempt to kill tasklet from interrupt\n");
// 1 ----------------------------------------------
while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
do {
yield();
} while (test_bit(TASKLET_STATE_SCHED, &t->state));
}
// 2 ----------------------------------------------
tasklet_unlock_wait(t);
clear_bit(TASKLET_STATE_SCHED, &t->state);
}
// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{
while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}
至此,tasklet 关键的实现原理分析完成,实际上还应联合 softirq,才算完整的了解整个机制。
1. 每颗 cpu 都有自己的 tasklet 链表,这样可以将 tasklet 分布在各个 cpu 上,可实现并发不同的 tasklet。
2. 相同的 tasklet 只能在某一颗 cpu 上串行执行,其它 cpu 会暂时避让,在此情况下,不需要考虑并发问题(即不需要加锁)。
3. tasklet_schedule() 接口调用时,如果 tasklet 还未被执行,或者处于 disable 期间,指定的 tasklet 不会被加入链表内,即该请求不会被受理。
4. tasklet_disable() 接口只是暂时停止指定的 tasklet 执行,依然会被加回待执行链表内。而在 disable 期间,相同的 tasklet 将无法被加入链表调度。