首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

读写自旋锁详解,第 2 部分 基于简单共享变量的实现(2)

读写自旋锁详解,第 2 部分 基于简单共享变量的实现(2)

写者优先读写自旋锁我们在清单 2 的代码基础上实现写者优先的读写自旋锁,关键之处是用号码分配的方式确定写者的到来顺序 [3]。使用 2 个整型变量,一个用于存放下一个分配的写者号码 writer_requests,一个用于存放下一个允许执行的写者号码 writer_completions。初始化的时候,将二者均置为 0。
有兴趣的读者朋友也可以在清单 3 的代码基础上实现写者优先的读写自旋锁。
写者到来的时候以当前的 writer_requests 值作为自己的号码 id,并原子地递增 writer_requests。当 id == writer_completions 时,表明先来的写者已经全部离开,但是可能有读者持有锁,因此写者还得检查持有锁的读者数目 nr_readers 是否为 0。写者释放锁的时候,增加 writer_completions,通知下一个等待写者。对写者 W 而言,所谓在 W 后面到来的申请线程是指在 W 获得号码之后开始执行 reader_lock() 和 writer_lock() 的线程。
对读者而言,当 writer_requests == writer_completions 时,表明当前已经没有写者,即无写者持有锁,也无等待写者,但这并不表示读者可以马上获得锁,因为可能在准备尝试获取锁的时候又有新的写者到来并获得锁。
清单 4. 基于简单共享变量的写者优先实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#define WAFLAG 1
#define RC_INCR 2

typedef struct {
    atomic_t rdr_cnt_and_flag __cacheline_aligned_in_smp;
    atomic_t writer_requests __cacheline_aligned_in_smp;
    atomic_t writer_completions __cacheline_aligned_in_smp;
} rwlock_t;

void init_lock(rwlock_t *lock)
{
    lock->rdr_cnt_and_flag = ATOMIC_INIT(0);
    lock->writer_requests = ATOMIC_INIT(0);
    lock->writer_completions = ATOMIC_INIT(0);
}

void reader_lock(rwlock_t *lock)
{
    while (atomic_read(&lock->writer_completions) !=         //(a)
atomic_read(&lock->writer_requests))
        cpu_relax();
    atomic_add(RC_INCR, &lock->rdr_cnt_and_flag);        //(b)
    while ((atomic_read(&lock->rdr_cnt_and_flag) & WAFLAG) != 0)//(c)
        cpu_relax();
}

void reader_unlock(rwlock_t *lock)
{
    atomic_sub(RC_INCR, &lock->rdr_cnt_and_flag);        //(d)
}

void writer_lock(rwlock_t *lock)
{
    int id = atomic_inc_return(&lock->writer_requests);          //(e)
    while (atomic_read(&lock->writer_completions) != id)         //(f)
        cpu_relax();
    while (atomic_cmpxchg(&lock->rdr_cnt_and_flag, 0, WAFLAG) != 0)//(g)
        while (atomic_read(&lock->rdr_cnt_and_flag) != 0)
            cpu_relax();
}

void writer_unlock(rwlock_t *lock)
{
    atomic_dec(&lock->rdr_cnt_and_flag);                  //(h)
    lock->writer_completions.counter++;                   //(i)
}




