三、写者优先的读写锁的实现那既然要实现以写者为优先的读写锁,很自然,我们就想到了在读的请求发生时,不先去试图获得读锁,而是去检查有没有写的请求正在等待,如果有写的请求正在等待,则读的请求必须先处于等待状态。让写的请求完成之后,发现已经没有写的请求在等待了,才去试图获得读的锁。
这里我们先来设计rwlock_t这个数据结构,
1
2
3
4
5
6
7
8
9
| typedef struct {
volatile unsigned int lock;
#if WLOCK_PRIORITY
volatile unsigned int wlock_waiting;
#endif
#if SPINLOCK_DEBUG
unsigned magic;
#endif
} rwlock_t;
|
这里所增加的这个wlock_waiting就是作为检测是否有写的请求在等待的标志数。如果这个数不等于0则说明已经有写的请求在等待。它的负值的大小决定了写请求等待的个数。
这里我们先修改__build_write_lock_const中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| #define __build_write_lock_const(rw, wlock_wait,helper,helper1) \
asm volatile(
"cmpl $0,(%1)\n\t" \
"jnz 3f\n" \
"1:\t" LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \
"jnz 4f\n" \
"2:\n" \
".section .text.lock,\"ax\"\n" \
"3:\tpushl %%ebx\n\t" \
"leal %1,%%ebx\n\t" \
"call " helper1 "\n\t" \
"popl %%ebx\n\t" \
"jmp 1b\n" \
"4:\tpushl %%eax\n\t" \
pushl %%ebx\n\t" \
"leal %0,%%eax\n\t" \
"leal %1,%%ebx\n\t" \
"call " helper "\n\t" \
"popl %%ebx\n\t" \
"popl %%eax\n\t" \
"jmp 2b\n" \
".previous" \
:"=m" (*(volatile int *)rw) ,"=m" (*(volatile int *)wlock_waiting) : : "memory")
|
这里的新增加的wlock_waiting是表示前面所定义的wlock_waiting的地址,这个是地址,而不是值本身,它的原因是指令寻址的方式决定的,为了保证指令的操作在直接对内存,而不是把内存中的数据load到寄存器中,再进行处理后放回到内存中,如果不是这样的话,就有可能使这个变量的在寄存器内的处理时被其它的cpu在它们的寄存器中被改变。而helper1则是知道已经有了写请求在等待得到锁,而跳转的处理地址。这里我取的名字是__read_lock_failed_wlock_wait
1
2
3
4
5
6
7
8
| .align 4
.globl __read_lock_failed_wlock_wait
__read_lock_failed_wlock_wait:
1: rep; nop
cmpl $0,(%ebx)
jnz 1b
js __read_lock_failed_wlock_wait
ret
|
这里就是不断的检查wlock_waiting的数值是否为0, 如果不是0就要执行空转指令。
而helper就是取前面的__read_lock_failed的名字,但有一点的变化。
1
2
3
4
5
6
7
8
9
10
11
12
13
| .align 4
.globl __read_lock_failed
__read_lock_failed:
lock ; incl (%eax)
1: rep; nop
cmpl $0,(%ebx)
jnz 1b
rep; nop
cmpl $1,(%eax)
js 1b
lock ; decl (%eax)
js __read_lock_failed
ret
|
这里的还是要不断的检查是否有写请求在等待,因为如果不是这样的话,在前面的指令的跳转过程中就有可能有写的请求到来,而我们是要严格的执行写者优先的读写规则。如果在试图得到读锁过程中失败,也要跳转到检查写请求的地址,也是这个原因。
对于写锁的获得也要修改。
1
2
3
4
5
6
7
8
9
10
11
12
13
| #define __build_write_lock_const(rw,wlock_wait, helper) \
asm volatile(LOCK "subl $1,(%1)\n\t" \
LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \
"jnz 2f\n" \
"1:\n"LOCK "addl $1,(%1)\n\t" \
".section .text.lock,\"ax\"\n" \
"2:\tpushl %%eax\n\t" \
"leal %0,%%eax\n\t" \
"call " helper "\n\t" \
"popl %%eax\n\t" \
"jmp 1b\n" \
".previous" \
:"=m" (*(volatile int *)rw) ,"=m" (*(volatile int *)wlock_waiting) : : "memory")
|
这里与在__build_read_lock_const中的原理类似,但有一点不同,就是一旦要试图取得写锁的时候,要先对wlock_waiting这个数行减1,如果得到就要加1,这是为了让同样的读的请求可以知道已经有写的请求在等待获得读写锁了。而这里应用了addl 与subl而不是incl 与decl是因为后两个指令只能保证在24个bits的情况下有效,而前两个指令可以对32个bits情况下有效,而采用先减再加的策略,而不是先加后减,是因为如果一减,那原来是0就变成了0xffffffff,这样所有的位都是1对于硬件的可靠性加强。而对于helper与前面的读者优先的读写锁是相同的。
上面只是给出了在修改读写锁使其是写优先的最主要的内容,其实如果真要实现必须修改十处以上,这里由于篇幅的关系,我把修改好的代码在提供在这里下载:
四、小结这里对linux内核中的读写锁进行了写者优先的修改,这种修改从代码的内容上看比读者优先要增加了进行的成本,特别是如果有一个写请求在临界区外面等待,那可能会有很多的读请求在__read_lock_failed_wlock_wait中进行空转。但如果考虑到写请求与读请求的发生概率可能是1:100甚至更小,而且对系统刷新要求的高标准。那么这一点的损失是值得的,尤其是对路由器,或是实时要求很高的信息发布平台上(如证券)就应该如果。这在代码上是"大巧若拙"。而如果把这个代码在一般的PC机平台上应用,可能这样只会是"弄巧成拙"了。从这当中看出开放源代码的精神之所在,让用户实现自己最佳的配置与功能。 |