The sync queue is a doubly linked list, and its nodes are instances of the `Node` class.
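
    // The AQS queue is a doubly linked list; Node is its element type.
    static final class Node {
        static final Node SHARED = new Node(); // marker: waiting in shared mode
        static final Node EXCLUSIVE = null;    // marker: waiting in exclusive (mutex) mode

        // waitStatus values
        static final int CANCELLED =  1; // the waiter was cancelled
        static final int SIGNAL    = -1; // the successor is (or will be) parked; this node must wake it
        static final int CONDITION = -2; // the node is in a condition queue
        static final int PROPAGATE = -3; // shared mode: the wakeup must be propagated

        volatile int waitStatus; // wait status of this node
        volatile Node prev;      // predecessor
        volatile Node next;      // successor
        volatile Thread thread;  // the thread wrapped by this node
        Node nextWaiter;         // next node in the condition queue (or SHARED to mark shared mode)
    }

    // head and tail are volatile fields of AbstractQueuedSynchronizer itself, not of Node:
    private transient volatile Node head; // head of the doubly linked list
    private transient volatile Node tail; // tail of the doubly linked list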
`state` holds the current synchronization state. Each subclass defines what it means:
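In [[ReentrantLock]], state is the hold count. 0 means no thread holds the lock, and a positive value is the number of times the owner thread has (reentrantly) acquired it.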
In [[Semaphore]], state holds the number of available permits.
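In [[ReentrantReadWriteLock]], the high 16 bits of state count read-lock holds, and the low 16 bits hold the write lock's reentrancy count. The read count is a number of holds, so a thread that re-acquires the read lock is counted again.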
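The sketch below makes the read/write split concrete with the bit arithmetic alone. The constant and method names mirror those in `ReentrantReadWriteLock.Sync` (`SHARED_SHIFT`, `EXCLUSIVE_MASK`, `sharedCount`, `exclusiveCount`). The class `RwStateSketch` and its `pack` helper are hypothetical, added only for illustration, and assume each counter fits in 16 bits.

```java
// Hypothetical helper mirroring how ReentrantReadWriteLock splits one int state into two counters.
final class RwStateSketch {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // one more read hold adds 65536
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // 0xFFFF, the low 16 bits

    // Read holds live in the high 16 bits (unsigned shift, so the sign bit is not smeared).
    static int sharedCount(int state) { return state >>> SHARED_SHIFT; }

    // The write lock's reentrancy count lives in the low 16 bits.
    static int exclusiveCount(int state) { return state & EXCLUSIVE_MASK; }

    // Illustration only: assumes 0 <= readHolds, writeHolds <= 0xFFFF.
    static int pack(int readHolds, int writeHolds) {
        return readHolds * SHARED_UNIT + writeHolds;
    }
}
```

A single CAS on this one `int` can therefore update both counters atomically, which is why the two counts share one `state`.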
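
    private volatile int state;

    protected final int getState() {
        return state;
    }

    // Note: a plain volatile write that is not atomic with any preceding read, so it is not
    // thread-safe on its own; the caller must make sure no other thread updates state concurrently.
    protected final void setState(int newState) {
        state = newState;
    }

    // Thread-safe update of state: atomically set it to update if it currently equals expect (CAS).
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }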
Methods prefixed with `try`, such as `tryAcquire` and `tryRelease`, are implemented by subclasses. AQS only declares them.
Methods prefixed with `do`, such as `doAcquireShared`, are implemented by AQS itself and handle queuing in the sync queue.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Node.EXCLUSIVE: exclusive (mutex) mode
            selfInterrupt(); // if interrupted while waiting, restore the interrupt flag
    }
`acquire` depends on the return value of `tryAcquire`, whose logic is supplied by the AQS subclass.

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
The concrete logic of `tryAcquire` is decided by the subclass. This is the [[模版方法模式|template method pattern]].
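Here is a minimal sketch of the template method pattern in practice, adapted from the `Mutex` example in the `AbstractQueuedSynchronizer` Javadoc. The subclass implements only the `try*` hooks on top of `state`. AQS's `acquire` and `release` supply all the queuing, parking and waking. `SketchMutex` and `parallelIncrement` are hypothetical names. Unlike `ReentrantLock`, this lock is deliberately non-reentrant.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex built on AQS: state 0 = unlocked, 1 = locked.
final class SketchMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Succeeds only if nobody holds the lock (so a second call by the owner fails).
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (!isHeldExclusively()) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // true lets release() go on to wake a queued successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }                 // AQS template: tryAcquire, else queue and park
    boolean tryLock() { return sync.tryAcquire(1); } // the hook alone, never queues
    void unlock() { sync.release(1); }               // AQS template: tryRelease, then unpark a successor
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

    // Demo: several threads increment a plain counter under the mutex.
    static int parallelIncrement(int threads, int perThread) {
        SketchMutex mutex = new SketchMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try { counter[0]++; } finally { mutex.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return counter[0];
    }
}
```

The subclass never touches the queue. Everything in the rest of this section (`addWaiter`, `acquireQueued`, parking) runs inside `sync.acquire(1)` and `sync.release(1)`.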
`addWaiter` wraps the current thread in a Node and appends it to the tail of the AQS doubly linked list.

    // A thread that failed to get the lock is wrapped in a Node and queued in the doubly linked list
    private Node addWaiter(Node mode) {
        // wrap the current thread in a Node
        Node node = new Node(Thread.currentThread(), mode);
        // snapshot of tail
        Node pred = tail;
        if (pred != null) {
            // link the new node at the tail in three steps
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // if the CAS failed or pred is null, loop in enq until the node is at the tail
        enq(node);
        return node;
    }

    // enq:
    node.prev = t;
    if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
    }

    // addWaiter:
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
Both `enq` and `addWaiter` contain the same code that CASes the tail pointer to append a Node atomically. Is this redundancy? Why is it designed this way?
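The copy in `addWaiter` is a fast path (the JDK 8 comment reads "Try the fast path of enq; backup to full enq on failure"). When the queue is already initialized and nobody else is racing for the tail, the node is appended with one CAS. This skips `enq`'s loop and its empty-queue check. Only when the queue is empty or the CAS loses a race does the code fall back to `enq`. At the source level this saves no more than one condition check, but removing branches from a hot path can be a worthwhile optimization at the level of the CPU instruction pipeline.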
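The fast-path/slow-path split can be reproduced outside AQS. The following is a toy model, not the JDK code. `ToyWaitQueue` uses `AtomicReference` in place of the `Unsafe`-based `compareAndSetTail`/`compareAndSetHead`, and all its names are hypothetical. `addWaiter` makes one CAS attempt against a snapshot of `tail` and falls back to the looping `enq`, which also creates the dummy head lazily.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of AQS addWaiter/enq (hypothetical names, not JDK code).
final class ToyWaitQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Fast path: a single CAS against a tail snapshot, no loop.
    Node addWaiter(String name) {
        Node node = new Node(name);
        Node pred = tail.get();
        if (pred != null) {
            node.prev = pred;                     // 1. link backwards first
            if (tail.compareAndSet(pred, node)) { // 2. publish as the new tail
                pred.next = node;                 // 3. forward link, set last
                return node;
            }
        }
        enq(node); // slow path: empty queue, or the CAS lost a race
        return node;
    }

    // Slow path: retry until appended; creates the dummy head on first use.
    private Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t; // like AQS enq, return the predecessor
                }
            }
        }
    }

    // Count real waiters by following prev links from tail back to the dummy head.
    int sizeFromTail() {
        Node h = head.get();
        int n = 0;
        for (Node p = tail.get(); p != null && p != h; p = p.prev) n++;
        return n;
    }

    // Demo: several threads append concurrently; no node may be lost.
    static int concurrentAppend(int threads, int perThread) {
        ToyWaitQueue q = new ToyWaitQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) q.addWaiter("w");
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return q.sizeFromTail();
    }
}
```

`prev` is always set before the tail CAS, while `next` is only set after it. That is why `sizeFromTail` follows `prev` links, and it is the same reason `unparkSuccessor` searches backwards from `tail`.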

    private Node enq(final Node node) {
        for (;;) { // threads compete for the tail, so retry in a loop
            // read the tail
            Node t = tail;
            // if the tail is null, create a dummy head node that serves as both head and tail
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t; // returns the predecessor of node
                }
            }
        }
    }
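`acquireQueued` loops until the current node either acquires the lock or parks, and retries after each wakeup. Its return value tells whether the thread was interrupted while waiting (true means it was).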

    // called as acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // predecessor p
                final Node p = node.predecessor();
                // if the predecessor is the dummy head, try to acquire the lock
                if (p == head && tryAcquire(arg)) {
                    // acquired: this node becomes the new head
                    setHead(node);
                    // drop p's reference to this node to help GC
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // report the interrupt status
                }
                // the thread may park only after the predecessor's ws has been set to -1 (SIGNAL)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally { // failed is still true here only if something threw, e.g. tryAcquire(arg)
            if (failed)
                cancelAcquire(node);
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
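A node whose status is SIGNAL (-1) must wake its successor after it releases the lock. `shouldParkAfterFailedAcquire()` makes sure the predecessor's ws is SIGNAL before the current thread calls `parkAndCheckInterrupt()` to park itself.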

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // ws of the predecessor
        int ws = pred.waitStatus;
        // SIGNAL: the predecessor will wake this thread when it releases, so it is safe to park
        if (ws == Node.SIGNAL)
            return true;
        // the only status greater than 0 is CANCELLED (1)
        if (ws > 0) {
            // walk backwards to the first node that is not cancelled and make it the predecessor
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // ws is 0 or PROPAGATE
            // try to set the predecessor's ws to SIGNAL (a single attempt, no loop, so it may fail)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false; // return false so the caller goes round its loop again before parking
    }
Skipping nodes whose ws is CANCELLED:

    do {
        node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
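`parkAndCheckInterrupt()` parks the thread (an interrupt can also end the park) and returns whether the thread was interrupted while waiting. It returns false if the thread was woken by `LockSupport.unpark`, and true if it was woken by `Thread.interrupt`.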
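
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // park the thread
        // tells whether the parked thread was woken by an interrupt or normally:
        // true if interrupted, false otherwise (a spurious return also yields false)
        return Thread.interrupted();
    }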
Note: `Thread.interrupted()` clears the interrupt flag:

    // The interrupted status of the thread is cleared by this method.
    public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }
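Both return values of `parkAndCheckInterrupt()` can be observed on a single thread, which keeps the demo deterministic. The sketch assumes HotSpot-style behaviour, where `LockSupport.park` returns immediately if the interrupt status is already set. If the flag were left set, every later `park` would return at once, which is one reason AQS clears it with `Thread.interrupted()`. `ParkSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the two outcomes of parkAndCheckInterrupt(), run on the current thread.
final class ParkSketch {
    // Same shape as the AQS method: park, then report (and clear) the interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park(ParkSketch.class);
        return Thread.interrupted();
    }

    // Woken "normally": an unpark permit makes park return at once, and no interrupt is reported.
    static boolean wokenByUnpark() {
        LockSupport.unpark(Thread.currentThread()); // permit is now available
        return parkAndCheckInterrupt();             // consumes the permit, returns false
    }

    // Woken by an interrupt: park returns, interrupted() reports true and clears the flag.
    static boolean[] wokenByInterrupt() {
        Thread.currentThread().interrupt();
        boolean first = parkAndCheckInterrupt(); // true
        boolean second = Thread.interrupted();   // false: the first call cleared the flag
        return new boolean[] {first, second};
    }
}
```

The second call returning false is why `acquire` has to remember the interrupt in a local variable and re-assert it later with `selfInterrupt()`.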
Cancels an ongoing attempt to acquire.

    node.thread = null;
    node.waitStatus = Node.CANCELLED;

    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;

        Node pred = node.prev;
        // Skip cancelled predecessors
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        // if node is the tail, threads race for the tail pointer, so move it back to pred with CAS
        if (node == tail && compareAndSetTail(node, pred)) {
            // the tail CAS succeeded: set the predecessor's next to null
            compareAndSetNext(pred, predNext, null);
        } else {
            // node is not the tail, or compareAndSetTail failed
            int ws;
            // pred is not the dummy head, and pred is (or can be made) SIGNAL, i.e. pred will
            // wake its successor: try to point pred's next at the node it must wake
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                // pred can have ws == SIGNAL but pred.thread == null, because cancelAcquire runs
                // node.thread = null before node.waitStatus = Node.CANCELLED; hence this check
                pred.thread != null) {
                Node next = node.next;
                // if node's successor exists and is not cancelled, point pred's next at it.
                // A single attempt, no loop: wakeups search for live nodes backwards from tail,
                // so a failure here is harmless
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // otherwise (e.g. pred == head, node is the dummy head's successor), wake node's
                // successor, which then re-enters `if (p == head && tryAcquire(arg))` to compete for the lock
                unparkSuccessor(node);
            }

            node.next = node; // help GC: node's next points to itself
        }
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) // reset the status to 0
            node.compareAndSetWaitStatus(ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { // successor is null or CANCELLED
            s = null;
            // search backwards from tail for the non-cancelled node closest to node
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // wake it
    }
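The backward search can be checked on a hand-built chain. This is a single-threaded toy, and `SuccessorScan` and `chain` are hypothetical names. It applies the same rule as `unparkSuccessor`: take `node.next` if it is live. Otherwise it scans from `tail` through `prev` links and keeps the last live node it sees, which is the one closest to `node`. The `lastNextMissing` flag mimics the window in which the tail CAS has succeeded but `pred.next = node` has not run yet.

```java
// Single-threaded toy of unparkSuccessor's successor search (hypothetical names, not JDK code).
final class SuccessorScan {
    static final class Node {
        final String name;
        final int waitStatus; // > 0 means CANCELLED, as in AQS
        Node prev, next;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    // Same rule as unparkSuccessor: use node.next if it is live, else scan backwards from tail.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Keep overwriting s, so the last live node seen is the one closest to `node`.
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        return s;
    }

    // Builds head -> ... -> tail with every prev link set. If lastNextMissing is true, the final
    // forward link stays null, like a node whose tail CAS succeeded but whose pred.next is not set yet.
    static Node[] chain(String[] names, int[] statuses, boolean lastNextMissing) {
        Node[] nodes = new Node[names.length];
        for (int i = 0; i < names.length; i++) {
            nodes[i] = new Node(names[i], statuses[i]);
            if (i > 0) {
                nodes[i].prev = nodes[i - 1];
                boolean isLast = i == names.length - 1;
                if (!(lastNextMissing && isLast))
                    nodes[i - 1].next = nodes[i];
            }
        }
        return nodes;
    }
}
```

With `head -> A(cancelled) -> B -> C`, the scan starts at `C`, overwrites the candidate with `B`, skips `A`, and returns `B`. With a missing `head.next`, the forward link alone would find nobody, but the `prev` chain still reaches the waiter.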
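`acquireInterruptibly` responds to interrupts: if the thread is interrupted while acquiring the lock, it throws `InterruptedException`.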

    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // already interrupted: throw right away
        if (!tryAcquire(arg))                 // the subclass's acquire attempt
            doAcquireInterruptibly(arg);      // interruptible queued acquisition
    }
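`doAcquireInterruptibly` follows roughly the same logic as `acquire`. The difference is that it responds to interrupts: once the thread is interrupted, it stops trying to acquire the lock and throws `InterruptedException`. `acquire` does not react to the interrupt flag. It only records the flag and passes it on by re-asserting it after acquiring.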

    // Waits until woken by a release and the lock is acquired, or until woken by an interrupt
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // respond to the interrupt: the main difference from acquire
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
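The difference is observable through `ReentrantLock`, whose `lock()` and `lockInterruptibly()` end up in AQS `acquire` and `acquireInterruptibly`. In this sketch the main thread holds the lock, waits until the other thread is queued (`hasQueuedThread`), and then interrupts it. `InterruptDemo` and its methods are hypothetical names. AQS internals differ between JDK versions, but the behaviour checked here is the documented contract of both methods.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo contrasting lockInterruptibly() and lock() when a queued waiter is interrupted.
final class InterruptDemo {
    // lockInterruptibly(): the queued waiter abandons the wait with InterruptedException.
    static boolean lockInterruptiblyGivesUp() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] gaveUp = {false};
        lock.lock(); // keep the lock so the waiter has to queue
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // not reached: main holds the lock until the waiter is gone
            } catch (InterruptedException e) {
                gaveUp[0] = true;
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) Thread.yield(); // wait until it is in the sync queue
        waiter.interrupt();
        join(waiter); // returns only because the waiter threw
        lock.unlock();
        return gaveUp[0];
    }

    // lock(): the interrupt is only recorded; the waiter keeps waiting, then re-asserts the flag.
    static boolean lockKeepsWaitingAndRestoresFlag() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagSeen = {false};
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                flagSeen[0] = Thread.currentThread().isInterrupted(); // set again by selfInterrupt()
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter)) Thread.yield();
        waiter.interrupt();
        boolean stillWaiting = waiter.isAlive(); // cannot finish while main still holds the lock
        lock.unlock();
        join(waiter);
        return stillWaiting && flagSeen[0];
    }

    private static void join(Thread t) {
        try { t.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }
}
```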
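When studying shared locks, pay particular attention to how the read-write lock solves writer starvation (writer threads never managing to get the lock).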
The main difference between `acquireShared` and `acquire` is whether the current thread, once it has acquired, goes on to wake its successor.

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
`Semaphore`, `CountDownLatch` and `ReentrantReadWriteLock` each implement `tryAcquireShared`.
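As a small shared-mode example, here is a one-shot latch adapted from the `BooleanLatch` example in the `AbstractQueuedSynchronizer` Javadoc. `tryAcquireShared` returns a positive value once the latch is signalled. A single `releaseShared` then lets every queued waiter through, each newly woken head waking the next shared node. The `releaseAll` demo method is a hypothetical addition.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot latch in shared mode (adapted from the BooleanLatch example in the AQS Javadoc).
final class BooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        @Override
        protected int tryAcquireShared(int ignore) {
            // >= 0 means acquired; a positive value also lets setHeadAndPropagate wake the next node
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true; // true makes releaseShared() call doReleaseShared()
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() { return sync.isSignalled(); }
    void signal() { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Demo (hypothetical): n threads wait on the latch; a single signal() must let all of them pass.
    static int releaseAll(int n) {
        BooleanLatch latch = new BooleanLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[n];
        for (int i = 0; i < n; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        latch.signal();
        for (Thread w : waiters) {
            try { w.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return passed.get();
    }
}
```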

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // if the predecessor is the head, call the subclass's tryAcquireShared
                if (p == head) {
                    // in ReentrantReadWriteLock, 1 means the read lock was acquired, -1 that it was not
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // acquired: become the head and wake other threads waiting in SHARED mode
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt(); // if interrupted while waiting, restore the interrupt flag
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
Sets head of queue, and checks if [[successor]] may be waiting in shared mode, if so propagating if either propagate > 0 or PROPAGATE status was set.
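
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // keep the old head
        setHead(node); // the current node becomes the head
        // Doug Lea habitually null-checks variables before use: h and head can never be null here,
        // yet h == null and (h = head) == null are still tested.
        // The condition effectively reads:
        // if (propagate > 0 || oldHead.waitStatus < 0 || newHead.waitStatus < 0)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // s is null when node is currently the last node (or its successor has not linked
            // next yet); release conservatively in that case. Otherwise wake only a shared
            // (reader) successor; an exclusive (writer) successor is not woken here.
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }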
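In [[ReentrantReadWriteLock]], `tryAcquireShared` returns 1 when it gets the read lock, so propagate is 1 and `propagate > 0` always holds. `setHeadAndPropagate` then reduces to the following (the `s == null` check must stay, because `node.next` is null while node is the last node):

    private void setHeadAndPropagate(Node node, int propagate) {
        setHead(node);
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }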
Invokes release with current state value; returns saved state. Cancels node and throws exception on failure.

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // for a reentrant lock, savedState is the reentrancy (hold) count
            int savedState = getState();
            if (release(savedState)) { // release the lock and wake the successor
                failed = false;
                return savedState;
            } else {
                // the thread called this method without holding the lock
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED; // on failure mark the node CANCELLED (1)
        }
    }
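The effect of `fullyRelease` can be checked with `ReentrantLock`. A thread that holds the lock twice and calls `await` must release both holds, otherwise no other thread could ever take the lock to signal it. After it reacquires, it gets its saved hold count back. `FullyReleaseDemo` is a hypothetical name. `getHoldCount` and `awaitUninterruptibly` are the real `ReentrantLock`/`Condition` methods. If only one hold were released, this demo would hang rather than return.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: await() releases all reentrant holds and restores them afterwards.
final class FullyReleaseDemo {
    // Returns the hold count observed right after await returns.
    static int holdCountAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] helperRan = {false}; // guarded by lock
        lock.lock();
        lock.lock(); // reentrant: state == 2
        try {
            Thread helper = new Thread(() -> {
                lock.lock(); // can only succeed because await released BOTH holds
                try {
                    helperRan[0] = true;
                    cond.signal();
                } finally {
                    lock.unlock();
                }
            });
            helper.start();
            while (!helperRan[0])
                cond.awaitUninterruptibly(); // fullyRelease: saves 2, drops state to 0
            return lock.getHoldCount();      // reacquired with the saved state
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }
}
```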
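`release` can cause a wasted wakeup: the thread it wakes may fail to get the lock (for example, when a barging thread on a non-fair lock gets there first) and park again.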

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // the head node, which stands for the current lock holder
            Node h = head;
            // waitStatus != 0 (normally -1, SIGNAL) means a successor needs to be woken
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
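When several threads release the read lock at the same time, the first one changes the head's status from SIGNAL to 0. A second thread then finds ws == 0 and sets it to PROPAGATE (-3). This records that several threads released the read lock concurrently and that the woken thread must pass the wakeup on.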

    private void doReleaseShared() {
        for (;;) { // lets several threads holding the read lock release at the same time
            Node h = head;
            if (h != null && h != tail) { // is any thread queued behind the head?
                int ws = h.waitStatus;
                // SIGNAL: a later node needs the head to wake it; CAS the status to 0, then wake the successor
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h); // wake the successor
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // propagate the wakeup
                    continue;
            }
            if (h == head) // head unchanged: done; if it changed, loop again and re-check
                break;
        }
    }
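In the producer-consumer model, a consumer that finds the queue empty must wait until a producer produces something and wakes it. A producer that finds the queue full must wait until a consumer consumes something and wakes it. See also my analysis of the ArrayBlockingQueue source code.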
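Clearly the sync queue alone cannot meet this need. We also need two more queues to hold the producers and the consumers that are waiting for their conditions to be met. These are the condition queues.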
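Imagine there were no condition queue. When the queue is empty, a consumer acquires the lock, finds nothing to consume, releases the lock and immediately competes for it again. The producer may then keep failing to get the lock and never produce anything. That is a huge waste of CPU time.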
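The two-condition design can be sketched with one `ReentrantLock` and two `Condition` objects, following the `BoundedBuffer` example in the `Condition` Javadoc. Producers wait on `notFull` and consumers on `notEmpty`, and each side signals the other instead of spinning on the lock. The class below and its `transferSum` demo are a hypothetical sketch backed by an `ArrayDeque`. They are not the `ArrayBlockingQueue` implementation.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer: one lock, two condition queues.
final class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await(); // releases the lock while parked in the condition queue
            items.addLast(x);
            notEmpty.signal();   // move one waiting consumer to the sync queue
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();
            T x = items.removeFirst();
            notFull.signal();    // move one waiting producer to the sync queue
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Demo: a producer pushes 1..n through a buffer of capacity 2 while the caller sums them.
    static long transferSum(int n) {
        BoundedBuffer<Integer> buf = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buf.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return sum;
    }
}
```

Using two conditions means a `put` wakes only consumers and a `take` wakes only producers. With a single condition, `signal` could wake a thread of the wrong kind.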

    public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter; // first node of the condition queue
        private transient Node lastWaiter;  // last node of the condition queue
        private static final int REINTERRUPT = 1; // on exit from wait: re-assert the interrupt flag
        private static final int THROW_IE = -1;   // on exit from wait: throw InterruptedException
    }
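`await` first puts the thread's node into the condition queue, then releases the lock held through the sync queue. It then waits until the condition is signalled, the thread is interrupted, or a timeout expires. When the wait is over, it calls `acquireQueued` to reacquire the lock.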
Implements uninterruptible condition wait.

    public final void awaitUninterruptibly() {
        // add a node to the condition queue
        Node node = addConditionWaiter();
        // release the lock
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // park while the node is not yet in the sync queue
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this); // park
            if (Thread.interrupted())
                interrupted = true; // woken by an interrupt: just remember it
        }
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt(); // restore the interrupt flag
    }
The difference between `await` and `awaitUninterruptibly` is that `await` responds to interrupts.

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // respond to the interrupt
        // add the thread to the condition queue
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node); // release the lock
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) { // while (not in the sync queue)
            LockSupport.park(this); // park the current thread
            // Two things wake the thread: 1. an interrupt 2. signal moving it to the sync queue and waking it.
            // After waking, check whether it was interrupted while waiting
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break; // interrupted while waiting: leave the while loop
        }
        // after waking, reacquire the lock; the node is in the sync queue by now
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0) // handle the interrupt
            reportInterruptAfterWait(interruptMode);
    }
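The two interrupt policies can be compared directly; `AwaitInterruptDemo` is a hypothetical name. A thread interrupted during `await()` leaves with `InterruptedException`. A thread in `awaitUninterruptibly()` keeps waiting until it is signalled, then returns with its interrupt flag set again, matching the `selfInterrupt()` call in `awaitUninterruptibly`. Both outcomes hold whether the interrupt arrives before or during the wait.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of await() versus awaitUninterruptibly() under interruption.
final class AwaitInterruptDemo {
    // await(): an interrupt ends the wait with InterruptedException.
    static boolean awaitThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] threw = {false};
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await(); // nobody signals; only the interrupt can end this wait
            } catch (InterruptedException e) {
                threw[0] = true;
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiter.interrupt();
        join(waiter);
        return threw[0];
    }

    // awaitUninterruptibly(): the interrupt is remembered, the wait continues until a signal,
    // and the flag is set again on return.
    static boolean awaitUninterruptiblyRestoresFlag() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false}; // guarded by lock
        boolean[] flagAfter = {false};
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!ready[0])
                    cond.awaitUninterruptibly();
                flagAfter[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiter.interrupt();
        lock.lock();
        try {
            ready[0] = true;
            cond.signal();
        } finally {
            lock.unlock();
        }
        join(waiter);
        return flagAfter[0];
    }

    private static void join(Thread t) {
        try { t.join(); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }
}
```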
Adds a new waiter to wait queue.

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // if the last waiter's status is not CONDITION, unlinkCancelledWaiters removes
        // every node whose status is not Node.CONDITION
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter; // re-read lastWaiter
        }
        // create a new Node and append it to the condition queue
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
Unlinks cancelled waiter nodes from condition queue. Called only while holding lock. This is called when cancellation occurred during condition wait, and upon insertion of a new waiter when lastWaiter is seen to have been cancelled. This method is needed to avoid garbage retention in the absence of signals. So even though it may require a full traversal, it comes into play only when timeouts or cancellations occur in the absence of signals. It traverses all nodes rather than stopping at a particular target to unlink all pointers to garbage nodes without requiring many re-traversals during cancellation storms.
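In short: after a thread's node has been transferred to the sync queue, as happens when the wait is cancelled by an interrupt or a timeout, the node may still be linked into the condition queue. Such nodes must be unlinked.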

    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node kept so far
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) { // no longer waiting on the condition: unlink t
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
Checks for interrupt, returning THROW_IE if interrupted before signalled, REINTERRUPT if after signalled, or 0 if not interrupted.
`checkInterruptWhileWaiting` checks whether an interrupt occurred while waiting:

    // re-assert the interrupt flag on exit from wait
    private static final int REINTERRUPT = 1;
    // throw InterruptedException on exit from wait
    private static final int THROW_IE = -1;

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ? // check (and clear) the interrupt flag
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
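
    final boolean transferAfterCancelledWait(Node node) {
        // the CAS succeeds only if no signal has claimed the node yet: the thread was woken by an
        // interrupt, so InterruptedException must be thrown
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); // transfer to the sync queue
            return true; // THROW_IE
        }
        // the thread was interrupted while another thread was signalling it:
        // wait until signal has finished moving the node to the sync queue
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false; // REINTERRUPT
    }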
Throws InterruptedException, reinterrupts current thread, or does nothing, depending on mode.

    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
Moves the longest-waiting thread, if one exists, from the wait queue for this condition to the wait queue for the owning lock.

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter; // FIFO
        if (first != null)
            doSignal(first); // signal the first node of the condition queue
    }
Removes and transfers nodes until hit non-cancelled one or null. Split out from signal in part to encourage compilers to inline the case of no waiters.

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
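Inside the loop, these statements detach `first` from the condition queue before it is transferred:

    if ( (firstWaiter = first.nextWaiter) == null)
        lastWaiter = null;
    first.nextWaiter = null;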
`signalAll` is the equivalent of `notifyAll`:

    public final void signalAll() {
        if (!isHeldExclusively()) // the lock must be held
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first); // move to the sync queue
            first = next;             // advance first
        } while (first != null);
    }

    final boolean transferForSignal(Node node) {
        // If cannot change waitStatus, the node has been cancelled.
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // put node into the sync queue; enq returns its predecessor p
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 can only mean CANCELLED: p will never wake node, so unpark node directly.
        // Otherwise try to set p's status to -1 (SIGNAL) so that p wakes node later;
        // if that CAS fails, unpark node directly.
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // thread cooperation
            LockSupport.unpark(node.thread);
        return true;
    }