代码 (i) 不用写成原子递增操作 atomic_inc(&lock->writer_completions),因为此时有且仅有一个线程对 writer_completions 赋值。atomic_t 结构中的 counter 域有 volatile 关键字修饰,所以 (i) 一旦执行完毕,其它处理器就能感知 writer_completions 的新值。代码 (h) 是个原子操作,隐含了一个内存屏障,所以 (h) 的执行效果必定发生在 (i) 之前。
如果写者的数目和到来的频率较大,那么 writer_requests 和 writer_completions 这 2 个共享变量也会被频繁修改,如果它们和 rdr_cnt_and_flag 被放置在同一缓存行中,将增加处理器间缓存的同步开销。解决这种伪共享(False Sharing)问题的一种简单优化方法是用 __cacheline_aligned_in_smp 宏将锁结构中的 3 个共享变量放置在不同的缓存行中。
从代码 (e) 可知,每个写者都会取得一个号码 id,id 从 0 开始,中间显然不会遗漏,申请的顺序就是取得 id 的顺序,也就是说 id 小的写者较早申请。我们可以假定 Wn是 id 为 n 的写者,不失一般性,可以认为每个写者都是不同的。我们以 reader_lock() 和 writer_lock() 成功前最后一条原子操作(atomic_read(&lock->rdr_cnt_and_flag) 和 atomic_cmpxchg(&lock->rdr_cnt_and_flag, 0, 1))的执行顺序作为锁的获得顺序。
后面的证明还依赖这个事实:对于任意 n >= 0,Wn必定在有限时间内执行一次对 writer_completions 的递增操作,将其从 n 变为 (n + 1),且当 n > 0 时,W(n – 1)在 Wn完成该操作。证明请参阅下节关于公平读写自旋锁的论述。
我们给出代码正确性的简要证明:
  • 互斥。和读者优先的读写自旋锁的证明一样。
  • 读者并发。从代码 (b) - (c) 可看出,只要先到达的写者都已离开且当前没有写者持有锁,多个读者都能结束循环,从而获得锁。
  • 无死锁。从锁申请角度来证明,假定申请线程为 A,分情况讨论:
    • A 是读者,通过了代码 (a) 处循环。这说明通过时已无写者,但当执行代码 (b) 前,可能有新的写者到来并获得锁,不过一旦其释放,A 必定能获得锁。
    • A 是读者,无等待写者,锁被写者持有,导致无法通过代码 (a) 处循环。不过锁迟早被释放,因此 A 在有限时间内能通过 (a) 处循环,与上种情况相同。
    • A 是读者,至少有一个等待写者,导致无法通过代码 (a) 处循环,等待写者可能是比 A 晚来的。假定号码最小的等待写者是 B,我们证明 B 或别的线程能获得锁,请参考 d。
    • A 是写者,前面无等待写者。如果锁被写者持有,锁一旦释放后,A 就能跳出代码 (f) 处的循环。锁处于其它 2 种状态时,A 当然能通过 (f)。不过此时 A 不一定能通过代码 (g) 处的循环,因为可能已经有读者持有锁或者比 A 早到的等待读者抢先获得了锁。但是这些持有锁的读者数目是有限的,且必定在有限时间内释放锁,因此 A 一定能获得锁。
    • A 是写者,前面有至少一个等待写者。假定号码最小的等待写者是 B,我们证明 B 或别的线程能获得锁,请参考 d。
  • 写者优先。一旦写者 W 完成代码 (e) 而获得分配的号码,那么在释放锁之前,writer_completions 必定小于 writer_requests,那么后到的读者不能退出代码 (a) 处的循环,可见 W 一定在后到的读者之前获得锁。同理,后到的写者会在代码 (f) 处不断循环,W 一定在后到的写者之前获得锁。
公平读写自旋锁现实中,公平读写自旋锁可以保证当线程提交一个申请操作后,其等待时间有个可控的上界。这个优良特性,往往受到用户的青睐。
笔者在上节基础上实现了这个公平读写自旋锁。还是使用号码分配的方式为所有的读者 / 写者定序,但是这有个缺陷:线程无法知道直接后继的角色。如果读者 A 的直接后继 B 是个读者,那么 A 获得锁后,B 应该马上获得锁,而不是等到 A 释放锁之后。A 如何及时通知 B 呢?我们采用“事不关己,高高挂起”的态度,即读者获得锁后,立刻增加 completions 值,剩下的事丢给后继者自行处理。如果直接后继是写者咋办?写者此时还应判断是否仍有读者持有锁,即检查 nr_readers 是否大于 0。对写者而言,释放锁的时候才增加 completions 值。
清单 5. 基于简单共享变量的公平实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
typedef struct {
    atomic_t requests __cacheline_aligned_in_smp;
    atomic_t completions __cacheline_aligned_in_smp;
    atomic_t nr_readers __cacheline_aligned_in_smp;
} rwlock_t;

void init_lock(rwlock_t *lock)
{
    lock->requests = ATOMIC_INIT(0);
    lock->completions = ATOMIC_INIT(0);
    lock->nr_readers = ATOMIC_INIT(0);
}

void reader_lock(rwlock_t *lock)
{
    int id = atomic_inc_return(&lock->requests);             //(a)
    while (atomic_read(&lock->completions) != id)        //(b)
        cpu_relax();
    atomic_inc(&lock->nr_readers);               //(c)
    lock->completions.counter++;                     //(d)
}

