首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

共享模式与基于 Condition 的等待 / 通知机制实现(3)

共享模式与基于 Condition 的等待 / 通知机制实现(3)

Condition的await()方法实现原理—-构建等待队列我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。
Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:
1
2
3
4
5
6
7
8
9
public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
         
        ...
}



这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点。
像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}



第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}



首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的。
那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。
接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:
  • 如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node
  • 如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node
  • 无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面
用一张图表示一下构建的数据结构就是:

对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:
  • AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点
  • AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址
整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么。
返回列表