透过 Linux 内核看无锁编程(2)加锁的层级及无锁分析-1
 
- UID
- 1066743
|

透过 Linux 内核看无锁编程(2)加锁的层级及无锁分析-1
根据复杂程度、加锁粒度及运行速度,可以得出如下图所示的锁层级:
图 1. 加锁层级 其中标注为红色字体的方案为 Blocking synchronization,黑色字体为 Non-blocking synchronization。Lock-based 和 Lockless-based 两者之间的区别仅仅是加锁粒度的不同。图中最底层的方案就是大家经常使用的 mutex 和 semaphore 等方案,代码复杂度低,但运行效率也最低。
Linux 内核中的无锁分析Linux 内核可能是当今最大最复杂的并行程序之一,它的并行主要来至于中断、内核抢占及 SMP 等。内核设计者们为了不断提高 Linux 内核的效率,从全局着眼,逐步废弃了大内核锁来降低锁的粒度;从细处下手,不断对局部代码进行优化,用无锁编程替代基于锁的方案,如 seqlock 及 RCU 等;不断减少锁冲突程度、降低等待时间,如 Double-checked locking 和原子锁等。
内核无锁第一层级 — 少锁无论什么时候当临界区中的代码仅仅需要加锁一次,同时当其获取锁的时候必须是线程安全的,此时就可以利用 Double-checked Locking 模式来减少锁竞争和加锁载荷。目前 Double-checked Locking 已经广泛应用于单例 (Singleton) 模式中。内核设计者基于此思想,巧妙的将 Double-checked Locking 方法运用于内核代码中。
当一个进程已经僵死,即进程处于 TASK_ZOMBIE 状态,如果父进程调用 waitpid() 系统调用时,父进程需要为子进程做一些清理性的工作,代码如下所示:
清单 3. 少锁操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| 984 static int wait_task_zombie(task_t *p, int noreap,
985 struct siginfo __user *infop,
986 int __user *stat_addr, struct rusage __user *ru)
987 {
……
1103 if (p->real_parent != p->parent) {
1104 write_lock_irq(&tasklist_lock);
1105 /* Double-check with lock held. */
1106 if (p->real_parent != p->parent) {
1107 __ptrace_unlink(p);
1108 // TODO: is this safe?
1109 p->exit_state = EXIT_ZOMBIE;
……
1120 }
1121 write_unlock_irq(&tasklist_lock);
1122 }
……
1127 }
|
如果将 write_lock_irq 放置于 1103 行之前,锁的范围过大,锁的负载也会加重,影响效率;如果将加锁的代码放到判断里面,且没有 1106 行的代码,程序会正确吗?在单核情况下是正确的,但在双核情况下问题就出现了。一个非主进程在一个 CPU 上运行,正准备调用 exit 退出,此时主进程在另外一个 CPU 上运行,在子进程调用 release_task 函数之前调用上述代码。子进程在 exit_notify 函数中,先持有读写锁 tasklist_lock,调用 forget_original_parent。主进程运行到 1104 处,由于此时子进程先持有该锁,所以父进程只好等待。在 forget_original_parent 函数中,如果该子进程还有子进程,则会调用 reparent_thread(),将执行 p->parent = p->real_parent; 语句,导致两者相等,等非主进程释放读写锁 tasklist_lock 时,另外一个 CPU 上的主进程被唤醒,一旦开始执行,继续运行将会导致 bug。
严格的说,Double-checked locking 不属于无锁编程的范畴,但由原来的每次加锁访问到大多数情况下无须加锁,就是一个巨大的进步。同时从这里也可以看出一点端倪,内核开发者为了降低锁冲突率,减少等待时间,提高运行效率,一直在持续不断的进行改进。
内核无锁第二层级 — 原子锁 原子操作可以保证指令以原子的方式执行——执行过程不被打断。内核提供了两组原子操作接口:一组针对于整数进行操作,另外一组针对于单独的位进行操作。内核中的原子操作通常是内联函数,一般是通过内嵌汇编指令来完成。对于一些简单的需求,例如全局统计、引用计数等等,可以归结为是对整数的原子计算。
内核无锁第三层级 — Lock-free1. Lock-free 应用场景一 —— Spin Lock
Spin Lock 是一种轻量级的同步方法,一种非阻塞锁。当 lock 操作被阻塞时,并不是把自己挂到一个等待队列,而是死循环 CPU 空转等待其他线程释放锁。 Spin lock 锁实现代码如下:
清单 4. spin lock 实现代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| static inline void __preempt_spin_lock(spinlock_t *lock)
{
……
do {
preempt_enable();
while (spin_is_locked(lock))
cpu_relax();
preempt_disable();
} while (!_raw_spin_trylock(lock));
}
static inline int _raw_spin_trylock(spinlock_t *lock)
{
char oldval;
__asm__ __volatile__(
"xchgb %b0,%1"
:"=q" (oldval), "=m" (lock->lock)
:"0" (0) : "memory");
return oldval > 0;
}
|
汇编语言指令 xchgb 原子性的交换 8 位 oldval( 存 0) 和 lock->lock 的值,如果 oldval 为 1(lock 初始值为 1),则获取锁成功,反之,则继续循环,接着 relax 休息一会儿,然后继续周而复始,直到成功。
对于应用程序来说,希望任何时候都能获取到锁,也就是期望 lock->lock 为 1,那么用 CAS 原语来描述 _raw_spin_trylock(lock) 就是 CAS(lock->lock,1,0);
如果同步操作总是能在数条指令内完成,那么使用 Spin Lock 会比传统的 mutex lock 快一个数量级。Spin Lock 多用于多核系统中,适合于锁持有时间小于将一个线程阻塞和唤醒所需时间的场合。
pthread 库已经提供了对 spin lock 的支持,所以用户态程序也能很方便的使用 spin lock 了,需要包含 pthread.h 。在某些场景下,pthread_spin_lock 效率是 pthread_mutex_lock 效率的一倍多。美中不足的是,内核实现了读写 spin lock 锁,但 pthread 未能实现。
2. Lock -free 应用场景二 —— Seqlock
手表最主要最常用的功能是读时间,而不是校正时间,一旦后者成了最常用的功能,消费者肯定不会买账。计算机的时钟也是这个功能,修改时间是小概率事件,而读时间是经常发生的行为。以下代码摘自 2.4.34 内核:
清单 5. 2.4.34 seqlock 实现代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| 443 void do_gettimeofday(struct timeval *tv)
444 {
……
448 read_lock_irqsave(&xtime_lock, flags);
……
455 sec = xtime.tv_sec;
456 usec += xtime.tv_usec;
457 read_unlock_irqrestore(&xtime_lock, flags);
……
466 }
468 void do_settimeofday(struct timeval *tv)
469 {
470 write_lock_irq(&xtime_lock);
……
490 write_unlock_irq(&xtime_lock);
491 }
|
不难发现获取时间和修改时间采用的是 spin lock 读写锁,读锁和写锁具有相同的优先级,只要读持有锁,写锁就必须等待,反之亦然。
Linux 2.6 内核中引入一种新型锁——顺序锁 (seqlock),它与 spin lock 读写锁非常相似,只是它为写者赋予了较高的优先级。也就是说,即使读者正在读的时候也允许写者继续运行。当存在多个读者和少数写者共享一把锁时,seqlock 便有了用武之地,因为 seqlock 对写者更有利,只要没有其他写者,写锁总能获取成功。根据 lock-free 和时钟功能的思想,内核开发者在 2.6 内核中,将上述读写锁修改成了顺序锁 seqlock,代码如下:
清单 6. 2.6.10 seqlock 实现代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| static inline unsigned read_seqbegin(const seqlock_t *sl)
{
unsigned ret = sl->sequence;
smp_rmb();
return ret;
}
static inline int read_seqretry(const seqlock_t *sl, unsigned iv)
{
smp_rmb();
return (iv & 1) | (sl->sequence ^ iv);
}
static inline void write_seqlock(seqlock_t *sl)
{
spin_lock(&sl->lock);
++sl->sequence;
smp_wmb();
}
void do_gettimeofday(struct timeval *tv)
{
unsigned long seq;
unsigned long usec, sec;
unsigned long max_ntp_tick;
……
do {
unsigned long lost;
seq = read_seqbegin(&xtime_lock);
……
sec = xtime.tv_sec;
usec += (xtime.tv_nsec / 1000);
} while (read_seqretry(&xtime_lock, seq));
……
tv->tv_sec = sec;
tv->tv_usec = usec;
}
int do_settimeofday(struct timespec *tv)
{
……
write_seqlock_irq(&xtime_lock);
……
write_sequnlock_irq(&xtime_lock);
clock_was_set();
return 0;
}
|
Seqlock 实现原理是依赖一个序列计数器,当写者写入数据时,会得到一把锁,并且将序列值加 1。当读者读取数据之前和之后,该序列号都会被读取,如果读取的序列号值都相同,则表明写没有发生。反之,表明发生过写事件,则放弃已进行的操作,重新循环一次,直至成功。不难看出,do_gettimeofday 函数里面的 while 循环和接下来的两行赋值操作就是 CAS 操作。
采用顺序锁 seqlock 好处就是写者永远不会等待,缺点就是有些时候读者不得不反复多次读相同的数据直到它获得有效的副本。当要保护的临界区很小,很简单,频繁读取而写入很少发生(WRRM--- Write Rarely Read Mostly)且必须快速时,就可以使用 seqlock。但 seqlock 不能保护包含有指针的数据结构,因为当写者修改数据结构时,读者可能会访问一个无效的指针。 |
|
|
|
|
|