内核信号量类似于自旋锁,当锁关闭时,它不允许内核控制路径继续执行。与自旋锁不同的是,当内核控制路径试图获取内核信号量所保护的忙资源时,相应的进程被挂起,进而会导致进程切换;而自旋锁不会导致进程切换。因此,只有可以睡眠的函数才能获取内核信号量;中断处理程序和可延迟函数都不能使用内核信号量。
内核信号量结构如下:
/**
* 内核信号量结构
*/
struct semaphore {
/**
* 如果该值大于0,表示资源是空闲的。如果等于0,表示信号量是忙的,但是没有进程在等待这个资源。
* 如果count为负,表示资源忙,并且至少有一个进程在等待。
* 但是请注意,负值并不代表等待的进程数量。
*/
atomic_t count;
/**
* 存放一个标志,表示是否有一些进程在信号量上睡眠。
* 如果没有进程在信号量等待队列上睡眠时,sleeper通常为0,否则为1(这点很重要,见下面的down函数)
*/
int sleepers;
/**
* 存放等待队列链表的地址。当前等待资源的所有睡眠进程都放在这个链表中。
* 如果count>=0,那么这个链表就应该是空的。
*/
wait_queue_head_t wait;
};
当进程希望释放内核信号量锁时,就调用up()函数,如下:
static inline void up(struct semaphore * sem)
{
__asm__ __volatile__(
"# atomic up operation\n\t"
/**
* 首先增加count的值
*/
LOCK "incl %0\n\t" /* ++sem->count */
/**
* 测试count值,如果当前小于等于0,那么有进程在等待,跳到2f,唤醒等待进程。
*/
"jle 2f\n"
/**
* 运行到这里表示count>0,不用做任何事情,返回。
* 注意1后面的代码在单独的段中。此处就是函数的结束处。
*/
"1:\n"
LOCK_SECTION_START("")
/**
* 调用__up_wakeup,注意它是从寄存器传参的。
* 它最终调用的是__up,再调用wakeup。
* eax寄存器中传递的是第一个参数sem
*/
"2:\tlea %0,%%eax\n\t"
"call __up_wakeup\n\t"
"jmp 1b\n"
LOCK_SECTION_END
".subsection 0\n"
:"=m" (sem->count)
:
:"memory","ax");
}
fastcall void __up(struct semaphore *sem)
{
/*
* 这里只是唤醒,并不一定能马上恢复执行,所以没有把进程从等待队列移除
* 移除操作是在__down函数中执行的
*/
wake_up(&sem->wait);
}
当进程希望获取内核信号量锁时,调用down()函数,如下:
static inline void down(struct semaphore * sem)
{
might_sleep();
__asm__ __volatile__(
"# atomic down operation\n\t"
/**
* 首先减少并检查sem->count的值
* 如果为负(说明减少前就是0或负)就挂起
* 这里的减一操作是原子的
* 请注意当count<0时,此时-1是不正确的,因为调用进程会被挂起,而没有真正的获得信号量。
* 它恢复count值的时机,不在down中,在__down中。
*/
LOCK "decl %0\n\t" /* --sem->count */
"js 2f\n"
"1:\n"
/**
* 为负了,调用__down_failed
* __down_failed会保存参数并调用__down
*/
LOCK_SECTION_START("")
"2:\tlea %0,%%eax\n\t"
"call __down_failed\n\t"
"jmp 1b\n"
LOCK_SECTION_END
:"=m" (sem->count)
:
:"memory","ax");
}
__down()如下:
/**
* 当申请信号量失败时,调用__down使线程挂起。直到信号量可用。
* 本质上,它将线程设置为TASK_UNINTERRUPTIBLE并将进程放到信号量的等待队列。
*/
fastcall void __sched __down(struct semaphore * sem)
{
struct task_struct *tsk = current;
/*
* 将当前进程设置为等待队列的结构类型,包括设置唤醒函数
*/
DECLARE_WAITQUEUE(wait, tsk);
unsigned long flags;
/**
* 设置状态为TASK_UNINTERRUPTIBLE。
*/
tsk->state = TASK_UNINTERRUPTIBLE;
/**
* 在将进程放到等待队列前,先获得锁,并禁止本地中断。
*/
spin_lock_irqsave(&sem->wait.lock, flags);
/**
* 等待队列的__locked版本假设在调用函数前已经获得了自旋锁。
* 请注意加到等待队列上的睡眠进程是互斥的。这样wakeup最多唤醒一个进程。
* 将进程链入等待队列中
*/
add_wait_queue_exclusive_locked(&sem->wait, &wait);
sem->sleepers++;
for (;;) {
int sleepers = sem->sleepers;
/*
* atomic_add_negative把第一个参数加到第二个参数中,并测试第二个参数是否为负,如果为负,返回1,否则返回0
* 若count为负,则sleeper将被设为0
* 若sleeper为1,则count不变
* count不会小于-1,如果进入down前count为-1,则sleeper应该为1,则down将count置为-2,由于上面sleeper++变为2,函数调用后count还是变为-1
*/
if (!atomic_add_negative(sleepers - 1, &sem->count)) {
sem->sleepers = 0;
break;
}
sem->sleepers = 1;
spin_unlock_irqrestore(&sem->wait.lock, flags);
/*
* 这里进程会被挂起或者恢复执行
* 如果恢复执行则再次进入循环,可以验证上述atomic_add_negative函数仍能正确执行,
* 到这里会觉得上面的设计(atomic_add_negative)非常神奇,个人认为可能是经受各种考验之后设计的吧
*/
schedule();
spin_lock_irqsave(&sem->wait.lock, flags);
tsk->state = TASK_UNINTERRUPTIBLE;
}
/**
* 将进程从等待队列中移除,注意释放信号量是只是唤醒,唤醒之后可能会再次进入睡眠,所以真正移除是在这里
*/
remove_wait_queue_locked(&sem->wait, &wait);
/**
* 既然上面的进程已经唤醒并恢复执行了,为什么这里还要唤醒一次能,因为有可能前面已经唤醒了很多个进程
* 但是那些进程还没调度执行(有可能要很久之后才执行,所以这段时间就不要浪费,给另外进程咯),所以这里会试图再唤醒一个进程
*/
wake_up_locked(&sem->wait);
spin_unlock_irqrestore(&sem->wait.lock, flags);
tsk->state = TASK_RUNNING;
}