ReentrantLock解析

本文代码基于Java8

前言

Reentrant 可重入的;重入;可再入的。即再次进入的意思,entrant: 新职员;新生;新会员;新成员。

JDK 中独占锁的实现除了使用关键字 synchronized 外,还可以使用 ReentrantLock。虽然在性能上ReentrantLocksynchronized 没有什么区别,但 ReentrantLock 相比 synchronized 而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。即 ReentrantLock 粒度更小。

  • ReentrantLock 是独占锁且可重入的。
  • ReentrantLock 可以实现公平锁。
  • ReentrantLock 可以响应中断。
  • ReentrantLock 提供了获取锁超时等待。
  • ReentrantLock 结合 Condition 接口可以实现线程间的等待通知机制。

ReentrantLock 类结构

1
2
3
4
5
6
7
8
9
10
11
public class ReentrantLock implements Lock, java.io.Serializable {
// 默认非公平可重入锁
public ReentrantLock() {
sync = new NonfairSync();
}

// 指定可重入锁公平模式
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}

其中SyncFairSyncNonfairSyncReentrantLock 的内部类,Sync 继承自 AbstractQueuedSynchronizer ,而 FairSyncNonfairSync 继承 Sync,分别对应公平锁和非公平锁。

Sync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 执行锁操作,由子类实现
abstract void lock();

// 执行非公平 tryLock()
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 利用 CAS 来更新 state 的状态
if (compareAndSetState(0, acquires)) {
// 设置当前线程拥有独占访问权限
setExclusiveOwnerThread(current);
return true;
}
}
// 获取拥有独占访问权限的线程和当前线程比较
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置新状态
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 拥有独占访问权限的线程不是当前线程,抛出 IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 清空拥有独占访问权限的线程
setExclusiveOwnerThread(null);
}
// 设置新状态
setState(c);
return free;
}
// 当前线程是否拥有独占访问权限
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 创建 Condition 对象
final ConditionObject newCondition() {
return new ConditionObject();
}

// 获取拥有独占访问权限的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取当前状态
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 是否锁住
final boolean isLocked() {
return getState() != 0;
}

// 解序列化,将状态重置为 0,即锁释放状态
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

NonfairSync、FairSync 内部类

线程会重复获取锁。如果申请获取锁的线程足够多,那么可能会造成某些线程长时间得不到锁。这就是非公平锁的“饥饿”问题。

公平锁是指当锁可用时,在锁上等待时间最长的线程将获得锁的使用权,而非公平锁则随机分配这种使用权。和synchronized 一样,默认的 ReentrantLock 实现是非公平锁,因为相比公平锁,非公平锁性能更好。当然公平锁能防止饥饿,某些情况下也很有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 锁实现
final void lock() {
// 利用 CAS 来更新 state 的状态为1
if (compareAndSetState(0, 1))
// 设置当前线程拥有独占访问权限,随机分配
setExclusiveOwnerThread(Thread.currentThread());
else
// AQS 的 acquire()
acquire(1);
}
// 参考上文 Sync 的 nonfairTryAcquire 实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 锁实现
final void lock() {
// AQS 的 acquire()
acquire(1);
}

// 尝试获取锁,和 AQS 的 nonfairTryAcquire 相比,多了判断队列有没有前置节点。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 队列没有前置节点,且成功利用 CAS 来更新 state 的状态为 acquires
//
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置当前线程拥有独占访问权限
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

ReentrantLock 提供的方法

ReentrantLock 锁方法

ReentrantLocksynchronized 都是独占锁。不同的是 ReentrantLock 需要手动加锁和解锁,且解锁的操作尽量要放在 finally 代码块中,保证线程正确释放锁。ReentrantLock 操作较为复杂,但是因为可以手动控制加锁和解锁过程,在复杂的并发场景中能派上用场。synchronized 加锁解锁的过程是隐式的,用户不用手动操作,优点是操作简单,但显得不够灵活。一般并发场景使用 synchronized 就足够。

ReentrantLocksynchronized 都是可重入的。synchronized 因为可重入,所以可以放在被递归执行的方法上,且不用担心线程最后能否正确释放锁。而 ReentrantLock 在重入时要却确保重复获取锁的次数必须和重复释放锁的次数一样,否则可能导致其他线程无法获得该锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 加锁,分别调用公平锁和非公平锁的加锁方法
public void lock() {
sync.lock();
}

