ReentrantReadWriteLock解析

本文代码基于Java8

前言

ReentrantReadWriteLockLock 的另一种实现方式, ReentrantLock 是一个排他锁,同一时间只允许一个线程访问,而 ReentrantReadWriteLock 允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。

读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock 实现都必须保证 writeLock 操作的内存同步效果也要保持与相关 readLock 的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

ReentrantReadWriteLock 支持以下功能:

  • 支持公平和非公平的获取锁的方式;
  • 支持可重入。读线程在获取了读锁后还可以获取读锁,写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
  • 允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
  • 读取锁和写入锁都支持锁获取期间的中断;
  • 支持 Condition 。仅写入锁提供了一个 Conditon 实现,读取锁不支持 ConditonreadLock().newCondition() 会抛出 UnsupportedOperationException

ReentrantReadWriteLock 类结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** 内部类提供读取锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类提供写入锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 执行所有同步机制 */
final Sync sync;

// 锁默认是非公平模式
public ReentrantReadWriteLock() {
this(false);
}
// 支持指导锁公平模式
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 获取写入锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
// 获取读取锁
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
}

其中SyncFairSyncNonfairSyncReadLockWriteLockReentrantReadWriteLock 的内部类,Sync 继承自 AbstractQueuedSynchronizer ,而 FairSyncNonfairSync 继承 Sync,分别对应公平锁和非公平锁。ThreadLocalHoldCounterHoldCounterSync的内部类。

Sync 锁

