AQS同步器解析

本文代码基于Java8

前言

AQS (AbstractQueuedSynchronizer) 提供了基于 FIFO 队列机制的模板,可以用来实现阻塞锁以及相关的同步锁 (semaphores, events等),是构建 Java 同步组件的基础。自定义子类通过继承 AQS 类,实现它的抽象方法来管理同步状态。AQS 提供了大量的模板方法来实现同步,主要是分为三类:

  1. 独占式获取、释放同步状态。
  2. 共享式获取、释放同步状态
  3. 查询同步队列中的等待线程情况。
1
2
3
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable

继承关系及 public 方法如下图所示

需要自定义同步组件实现的方法

基于 AQS 实现的自定义同步组件主要有五种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 独占式获取同步状态
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 独占式释放同步状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享式获取同步状态
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 如果当前(调用)线程以独占方式持有同步,则返回{@code true}
// 每次调用非等待的方法时都会调用此方法。(等待方法改为调用{@link release}。)
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

独占式

独占式,同一时刻仅有一个线程持有同步状态。

在 AQS 中维护着一个 FIFO 的同步队列,当线程获取同步状态失败后,则会加入到这个 CLH 同步队列的对尾并一直保持着自旋。在 CLH 同步队列中的线程在自旋时会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出 CLH 同步队列。当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。

独占式获取同步状态

acquire(int arg) 方法是 AQS 提供的模板方法,该方法为独占式获取同步状态,忽略中断,即线程获取同步状态失败加入到 CLH 同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  • tryAcquire(int arg) 去尝试获取锁,获取成功则设置锁状态并返回 true,否则返回 false。该方法需要自定义同步组件实现,该方法必须要保证线程安全的获取同步状态。
  • 如果 tryAcquire(int arg) 返回 false 即获取同步状态失败,则调用 addWaiter(Node mode) 将当前线程加入到 CLH 同步队列尾部。
  • acquireQueued(final Node node, int arg) : 当前线程会根据公平性原则来进行阻塞等待,直到获取锁为止,并且返回当前线程在等待过程中有没有中断过。
  • selfInterrupt() 会产生一个中断。

acquireQueued(final Node node, int arg) 是一个自旋的过程,也就是说当前线程(Node) 进入同步队列后,当前线程会一直尝试获取同步状态,当前驱节点是头节点并且获取锁成功才会退出。其中只有其前驱节点为头结点才能够尝试获取同步状态,原因有二:1.保持 FIFO 同步队列原则。2.头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 中断标志
for (;;) {
//当前线程的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 前驱节点是头节点并且获取锁成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取失败,线程等待 && 检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 置中断标志位true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

具体流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
【tryAcquire 获取锁--失败-->【addWaiter 生成 Node 节点并将当前线程加入 CLH 同步队列】
| |<-------------------------------
| *** |
| * |
| 【acquireQueued 判断前驱节点是否是头节点】--否-- 线程被中断或前驱节点被释放
| | | |
| 是 【acquireQueued 线程进入线程等待】
| | *
| *** ***
| * |
| 【acquireQueued 获取锁成功】---失败--
| |
| 成功
| |
| ***
| *
*** 【设置当前节点为头节点】
* |
结束<----------------------------

独占式中断获取同步状态

AQS 提供了 acquire(int arg) 以独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于 CLH 同步队列中等待着获取同步状态。为了响应中断,AQS 提供了 acquireInterruptibly(int arg) ,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException

1
2
3
4
5
6
7
8
9
10
11
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 校验该线程是否已经中断
if (Thread.interrupted())
// 抛出InterruptedException
throw new InterruptedException();
if (!tryAcquire(arg)) // 获取同步状态
// 获取失败,执行doAcquireInterruptibly(arg)
// 如果获取失败,线程等待 && 检查有中断则抛出InterruptedException,和 acquireQueued 置中断标志位true不一样
doAcquireInterruptibly(arg);
}

独占式超时获取同步状态

AQS 除了提供上面两个方法外,还提供了 tryAcquireNanos(int arg,long nanos)。该方法除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回 false,否则返回 true。

1
2
3
4
5
6
7
8
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 获取到直接返回,否则执行 doAcquireNanos(arg, nanosTimeout)
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

超时获取同步状态最终是在 doAcquireNanos(int arg, long nanosTimeout)中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 截止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 生成 Node节点并当前线程加入 CLH 同步队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 前驱节点是头节点获取同步状态成功
setHead(node); // 设置当前节点为头节点
p.next = null; // help GC
failed = false;
return true;
}
// 剩余时间计算
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false; // 超时
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 获取失败,线程等待并且剩余时间大于 1000 纳秒
// 等待 nanosTimeout 纳秒,线程会直接从LockSupport.parkNanos中返回
LockSupport.parkNanos(this, nanosTimeout);
// 超时非常短nanosTimeout<=spinForTimeoutThreshold,AQS会进行无条件的快速自旋。
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

独占式释放同步状态

当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态。AQS 提供了 release(int arg) 来释放同步状态。

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
// 调用自定义同步器自定义的 tryRelease(int arg) 方法来释放同步状态
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

共享式

共享式与独占式的最主要区别在于独占式同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。

共享式同步状态获取

AQS 提供 acquireShared(int arg) 共享式获取同步状态。

1
2
3
4
5
6
public final void acquireShared(int arg) {
// 尝试获取同步状态,需要自定义同步组件实现
if (tryAcquireShared(arg) < 0)
// 自旋方式获取同步状态
doAcquireShared(arg);
}

doAcquireShared(int arg) 自旋方式获取同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
// 创建共享式节点并将当前线程加入 CLH 同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 前驱节点为头节点
// 尝试获取同步
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取成功
// 设置当前节点为头节点并检查后续进程是否可以在共享模式下等待
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 获取失败线程等待并检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireShared(int arg) 不响应中断,与独占式类似,AQS 也提供了响应中断、超时的方法,分别是:acquireSharedInterruptibly(int arg)tryAcquireSharedNanos(int arg,long nanos),与独占式类似。

共享式释放同步状态

获取同步状态后,需要调用 releaseShared(int arg) 释放同步状态。

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 尝试释放同步状态,需要自定义同步组件实现
if (tryReleaseShared(arg)) {
// 自旋释放同步状态
doReleaseShared();
return true;
}
return false;
}

doReleaseShared() 自旋释放同步状态。因为可能会存在多个线程同时进行释放同步状态资源,所以需要确保同步状态安全地成功释放,一般都是通过 CAS (Conmpare And Swap) 和循环来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}