StampedLock

本文代码基于Java8

前言

读写锁如果使用不当,容易出现“饥饿”问题,比如在读线程非常多,写线程非常少的情况下,很容易导致写线程“饥饿”。虽然公平策略在一定程度上可以缓解这个问题,但是鱼与熊掌不可兼得,公平策略是以牺牲系统吞吐量为代价的。

于是StampedLock 类应运而生,在 JDK1.8 时引入,是对读写锁 ReentrantReadWriteLock 的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,可以更细粒度控制并发。

不过该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件。用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。

StampedLock 类结构

StampedLock 类结构如下:

StampedLock 的内部类包括四个,分别为 WriteLockViewReadLockViewReadWriteLockView 以及 WNode

StampedLock 底层实现

StampedLock 虽然不像其它锁一样定义了内部类来实现 AQS 框架,但是 StampedLock 的基本实现思路还是利用 CLH 队列进行线程的管理,通过同步状态值来表示锁的状态和类型。

StampedLock 把读分为了悲观读和乐观读,悲观读就等价于 ReadWriteLock 的读,而乐观读在一个线程写共享变量时,不会被阻塞,乐观读是不加锁的。

StampedLock 内部定义了很多常量,定义这些常量的根本目的和 ReentrantReadWriteLock 一样,对同步状态值按位切分,以通过位运算对State进行操作

对于 StampedLock 来说,写锁被占用的标志是第8位为1,读锁使用0-7位,正常情况下读锁数目为1-126,超过126时,使用一个名为的 readerOverflow int整型保存超出数。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class StampedLock implements java.io.Serializable {
private static final long serialVersionUID = -6001602636862214147L;
/** CPU 核数,用于控制自旋次数 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** 尝试获取锁失败次数超过该值,则将其加入等待队列 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** 等待队列首节点自旋获取锁失败次数超过该值,会继续阻塞 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** 等待队列首节点再次阻塞之前的最大尝试次数 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** 等待自旋锁溢出时的 yield 周期 */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
/** 在读锁数量溢出之前的 bit 位数 */
private static final int LG_READERS = 7;

// 锁状态和标记操作的位常量
private static final long RUNIT = 1L; // 一单位读锁
private static final long WBIT = 1L << LG_READERS; // 写状态标识
private static final long RBITS = WBIT - 1L; // 读状态标识
private static final long RFULL = RBITS - 1L; // 读锁的最大数量
private static final long ABITS = RBITS | WBIT; // 用于获取读写状态
private static final long SBITS = ~RBITS; // note overlap with ABITS

// 锁状态初始值,避免初始化为失败值 0
private static final long ORIGIN = WBIT << 1;

// 取消获取方法返回的特殊值,以便调用方可以抛出 InterruptedException
private static final long INTERRUPTED = 1L;

// 节点状态值,和顺序有关
private static final int WAITING = -1;
private static final int CANCELLED = 1;

// 节点的模式,使用 int 而不是 boolean,方便计算
private static final int RMODE = 0;
private static final int WMODE = 1;

/** Wait nodes */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}

/** CLH 队列的头节点 */
private transient volatile WNode whead;
/** CLH 队列的尾节点 */
private transient volatile WNode wtail;

// 视图
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;

/** 锁状态 */
private transient volatile long state;
/** 读锁饱和时的额外读锁计数 */
private transient int readerOverflow;

// 创建一个 StampedLock,初始是 unlocked 状态
public StampedLock() {
state = ORIGIN;
}
}

三种锁视图

三种锁视图分别为读锁视图、写锁视图、读写锁(悲观读)视图。这些视图其实是对 StamedLock 方法的封装,便于习惯了 ReentrantReadWriteLock 的用户使用。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 读锁视图
final class ReadLockView implements Lock {
// 非独占获取锁,如果需要会阻塞到可用
public void lock() { readLock(); }
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
// 尝试获取锁
public boolean tryLock() { return tryReadLock() != 0L; }
// 尝试获取锁,带超时时间
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
// 无标识释放锁
public void unlock() { unstampedUnlockRead(); }
// 不支持 Condition
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
// 写锁视图
final class WriteLockView implements Lock {
public void lock() { writeLock(); }
public void lockInterruptibly() throws InterruptedException {
writeLockInterruptibly();
}
public boolean tryLock() { return tryWriteLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryWriteLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockWrite(); }
// 不支持 Condition
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
// 读写锁(悲观读)视图
final class ReadWriteLockView implements ReadWriteLock {
// 获取读锁
public Lock readLock() { return asReadLock(); }
// 获取写锁
public Lock writeLock() { return asWriteLock(); }
}
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

ReadWriteLock 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 返回读锁
public Lock asReadLock() {
ReadLockView v;
return ((v = readLockView) != null ? v :
(readLockView = new ReadLockView()));
}

// 返回写锁
public Lock asWriteLock() {
WriteLockView v;
return ((v = writeLockView) != null ? v :
(writeLockView = new WriteLockView()));
}

使用示例

在使用 StampedLock 的时候,建议这样操作:乐观读时,如果有写操作修改了共享变量则升级乐观读为悲观读锁,因为这样可用避免乐观读反复的循环等待写锁的释放,造成 CPU 资源的浪费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock(); //获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp); //释放写锁
}
}

double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //乐观读
double currentX = x, currentY = y;
if (!sl.validate(stamp)) { //判断共享变量是否已经被其他线程写过
stamp = sl.readLock(); //如果被写过则升级为悲观读锁
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp); //释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock(); //获取读锁
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp); //升级为写锁
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}