// 中断敏感的加锁,参考 AQS 的 acquireInterruptibly 方法
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

// 非公平模式加锁,参考 Sync 的 nonfairTryAcquire 方法
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

// 尝试加锁直至超时,参考 AQS 的 tryAcquireNanos 方法
// 使用该方法配合失败重试机制可以用来解决死锁问题
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// 解锁
public void unlock() {
sync.release(1);
}

Condition 相关方法

ConditionReentrantLock 对象创建,并且可以同时创建多个。Condition 接口在使用前必须先调用ReentrantLocklock() 方法获得锁。之后调用 Condition 接口的 await() 将释放锁,并且在该Condition 上等待,直到有其他线程调用 Conditionsignal() 方法唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 创建 Condition
public Condition newCondition() {
return sync.newCondition();
}
// 查询在给定条件下,是否有线程在等待
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

// 查询在给定条件下,等待线程队列的长度
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

// 查询在给定条件下,所有等待线程
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}

除了锁方法和 Condition 相关方法外,还有一些判断是否是公平锁、是否加锁、获取阻塞队列长度、当前队列是否独占等方法,比较简单参考源码即可。

ReentrantLock 使用示例

基于官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
// 公平锁
public static Lock lock = new ReentrantLock(true);

public static void main(String[] args) {

for (int i = 0; i < 5; i++) {
new Thread(new ThreadDemo(i)).start();
}

}

static class ThreadDemo implements Runnable {
Integer id;

public ThreadDemo(Integer id) {
this.id = id;
}

@Override

public void run() {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 2; i++) {
lock.lock();
System.out.println("获得锁的线程:" + id);
lock.unlock();
}
}
}
}

如果是公平锁,线程几乎是轮流的获取到了锁。

1
2
3
4
5
6
7
8
9
10
获得锁的线程:1
获得锁的线程:2
获得锁的线程:3
获得锁的线程:1
获得锁的线程:2
获得锁的线程:3
获得锁的线程:4
获得锁的线程:4
获得锁的线程:0
获得锁的线程:0

如果是非公平锁,线程会重复获取锁。

1
2
3
4
5
6
7
8
9
10
获得锁的线程:4
获得锁的线程:4
获得锁的线程:1
获得锁的线程:1
获得锁的线程:2
获得锁的线程:2
获得锁的线程:3
获得锁的线程:3
获得锁的线程:0
获得锁的线程:0

结合 Condition 实现等待通知机制

使用 synchronized 结合 Object 上的 wait()notify() 方法可以实现线程间的等待通知机制。ReentrantLock 结合 Condition 接口同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。

使用Condition 实现简单的阻塞队列,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockingQueue<E> {
// 阻塞队列最大容量
int size;
// 可重入锁
ReentrantLock lock = new ReentrantLock();
// 队列底层实现
LinkedList<E> list = new LinkedList<>();
// 队列满时的等待条件
Condition notFull = lock.newCondition();
// 队列空时的等待条件
Condition notEmpty = lock.newCondition();

public ConditionBlockingQueue(int size) {
this.size = size;
}

public void enqueue(E e) throws InterruptedException {
lock.lock();
try {
// 队列已满,在 notFull 条件上等待
while (list.size() == size) {
notFull.await();
}
list.add(e); // 加入链表末尾
notEmpty.signal(); // 通知在 notEmpty 条件上等待的线程
} finally {
lock.unlock();
}
}

public E dequeue() throws InterruptedException {
E e;
lock.lock();
try {
// 队列为空,在 notEmpty 条件上等待
while (list.size() == 0) {
notEmpty.await();
}

e = list.removeFirst(); // 移除链表首元素
notFull.signal(); // 通知在 notFull 条件上等待的线程
return e;
} finally {
lock.unlock();
}
}
}

总结

ReentrantLock 是可重入的独占锁。比 synchronized 功能更加丰富,控制粒度更小,支持公平锁实现,支持中断响应以及超时等待…… ReentrantLock 还可以配合一个或多个 Condition 条件方便的实现等待通知机制。不过需要手动加锁和解锁,且解锁的操作尽量要放在 finally 代码块中,保证线程正确释放。