Java 对象拥有一组监视方法:wait()、wait(long timeout)、notify() 以及 notifyAll() 方法,这些方法与 synchronized 同步关键字配合,可以实现等待/通知模式,进行线程之间的通讯。Condition 接口也提供了类似的方法,与 Lock 配合可以实现等待/通知模式,但两者使用方法和功能上存在差异。我们对比学习一下:

对比项 Object 对象 Condition 接口
前置条件 获取对象的锁 调用Lock.lock()获取锁,通过Lock.newCondition()获取Condition对象
调用方法 object.wait() condition.await()
等待队列(wait queue)个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
阻塞时,响应中断interrupt() 响应中断 响应中断 OR 不响应中断
当前线程释放锁,进入超时状态X(long timeout) 支持 支持
当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

# 一、Condition 接口

当线程调用 Condition 中的方法时,需要提前获取到 Condition 关联的锁(Condition 对象是由 Lock 对象的 newCondition() 方法创建),也就是 Condition 依赖 Lock 对象。

public class ConditionUseCase {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    public void conditionWait() throws InterruptedException{
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal() throws InterruptedException{
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

一般都会将 Condition 对象作为成员。当调用 await() 方法后,当前线程会释放锁并在此等待,而其他线程调用 Condition 对象的signal() 方法,通知当前线程后,当前线程才从 await() 方法返回,并且在返回前已经获取了锁。Condition 定义方法:

方法名称 描述
void await() throws InterruptedException 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从 await() 方法返回的情况,包括:其他线程调用该 Condition 的 signal()或signalAll()方法,或者被其他线程调用 interrupt 中断。如果当前线程从 await()方法返回,表明该线程已经获取了 Condition 对象所对应的锁。
void awaitUninterruptibly() 当前线程进入等待状态直到被通知,对中断不敏感。
long awaitNanos(long nanosTimeout) throws InterruptedException 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余时间,如果在 nanosTimeout 纳秒之前被唤醒,那么返回值就是(nanosTimeout-实际消耗)返回值如果是0或者负数表示超时。
boolean awaitUntil(Date deadline) throws InterruptedException 当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示到了指定时间,方法返回 false。
void signal() 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与Condition 相关的锁。
void signalAll() 唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与Condition相关的锁。

# 二、Condition 的实现分析

ConditionObject 是同步器 AQS(AbstractQueuedSynchronize)的内部类,因为 Condition 的操作需要获取相关联的锁,所以作为同步器的内部类也比较合理。每个 Condition 对象都包含着一个队列(等待队列),该队列是 Condition 对象实现等待/通知功能的关键。

【1】等待队列: 等待队列是一个 FIFO 的队列,队列中包含的是在 Condition 对象上等待的线程。如果一个线程调用 Condition.await() 方法,那么该线程就会释放锁、构造成节点加入到等待队列中。事实上,节点的定义复用了同步器中节点的定义,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node。一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾结点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。

【2】等待: 调用 Condition 的 await() 方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程进入等待状态。如果不是通过调用 Condition.signal()方法唤醒,而是通过其他线程调用interrupt() 中断,则会抛出interruptedException 异常。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程构造成节点加入等待队列
    Node node = addConditionWaiter();
    //释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 取消时清除
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

【3】通知: 调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒之前,会将节点移到同步队列中。调用该方法的前提就是必须获取锁。signal 源码如下:通过调用同步器的 enq(Node node)方法,等待队列中的头节点线程安全移动到同步队列。当节点移动到同步队列后,当前线程再使用 LockSupport 的unpark唤醒该节点。

public final void signal() {
        //isHeldExclusively 检查当前线程是否获取了锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
        //获取第一个Node
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
1
2
3
4
5
6
7
8
9

被唤醒的线程,将从 await() 方法的 while 循环中退出(isOnSyncQueue(Node node))是否处于同步队列,方法返回 true 表示在同步队列,然后调用同步器的 acquireQueued() 方法加入到获取同步状态竞争中。被唤醒的线程将从调用的 await() 方法返回,此时该线程已经成功获取了锁。Condition的 signalAll() 方法,相当于对等待中的每一个节点执行了一次 signal() 方法,效果就是将等待队列中所有节点转移到同步队列 AQS 中,并唤醒每个节点的线程。

(adsbygoogle = window.adsbygoogle || []).push({});