StampedLock 获取锁和释放锁的实现

StampedLock 在获取锁和乐观读时,都会返回一个 stamp,解锁时需要传入这个 stamp,在乐观读时是用来验证共享变量是否被其他线程写过。

StampedLock 中,等待队列的结点要比AQS中简单些,仅仅三种状态。

  1. 初始状态
  2. 等待中
  3. 取消

获取写锁

写锁(独占锁),如果获取失败则进入阻塞队列,不响应中断。返回非 0 代表成功。

1
2
3
4
5
6
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}

(s = state) & ABITS) == 0L 为 true 代表读锁与写锁均未被使用,compareAndSwapLong 表示通过 CAS 更新同步状态值 state。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public long tryWriteLock() {
long s, next;
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : 0L);
}

public long tryWriteLock(long time, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
long next, deadline;
if ((next = tryWriteLock()) != 0L)
return next;
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}

public long writeLockInterruptibly() throws InterruptedException {
long next;
if (!Thread.interrupted() &&
(next = acquireWrite(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}

获取悲观读锁

悲观读锁(非独占锁),为获得锁一直处于阻塞状态,直到获得锁为止,不响应中断。返回非 0 代表成功。

1
2
3
4
5
6
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}

whead == wtail 代表队列为空 ,s & ABITS) < RFULL 没有写锁且读锁数小于 126 。compareAndSwapLong 表示通过 CAS 更新同步状态值 state。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public long tryReadLock() {
for (;;) {
long s, m, next;
if ((m = (s = state) & ABITS) == WBIT)
return 0L;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
}

public long tryReadLock(long time, TimeUnit unit)
throws InterruptedException {
long s, m, next, deadline;
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
if ((m = (s = state) & ABITS) != WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireRead(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}

public long readLockInterruptibly() throws InterruptedException {
long next;
if (!Thread.interrupted() &&
(next = acquireRead(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}

获取乐观读锁

相对于悲观读锁来说的,在操作数据前并没有通过 CAS 设置锁的状态,仅仅是通过位运算测试。如果当前没有线程持有写锁,则简单的返回一个非 0 的 stamp 版本信息。由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显示的释放该锁。

1
2
3
4
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

获取该 stamp 后在具体操作数据前还需要调用 validate 验证下该 stamp 是否已经不可用,也就是看当调用 tryOptimisticRead 返回 stamp 后,到当前时间是否有其它线程持有了写锁。如果是那么 validate 会返回 0,否者就可以使用该 stamp 版本的锁对数据进行操作。

1
2
3
4
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}

使用乐观读锁还是很容易犯错误的,必须要严谨。需要遵循相应的调用模板(判断共享变量是否已经被其他线程写过,如果被写过则升级为悲观读锁),防止出现数据不一致的问题。

释放锁

锁释放需要锁状态和 stamp 匹配,才能释放对应的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 如果锁状态和 stamp 匹配,则释放写锁
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}

// 如果锁状态和 stamp 匹配,则释放读锁
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}

// 如果锁状态和 stamp 匹配,则释放相关模式的锁
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L)
break;
else if (m == WBIT) {
if (a != m)
break;
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}

释放读锁、写锁的 “try” 版本,即“一次性”版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean tryUnlockWrite() {
long s; WNode h;
if (((s = state) & WBIT) != 0L) {
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return true;
}
return false;
}

public boolean tryUnlockRead() {
long s, m; WNode h;
while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return true;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return true;
}
return false;
}

锁转换

StampedLock 读写锁之间可以互相转换,可以更细粒度控制并发。包括三种转换:

  1. 转换成写锁
  2. 转换成悲观读锁
  3. 转换成乐观读锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// 尝试转换成写锁
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
else if (m == WBIT) {
if (a != m)
break;
return stamp;
}
else if (m == RUNIT && a != 0L) {
if (U.compareAndSwapLong(this, STATE, s,
next = s - RUNIT + WBIT))
return next;
}
else
break;
}
return 0L;
}

// 尝试转换成悲观读锁
public long tryConvertToReadLock(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
else if (m == WBIT) {
if (a != m)
break;
state = next = s + (WBIT + RUNIT);
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a != 0L && a < WBIT)
return stamp;
else
break;
}
return 0L;
}

// 尝试转换成乐观读锁
public long tryConvertToOptimisticRead(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
U.loadFence();
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS))
break;
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
return s;
}
else if (m == WBIT) {
if (a != m)
break;
state = next = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return next & SBITS;
}
}
else if ((next = tryDecReaderOverflow(s)) != 0L)
return next & SBITS;
}
return 0L;
}

性能对比

ReadWritLock相比,在一个线程情况下,是读速度其4倍左右,写是1倍。

ReadWritLock相比,六个线程情况下,读性能是其几十倍,写性能也是近10倍左右。

ReadWritLock相比,吞吐量提高

总结

StampedLock 提供的读写锁与 ReentrantReadWriteLock 类似,前者不支持重入,不支持条件变量,也就是没 Condition。不过前者通过提供乐观读锁在多线程多读的情况下能提供更好的性能,这是因为获取乐观读锁时候不需要进行 CAS 操作设置锁的状态,而只是简单的测试状态。

writeLock() 或者 readLock() 获得锁之后,线程还没执行完就被 interrupt() 的话,会导致 CPU 飙升。

另外,StampedLock 使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。