首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

读写自旋锁详解,第 3 部分 可扩展性研究(2)

读写自旋锁详解,第 3 部分 可扩展性研究(2)

清单 1. 基于单向链表的公平读写自旋锁的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#define NONE 0
#define READER 1
#define WRITER 2

typedef struct _qnode{
    int type;
    union {
        volatile int state;                     //(1)
        struct {
            volatile short waiting;
            volatile short successor_type;
        };
    };
    _qnode *volatile next;
} __cacheline_aligned_in_smp qnode;

typedef struct {
    qnode *volatile tail;
    qnode *volatile next_writer;
    atomic_t nr_readers;
} rwlock_t

void init_lock(rwlock_t *lock)
{
    lock->tail = NULL;
    lock->next_writer = NULL;
    lock->nr_readers = ATOMIC_INIT(0);
}

void reader_lock(rwlock_t *lock)
{
    qnode *me = get_myself();                   //(2)
    qnode *pred = me;

    me->type = READER;
    me->next = NULL;
    me->state = 1;// successor_type == NONE && waiting == 1

    xchg(&lock->tail, pred);
    if (pred == NULL) {
        atomic_inc(&lock->nr_readers);
        me->waiting = 0;
    } else {
        If ((pred->type == WRITER)
            || (cmpxchg(&pred->state, 1, 0x00010001) == 1)} { //(3)
            pred->next = me;
            while (me->waiting)
                cpu_relax();
        } else {
            atomic_inc(&lock->nr_readers);
            pred->next = me;
            me->waiting = 0;
        }
    }

    if (me->successor_type == READER) {
        while (me->next == NULL)
            cpu_relax();
        atomic_inc(&lock->nr_readers);
        me->next->waiting = 0;
    }
}

void reader_unlock(rwlock_t *lock)
{
    qnode *w;
    qnode *me = get_myself();

    if ((me->next != NULL)
|| (cmpxchg(&lock->tail, me, NULL) != me)) {
        while (me->next == NULL)
            cpu_relax();

        if (me->successor_type == WRITER)
            lock->next_writer = me->next;
    }

    if ((atomic_dec_return(&lock->nr_readers) == 1)
        && ((w = lock->next_writer) != NULL)
&& (atomic_read(lock->nr_readers) == 0)
&& (cmpxchg(&lock->next_writer, w, NULL) == w))
    w->waiting = 0;
}

void writer_lock(rwlock_t *lock)
{
    qnode *me = get_myself();
qnode *pred = me;

    me->type = WRITER;
    me->next = NULL;
    me->state = 1;

    xchg(&lock->tail, pred);
    if (pred == NULL) {
        lock->next_writer = me;
        if ((atomic_read(lock->nr_readers) == 0)
            && (cmpxchg(&lock->next_writer, me, NULL) == me))
            me->waiting = 0;
    } else {
pred->successor_type = WRITER;
smp_wmb();                          //(4)
        pred->next = me;
}

while (me->waiting)
        cpu_relax();
}

void writer_unlock(rwlock_t *lock)
{
    qnode *me = get_myself();

    if ((me->next != NULL)
|| (cmpxchg(&lock->tail, me, NULL) != me)) {
while (me->next == NULL)
            cpu_relax();
        
        if (me->next->type == READER)
            atomic_inc(&lock->nr_readers);

me->next->waiting = 0;
    }  
}




  • 使用 union 结构是为了方便后面将 successor_type 和 waiting 合在一起做原子操作。
  • 如果事先知道线程的数目,例如代码用于中断上下文,qnode 可以包装在 lock 数据结构中,每个线程一个;否则,可以使用线程本地存储区(Thread Local Storage,使用 gcc 关键字 __thread)分配 qnode。我们不关心 qnode 的分配细节,假定有个 get_myself() 函数可以获得当前线程的 qnode。
  • cmpxchg 原子操作将 [successor_type,waiting] 原子地从 [NONE,TRUE] 改变为 [READER,TRUE]。
  • 此处的“Write Memory Barrier”目的是确保对 successor_type 的赋值先完成。因为这两个赋值没有相关性,如果处理器乱序执行写指令,且对 next 的赋值先完成,那么链表就已构建完毕,前驱可能随时释放锁导致 pred 指针失效。
进一步提高读写自旋锁的可扩展性基于单向链表的读写自旋锁并不完美,因为线程还是得操作额外的共享变量 nr_readers 和 next_writer,这是由于读者释放锁的时候无法继续保持单向链表的结构。一种改进想法是使用双向链表 [5],因为双向链表的插入和删除操作能够做得相对高效。于是读者释放锁的时候可以将自己的节点结构从双向链表中删除,从而继续保持链表的结构。读者通过判断前驱指针是否为 NULL 就知道自己是不是最后一个持有者,而且也再不需要 next_writer 指针。但是该算法中对双向链表的操作使用了节点级别的细粒度普通自旋锁,在连续读者较多且几乎同时释放读写自旋锁的情况下,同步开销依然巨大。
第二种想法是为每个线程分配一个局部的普通自旋锁,读者只需获得自己的自旋锁即可,而写者必须将包括自己在内的所有人的自旋锁都获得才行 [6]。这个算法显然是个读者优先的实现,在写者较少的情况下可扩展性相当理想。不足之处有两点:一是写者得知道其它线程的自旋锁在哪儿,因此比较适用于固定线程的场景;其次是读者数目越多,对写者也就越不公平。
我们看到:一般而言,在一段可观测的时间内,读者数量远远大于写者,很多时候甚至没有写者。因此在基于单向链表的实现中,只有共享变量 nr_readers 才是一个明显的瓶颈。进一步分析可知,我们其实并不需要知道 nr_readers 的具体值,只是想了解 nr_readers 是否大于 0 还是等于 0,于是 Sun 的研究人员提出使用一种称为可扩展非零指示器(Scalable Non-Zero Indicator)的数据结构,大大降低了线程间的同步开销 [7]。
结束语本系列文章详细论述读写自旋锁的原理和实现,本文是其中的第三部分,针对大规模多核系统讨论如何设计和实现可扩展的读写自旋锁。
返回列表