Sync 也是一个继承于AQS的抽象类。Sync 也包括公平锁 FairSync 和非公平锁 NonfairSync。sync 对象是 FairSyncNonfairSync 中的一个,默认是 NonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/** Lock 状态逻辑上分为两个无符号 short 类型:
* 1. 低位表示独占(writer)锁写入计数
* 2. 高位表示共享(reader)锁读取次数
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回共享(reader)锁被持有次数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回独占(writer)锁被持有次数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 持有计数器:每个线程持有锁的次数
static final class HoldCounter {
int count = 0;
// 为了避免垃圾滞留(garbage retention),使用 id 而不是引用
final long tid = getThreadId(Thread.currentThread());
}
// 本地线程持有锁的计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 当前线程持有的可重入读锁数。每当线程的读取保持计数降至0时删除。
private transient ThreadLocalHoldCounter readHolds;
// 成功获取读锁的最后一个线程的保持计数器
private transient HoldCounter cachedHoldCounter;
// 第一个获取读锁的线程
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // 确保读取锁持有次数可见
}
// 如果当前线程尝试获取读锁,且在其他情况下有资格获取该锁,但由于超过其他等待线程的策略而应该阻塞,则返回true。
abstract boolean readerShouldBlock();

// 如果当前线程尝试获取写锁,且在其他情况下有资格获取该锁,但由于超过其他等待线程的策略而应该阻塞,则返回true。
abstract boolean writerShouldBlock();

// 释放写锁
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// 获取写锁
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState(); // 获取当前状态
int w = exclusiveCount(c); // 获取写的状态
if (c != 0) { // 读状态不为0,读锁已获取
// 写状态为 0 或 当前线程不是独占线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 增加写状态,写锁获取成功
setState(c + acquires);
return true;
}
// 读锁未获取
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
// 当前线程应该阻塞或通过 CAS 更新 state 为 c + acquires 失败
return false;
// 设置当前线程为独占线程
setExclusiveOwnerThread(current);
return true;
}
// 释放读锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 当前线程是第一个成功获取读锁的线程
if (firstReaderHoldCount == 1)
// 线程读锁数量为1,那么第一个成功获取读锁的线程置为null
firstReader = null;
else
// 线程读锁数量减一
firstReaderHoldCount--;
} else { // 从缓存中获取当前线程的读锁数量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count; // 当前线程的读锁数量
if (count <= 1) {
// 从本地线程移除读锁线程
readHolds.remove();
if (count <= 0)
// 无读锁可以释放
throw unmatchedUnlockException();
}
// 线程读锁数量减一
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT; // state读锁状态减一
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; // 如果state为0,表示无锁状态,返回true
}
}
// 获取读锁
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState(); // 获取当前状态态
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 写状态不为 0 且当前线程是独占线程
return -1;
int r = sharedCount(c); // 获取读状态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁线程不阻塞、读取锁的共享计数未超过限制且通过 CAS 获取锁失败
if (r == 0) { // 读状态为 0
firstReader = current; //当前线程是第一个成功获取读锁的线程
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 当前线程是第一个成功获取读锁的线程
firstReaderHoldCount++; // 持有读锁数量加一
} else { // 从缓存中获取当前线程的读锁数量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 缓存读锁为 null 或当前线程的 id 不等于缓存线程 id
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 将读锁线程加到本地线程
readHolds.set(rh);
rh.count++; // 持有读锁数量加一
}
return 1;
}
// 读锁线程阻塞等待
// 读取锁的共享计数超过限制
// 通过 CAS 获取锁失败
return fullTryAcquireShared(current);
}

// 如果读锁线程阻塞等待、读取锁的共享计数超过限制、通过 CAS 获取锁失败,则通过该方法获取读锁
final int fullTryAcquireShared(Thread current) {
//
HoldCounter rh = null;
for (;;) {
int c = getState(); // 获取当前状态
if (exclusiveCount(c) != 0) { // 写锁已经被获取
if (getExclusiveOwnerThread() != current)
// 当前线程不是独占线程
return -1;
} else if (readerShouldBlock()) { // 读锁线程阻塞等待
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else { // 当前线程不是 firstReader
if (rh == null) { // 获取缓存读锁数量
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 如果读锁数量超过 MAX_COUNT,则抛出异常。
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 通过 CAS 获取锁成功
if (sharedCount(c) == 0) { // 当前读锁数量为 0
firstReader = current; // 当前线程设置为 firstReader
firstReaderHoldCount = 1; // firstReader 计数初始化
} else if (firstReader == current) { // 当前线程是 firstReader
firstReaderHoldCount++; // firstReader 计数加一
} else { // 获取缓存读锁数量
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; // 读锁数量加一
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
// 创建并返回 Condition 对象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 获取读锁数量
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
// 当前线程是首次成功获取读锁的线程
if (firstReader == current)
return firstReaderHoldCount;
// 获取缓存读锁数量
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
// 反序列化方法,将当前 state 重置为 0
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
}

公平锁和非公平锁实现

和互斥锁 ReentrantLock 一样,读写锁也分为公平锁和非公平锁。公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数是不同的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
// 总是不阻塞
return false; // writers can always barge
}
final boolean readerShouldBlock() {
// 只要该非公平读锁对应的线程不为 null,则返回 true。
return apparentlyFirstQueuedIsExclusive();
}
}
// AQS 中实现
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
// 如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
// 如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
return hasQueuedPredecessors();
}
}
// AQS 中实现
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

readerShouldBlock 的本质就是在检测这次获取读锁资源的操作时,AQS 的等待队列中是否已经有写锁了。

  • 如果已经有写锁,那么要判断写锁是不是本线程,是本线程可以做锁降级。不是本线程就执行fullTryAcquireShared
  • 如果没有写锁,就可以继续执行,做 r<MAX_COUNT 判断。

对公平锁而言,!readerShouldBlock() 就是 !hasQueuedPredecessors()h == t || ((s = h.next)!=null && s.thread == Thread.currentThread())

  1. h==t 说明Node的等待队列为空
  2. (s = h.next)!=null && s.thread == Thread.currentThread() 说明等待队列中有值且是本线程申请锁资源。

满足以上2点的任何一个,可以申请读锁,继续执行下面的 r<MAX_COUNT 判断。

ReadLock 与 WriteLock

读锁

读锁是一个可重入的共享锁,获取读锁的思想(即 lock()的步骤),是先通过 tryAcquireShared() 尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过 doAcquireShared() 不断的循环并尝试获取锁,若有需要,则阻塞等待。doAcquireShared() 在循环中每次尝试获取锁时,都是通过 tryAcquireShared() 来进行尝试的。

释放读锁的思想(即 unlock()的步骤),是先通过 tryReleaseShared() 尝试释放共享锁。尝试成功的话,则通过doReleaseShared() 唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回 flase。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
// 读锁构造函数
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取读锁,不感知中断
public void lock() {
// 参考 AQS 获取共享锁方法 acquireShared
sync.acquireShared(1);
}
//获取读锁,感知中断
public void lockInterruptibly() throws InterruptedException {
// 参考 AQS 获取共享锁方法 acquireSharedInterruptibly
sync.acquireSharedInterruptibly(1);
}
// 尝试获取锁
public boolean tryLock() {
return sync.tryReadLock();
}
// 尝试获取读锁
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState(); //当前状态
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 写锁已经被持有且当前线程不是独占线程
return false;
int r = sharedCount(c); // 获取读锁数量
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 通过 CAS 获取锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { // 写锁未被获取
firstReader = current; // 设置当前线程为 firstReader
firstReaderHoldCount = 1; // firstReader 计数初始化
} else if (firstReader == current) { // 当前线程是 firstReader
firstReaderHoldCount++; // firstReader 计数加一
} else { // 从缓存获取读锁数量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
// 尝试获取读锁,感知中断且有超时时间
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放读锁
public void unlock() {
sync.releaseShared(1);
}
// 创建并返回 Condition,读锁不支持该操作
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}

写锁

写锁是一个可重入的排它锁。如果当前线程获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
// 写锁构造函数
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取写锁,感知中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 获取写锁,不感知中断
public boolean tryLock( ) {
return sync.tryWriteLock();
}
// 获取写锁
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState(); //获取当前状态
if (c != 0) {
int w = exclusiveCount(c); // 获取写状态
if (w == 0 || current != getExclusiveOwnerThread())
// 写状态为0 或 当前线程不是独占线程
return false;
if (w == MAX_COUNT)
// 写状态超过 MAX_COUNT
throw new Error("Maximum lock count exceeded");
}
// 通过 CAS 获取锁失败
if (!compareAndSetState(c, c + 1))
return false;
// 设置当前线程为独占线程
setExclusiveOwnerThread(current);
return true;
}
// 获取写锁,感知中断且有超时时间
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
// 参考 AQS 的获取排它锁方法 tryAcquireNanos
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 释放排它锁
public void unlock() {
// 参考 AQS 的释放排它锁方法 release
sync.release(1);
}
// 创建并返回 Condition 对象
public Condition newCondition() {
return sync.newCondition();
}
// 判断当前线程是否是独占线程
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 获取写锁数量
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

使用示例

利用重入来执行升级缓存后的锁降级

锁降级指的是写锁降级成为读锁。锁降级是指把持住当前拥有的写锁的同时,再获取到读锁,随后释放写锁的过程。锁降级的意义在于:在一边读一边写的情况下感知数据变化并提高性能

  1. 首先写锁是独占的,读锁是共享的,然后读写锁是线程间互斥的,锁降级的前提是所有线程都希望对数据变化敏感,但是因为写锁只有一个,所以会发生降级。如果先释放写锁,再获取读锁,可能在获取之前,会有其他线程获取到写锁,阻塞读锁的获取,就无法感知数据变化了。所以需要先hold住写锁,保证数据无变化,获取读锁,然后再释放写锁。

  2. 如果长时间用写锁独占,对于某些高响应的应用是不允许的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class CachedData {
Object data;
volatile boolean cacheValid; //缓存是否有效
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock(); //获取读锁
//如果缓存无效,更新cache;否则直接使用data
if (!cacheValid) {
//获取写锁前须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
// 锁降级,在释放写锁前获取读锁
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}

use(data);
rwl.readLock().unlock(); //释放读锁
}
}

使用 ReentrantReadWriteLock 来提高 Collection 的并发性

通常在 collection 数据很多,读线程访问多于写线程并且附带操作的开销高于同步开销时尝试这么做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class RWDictionary {
private final Map<String, Data> m = new HashMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock(); //读锁
private final Lock w = rwl.writeLock(); //写锁

public Data get(String key) {
r.lock();
try {
return m.get(key);
} finally {
r.unlock();
}
}

public String[] allKeys() {
r.lock();
try {
return m.keySet().toArray(new String[0]);
} finally {
r.unlock();
}
}

public Data put(String key, Data value) {
w.lock();
try {
return m.put(key, value);
} finally {
w.unlock();
}
}

public void clear() {
w.lock();
try {
m.clear();
} finally {
w.unlock();
}
}
}

总结

相对于排他锁,读写锁提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时 ReentrantReadWriteLock 能够提供比排他锁更好的并发性和吞吐量。