AQS (AbstractQueuedSynchronizer,抽象队列同步器) 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。可见,AQS是我们理解其他Java JUC并发容器的基础。
AQS 核心思想是为了解决多线程环境下对共享资源的请求同步问题。对于请求的共享资源,如果资源存在空闲,则当前请求获取到对应资源正常执行,否则,会转换为阻塞状态,直到有资源被释放后,由工作线程来唤醒。
AQS借助CLH队列锁(由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。)来实现上述的线程阻塞等待功能。这里CLH锁是一种自旋锁,当多线程竞争一把锁时,获取不到锁的线程,会排队进入CLH队列的队尾,然后自旋等待,直到其前驱线程释放锁。CLH队列锁的好处是唤醒线程时避免了惊群效应,因为只会唤醒当前线程的下一个等待线程。
除了CLH队列外,AQS中还有一个很重要的队列:Condition条件队列。Condition
声明了一组等待/通知的方法,这些方法的功能与Object
中的wait/notify/notifyAll
等方法相似,只不过Condition 中的方法则要配合锁对象使用,并通过newCondition
方法获取实现类对象。注意Condition对象只能在独占锁中才能使用。
下图给出了AQS组件的结构框架图:
AQS对资源的共享方式包括两种:
AQS内部维护着一个FIFO的队列,该队列就是用来实现线程的并发访问控制。队列中的元素是一个Node类型的节点,Node的主要属性如下:
static final class Node {int waitStatus;Node prev;Node next;Node nextWaiter;Thread thread;
}
各属性表示的含义如下:
其中,CLH队列里还有一个head
节点和一个tail
节点,分别表示头结点和尾节点,其中头结点是空节点,本身不存储Thread,仅保存next结点的引用。
CLH队列用来存储Node节点封装的线程阻塞队列。每个获取资源失败的线程,根据先来后到,利用CAS原子操作依次插入到CLH队列的尾部,等待被唤醒获取资源。为了便于尾节点插入数据,CLH队列做成了双端队列,即包含head和tail的FIFO队列。因此可以说,AQS实现多线程间对共享资源的同步访问,就是依靠CLH队列来实现的。
CLH队列每次unpark
解除线程阻塞时,都会从队列的头节点来选择线程进行接触阻塞。
CLH每一个线程都是一个自旋锁,非常消耗CPU。等待锁的每个线程在自己的某个变量上自旋等待,这个变量指向自己的前驱节点中的变量,通过不断地自旋,感知到前驱节点的变化后成功获取到锁。
我们可以看到公平锁就是最初的实现理念就是CLH队列。
在AQS中维护了一个同步状态变量state
,用来表示同步状态。
不同线程获取和释放资源时,都会对state进行更新。比如state > 0表示可以获取共享资源,否则无法获取。该变量对不同的子类实现具有不同的意义。
比如,对ReentrantLock来说,它表示加锁的状态:
exclusiveOwnerThread==Thread.currentThread()
),即可加锁,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁;其他
state变量定义如下:
/*** The synchronization state.*/
private volatile int state;
Object 的 wait、notify 函数是配合 Synchronized 锁实现线程间同步协作的功能,AQS 的 ConditionObject 条件变量也提供这样的功能,通过 ConditionObject 的 await 和 signal 两类函数完成。
ConditionObject是通过基于单链表的条件队列来管理等待线程的。线程在调用await方法进行等待时,会释放同步状态。同时线程将会被封装到一个等待节点中,并将节点置入条件队列尾部进行等待。当有线程在获取独占锁的情况下调用signal或singalAll方法时,队列中的等待线程将会被唤醒,重新竞争锁。另外,需要说明的是,一个锁对象可同时创建多个 ConditionObject 对象,这意味着多个竞争同一独占锁的线程可在不同的条件队列中进行等待。在唤醒时,可唤醒指定条件队列中的线程。
不同于 Synchronized 锁,一个 AQS 可以对应多个条件变量,而 Synchronized 只有一个。
如上图所示,AQS中维护这两个队列:CLH队列和Condititon条件队列。Condition条件队列时单向队列,利用上述提及的nextWaiter指针指向下一个节点。Condition队列只入队执行await的线程节点,并等待被调用singnal方法唤醒。从Condition队列中被唤醒的线程会入队到CLH队列。过程类似于Object类的wait/notify原理过程。
AQS针对上述提及的资源共享的两种不同模式,利用不同的方式进行实现:
独占式
acquire()
:获取资源release()
:释放资源共享式
acquireShard
:共享模式下获取资源releaseShard
:共享模式下释放资源acquire
是AQS中的方法,代码如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
该方法主要工作如下:
图中比较重要的几个方法:
看下addWaiter方法的定义:
private Node addWaiter(Node mode) {// 根据当前线程创建一个Node对象Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 判断tail是否为空,如果为空表示队列是空的,直接enqif (pred != null) {node.prev = pred;// 这里尝试CAS来设置队尾,如果成功则将当前节点设置为tail,否则enqif (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
该方法核心逻辑就是将当前线程封装为一个Node,然后利用CAS操作添加到队列尾部。
该方法的功能是循环的尝试获取锁,直到成功为止,最后返回中断标志位。
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {// 异常状态,默认是trueboolean failed = true;try {// 表示当前线程是否被中断过,默认是否boolean interrupted = false;// 一直自旋for (;;) {// 获取node节点的前驱节点final Node p = node.predecessor();// 如果前驱节点是首节点,说明当前线程所在的节点是列表中第一个节点,则获取资源成功if (p == head && tryAcquire(arg)) {// 获取资源成功,设置当前节点为头节点,清空当前节点的信息setHead(node);// 原来首节点的next置为null,这样该节点就变成一个孤立的节点,有利于下次GC时,比如使用标记清除法遍历时快速GC掉p.next = null; // help GC// 非异常状态,防止指向finally逻辑failed = false;// 返回线程中断状态return interrupted;}/*** 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,* shouldParkAfterFailedAcquire做了三件事* 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false* 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)* 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false*/if (shouldParkAfterFailedAcquire(p, node) &&//使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,// 唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态parkAndCheckInterrupt())interrupted = true;}} finally {// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除if (failed)cancelAcquire(node);}}
整体流程如下图所示:
AQS使用release()
方法来对资源进行释放,release
方法中,唤醒线程时会首先唤醒CLH队列中的队头节点所对应的线程,代码如下:
public final boolean release(int arg) {// 释放资源成功,tryRelease子类实现if (tryRelease(arg)) {// 获取头部线程节点Node h = head;// 首节点不为null,并且等待状态不为0if (h != null && h.waitStatus != 0)// 唤醒CLH队列队首节点对应的线程(首节点的下一个节点)unparkSuccessor(h);return true;}return false;}private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/// 获取节点等待状态int ws = node.waitStatus;if (ws < 0)// CAS操作更新节点状态为0compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// s指向队首第一个节点(不包括首节点)Node s = node.next;// 如果队首节点状态为CANCELLED,则从队尾开始循环向前,直到获取到第一个正常节点为止if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 唤醒队首节点对应线程LockSupport.unpark(s.thread);}
释放流程如下所示:
acquireShared 是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入 CLH 队列,直到获取到资源为止,且整个过程忽略中断的影响,acquireShared 函数代码如下:
/*** Acquires in shared mode, ignoring interrupts. Implemented by* first invoking at least once {@link #tryAcquireShared},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquireShared} until success.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquireShared} but is otherwise uninterpreted* and can represent anything you like.*/public final void acquireShared(int arg) {/**1. 负数表示失败2. 0表示成功,但没有剩余可用资源3. 正数表示成功且有剩余资源*/// <0表示获取资源失败if (tryAcquireShared(arg) < 0)// 自旋阻塞等待获取资源doAcquireShared(arg);}
doAcquireShared 函数与独占式的 acquireQueued 函数逻辑基本一致,区别在于:
addWaiter
方法:在共享锁的方式中,创建节点的方式是:final Node node = addWaiter(Node.SHARED);
这里,Node node = new Node(Thread.currentThread(), mode);
表明添加了一个共享式节点,节点标记是共享式,并入队CLH队列。setHeadAndPropagate(node, r)
方法:设置自己尾队列头节点,并尝试唤醒后继节点。即获取资源成功时,还会尝试唤醒后继资源,因为资源数可能>0,代表还有资源可获取,所以需要做后续线程节点的唤醒。同样的,AQS中提供了releaseShared
方法来释放共享资源,唤醒CLH队列的头节点的下一个节点,也就是CLH队列中第一个等待线程所对应的节点。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现//唤醒后继节点doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {//获取头节点Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNALif (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0continue; // loop to recheck cases//唤醒头节点下个线程节点unparkSuccessor(h);}//如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; }if (h == head) break;}}
与独占式释放资源区别不大,都是唤醒头节点的下个节点,就不做过多描述了。
本文从独占锁的实现出发,比较完整的分析了AQS内部独占锁和共享锁的实现。总体来说实现的思路很清晰,就是使用了标志位+队列的方式来处理锁的状态,包括锁的获取,锁的竞争以及锁的释放。在AQS中,利用CLH队列和Condition队列来维护多线程队共享资源的同步访问,state可以表示锁的数量,也可以表示其他状态,state的含义由子类去定义,自己只是提供了对state的维护。AQS通过state来实现线程对资源的访问控制,而state具体的含义要在子类中定义。
对于 AbstractQueuedSynchronizer 的分析,最核心的就是 sync queue 的分析。