void reader_unlock(rwlock_t *lock)
{
    atomic_dec(&lock->nr_readers);               //(e)
}

void writer_lock(rwlock_t *lock)
{
    int id = atomic_inc_return(&lock->requests);             //(f)
    while (atomic_read(&lock->completions) != id)        //(g)
        cpu_relax();
    while (atomic_read(&lock->nr_readers) > 0)            //(h)
        cpu_relax();
}

void writer_unlock(rwlock_t *lock)
{
    lock->completions.counter++;                     //(i)
}




代码 (c) 必须用原子递增操作,因为此时可能有读者正在执行代码 (e) 释放锁。(d) 和 (i) 不用写成原子递增操作,因为此时有且仅有一个线程对 completions 赋值。由于所有的线程均在 completions 这个共享变量上自旋、写者还需在 nr_readers 上自旋且读者不断修改 nr_readers,因此处理器间的同步开销比较大,将这 3 个共享变量放置在不同的缓存行中可以提高性能。
从代码 (a) 和 (f) 可知,每个申请线程都会取得一个号码 id,id 从 0 开始,中间显然不会遗漏,申请的顺序就是取得 id 的顺序,也就是说 id 小的线程较早申请。我们可以假定 An是 id 为 n 的申请线程,不失一般性,可以认为每个申请线程都是不同的。
先证明:对于任意 n >= 0,An必定在有限时间内执行一次对 completions 的递增操作,将其从 n 变为 (n + 1),且当 n > 0 时,A(n – 1)在 An完成该操作。采用第二数学归纳法:
  • n 等于 0、1 或 2 时,显然成立。
  • 假设 n < k(k >= 2) 时成立。
  • 当 n = k 时,只有当 A(k – 1)执行完对 completions 的递增操作后, completions 才变为 k。此时 A1,…,A(k – 1)已不能再对该变量进行操作,而 Ak之后的线程要么还没到来,要么在代码 (b) 或 (g) 处循环,只有 Ak才有可能跳出这两处的循环。如果 Ak是读者,那么它很快就可以对 completions 执行递增操作;如果 Ak是写者,它还得先等 nr_readers 为 0。nr_readers 必定在有限时间内变为 0,因为读者迟早要释放锁。然后 Ak在有限时间内执行 writer_lock() 时将 completions 递增。不论哪种情况,Ak必定在有限时间内执行一次对 completions 的递增操作,将其从 k 变为 (k + 1)。显然 Ak的执行在 A(k – 1)之后。
因为线程对 completions 的递增操作完成时意味着已经获得了锁,所以上面这个命题告诉我们申请线程必定在有限时间内获得锁,且获得锁的顺序和取得 id 的顺序一致,即“无死锁”和“公平”。
对于“互斥”性,我们假定在时刻 t,有 AI1,AI2,…,AIn这 n(> = 1) 个线程同时持有锁,且在 AI1之前获得锁的线程此时都已释放锁。因为申请线程依次获得锁,所以 I1 < I2 < … < In。可能存在线程 B,B 在某个 Ai和 A(i+1)之间获得锁,但在时刻 t,B 已经释放锁。分情况讨论:
  • AI1是读者。如果 AI2– AIn中有写者,假定 AIm是第一个写者,由于锁释放后线程对 nr_readers 的总贡献为 0(实际上写者根本不修改这个变量),AIm执行到代码 (g) 时因为尚有读者持有锁,nr_readers 必定大于 0,因此 AIm无法跳出该处的循环。可见 AI2– AIn中没有写者。
  • AI1是写者。因为写者持有锁的时候并不对 completions 执行递增操作,所以此时 AI1之后的申请线程不可能获得锁,于是 n 只能等于 1。
综上可知,读者和写者不可能同时持有锁,任何时刻至多只有一个写者持有锁。
一旦读者 A 获得锁,由代码 (d) 知它会递增 completions,如果 A 的直接后继 B 是个读者,那么 B 可以跳出代码 (b) 处的循环,从而获得锁。可见“读者并发”也是成立的。
结束语本系列文章详细论述读写自旋锁的原理和实现,本文是其中的第二部分,论述如何设计和实现基于简单共享变量的读写自旋锁。
返回列表