一、本文的目的在linux下有两种实现数据互斥的基本机制,包括了semaphore(信号量),spinlock(自旋锁)。这里要说的读写锁(read write lock)是自旋锁的一个变种,与一般的自旋锁的区别是,自旋锁一次只能有一个进程进入临界区,而对读写锁而言,如果进程是读的话,那就可以有多个进程同时进入临界区,而如果是写的话,则只有一个可以。
就现在的linux内核源代码的发行版本而言,已经实现了读写锁的一个类型,就是读者优先的读写锁。(在这个设计中,作为读的请求可以更容易的进入临界区,而写的请求的请求往往容易受阻,这个我在后面会分析),而我要设计的读写锁,则是以写进程为优先的考虑的对象,如果有写的请求发出,则它会在被允许的最快时间内得到响应。这样的好处是在一个由很多客户端以读的权限访问的服务器(如一般的公众服务器),如果管理员对服务器的某些内容或配置进行修改的话,那它的及时性就有可能无法满足。这有时是不可以被接受的。
二、linux现有的读写锁状况我先来分析现在linux内核源代码中的读写锁的实现方式,这样就可以很容易的理解后面的写者优先的读写锁的设计。
先介绍一个数据结构,这是在读写锁起到重要作用。
(注:下面所有的内核源代码均来自linux 2.4.17,如果与你的现有的内核源代码不同,请你作一些相应的改变就可以了,原理部分没有变化)
1
2
3
4
5
6
| typedef struct {
volatile unsigned int lock;
#if SPINLOCK_DEBUG
unsigned magic;
#endif
} rwlock_t;
|
这里的magic是用于调试的,而lock就是允许可以加的读锁数。
这个代码在linux/include/asm-i386/spinlock.h中定义了read_lock和write_lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| static inline void read_lock(rwlock_t *rw)
{
#if SPINLOCK_DEBUG
if (rw->magic != RWLOCK_MAGIC)
BUG();
#endif
__build_read_lock(rw, "__read_lock_failed");
}
static inline void write_lock(rwlock_t *rw)
{
#if SPINLOCK_DEBUG
if (rw->magic != RWLOCK_MAGIC)
BUG();
#endif
__build_write_lock(rw, "__write_lock_failed");
}
|
注意这里有两个参数,一个是rw就是允许的读锁数,而后面一个参数是如果加锁失败的话,处理失败的函数。
在这里真正调用的对write_lock是__build_write_lock_const或__build_write_lock_ptr,对read_lock中调用的是__build_read_lock_const或__build_read_lock_ptr,这里的取决因素是调用参数的操作指令寻址方式。我们这里只看const类
1
2
3
4
5
6
7
8
9
10
11
12
| #define __build_write_lock_const(rw, helper) \
asm volatile(LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \
"jnz 2f\n" \
"1:\n" \
".section .text.lock,\"ax\"\n" \
"2:\tpushl %%eax\n\t" \
"leal %0,%%eax\n\t" \
"call " helper "\n\t" \
"popl %%eax\n\t" \
"jmp 1b\n" \
".previous" \
:"=m" (*(volatile int *)rw) : : "memory")
|
这里的RW_LOCK_BIAS_STR就是 "0x01000000",取这个值的原因是这个值足够大,可以使满足读的请求足够多。
在".section .text.lock,\"ax\"\n" \
".previous" \
中的内容是把这一段的代码汇编到一个叫.text.lock的节中,并且这个节的属性是可重定位和可执行的,这样在代码的执行过程中,因为不同的节会被加载到不同的页面中,所以如果在前面不出现jmp,就在1:处结束了。而call的是在前面的write_lock中所写入的__write_lock_failed,这个函数在arch/asm-i386/kernel/semaphore.c中定义
1
2
3
4
5
6
7
8
9
10
| .align
.globl __write_lock_failed
__write_lock_failed:
" LOCK "addl $" RW_LOCK_BIAS_STR ",(%eax)
1: rep; nop
cmpl $" RW_LOCK_BIAS_STR ",(%eax)
jne 1b
" LOCK "subl $" RW_LOCK_BIAS_STR ",(%eax)
jnz __write_lock_failed
ret
|
这里的LOCK是一个在SMP体系结构中锁住总线,不让其它的CPU访问内存。在这里先" LOCK "addl $" RW_LOCK_BIAS_STR ",(%eax)是为了防止在后面的自旋等待中,不会让后面的读请求受阻,要不然的话,就会出现死锁,这是致命的。而
1
2
3
| 1: rep; nop
cmpl $" RW_LOCK_BIAS_STR ",(%eax)
jne 1b
|
就是不断的检查锁是否可以得到,得不到就会nop,这种方法可以在不改变lock的值的情况下实现等待,这就不用LOCK,这样的锁住总线了。最后如果得到锁就
1
| " LOCK "subl $" RW_LOCK_BIAS_STR ",(%eax)
|
这样就实现了write_lock的功能。
对读锁也是类似
1
2
3
4
5
6
7
8
9
10
11
12
| #define __build_read_lock_const(rw, helper) \
asm volatile(LOCK "subl $1,%0\n\t" \
"js 2f\n" \
"1:\n" \
".section .text.lock,\"ax\"\n" \
"2:\tpushl %%eax\n\t" \
"leal %0,%%eax\n\t" \
"call " helper "\n\t" \
"popl %%eax\n\t" \
"jmp 1b\n" \
".previous" \
:"=m" (*(volatile int *)rw) : : "memory")
|
但这里要注意一点,就是对read_lock而言,只要减1并且只要这个值不为负的话,就可以得到锁了,但rw.lock的值在被初始化的时候就被赋值成了0x01000000,这个值足够大,而要减的很小,所以要获得读锁是很容易的,从理论上说比得到写锁要容易0x01000000倍,这就是我前面说现在在linux内部实现的读写锁是读者优先的。而这个数也让我们容易理解在要获得写锁时,要对这个lock值 减去0x01000000,就是如果有一个读或者写请求在临界区内的话,第二个写请求就无法得到写锁了。
而如果得不到读锁,所要跳的是在read_lock所指明的__read_lock_failed
1
2
3
4
5
6
7
8
9
10
| .align 4
.globl __read_lock_failed
__read_lock_failed:
lock ; incl (%eax)
1: rep; nop
cmpl $1,(%eax)
js 1b
lock ; decl (%eax)
js __read_lock_failed
ret
|
这里的道理与前面的__write_lock_failed中的道理是相似的。 |