Semaphore解析

前言

Semaphore 是JDK1.5的 java.util.concurrent 并发包中提供的一个并发工具类。所谓 Semaphore 即 信号量 的意思。这个叫法并不能很好地表示它的作用,更形象的说法应该是 许可证管理器 。

Semaphore 是一个计数信号量。

  • 从概念上将,Semaphore 包含一组许可证。
  • 如果有需要的话,每个 acquire() 方法都会阻塞,直到获取一个可用的许可证。
  • 每个 release() 方法都会释放持有许可证的线程,并且归还 Semaphore 一个可用的许可证。
  • 实际上并没有真实的许可证对象供线程使用,Semaphore 只是对可用的数量进行管理维护。

Semaphore类结构

Semaphore 类结构如下:

其中SyncFairSyncNonfairSyncSemaphore 的内部类。FairSyncNonfairSync 均继承自 Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// NonFair version
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 共享式获取同步状态
protected int tryAcquireShared(int acquires) {
// 无论当前线程是不是在 CLH 队列的头部,它都会直接获取信号量。
return nonfairTryAcquireShared(acquires);
}
}

// Fair version
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 共享式获取同步状态
protected int tryAcquireShared(int acquires) {
// 如果当前线程不在 CLH 队列的头部,则排队等候。
for (;;) {
if (hasQueuedPredecessors())
// 判断当前线程是不是在 CLH 队列的头部
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

Sync 继承自 AbstractQueuedSynchronizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}
// 非公平模式共享式获取同步状态
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 共享式释放同步状态
// “非公平信号量许可的释放(release)”与“公平信号量许可的释放(release)”是一样的。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 减少许可数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 获取并返回立即可用的所有许可
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

Semaphore 构造方法

构造方法分为两种,如下:

1
2
3
4
5
6
7
8
// 创建具有给定的许可数和非公平模式的 Semaphore
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 创建具有给定的许可数和指定是否公平模式的 Semaphore
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore 获取、释放许可的方法

获取、释放许可的方法分为两大类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** 
* 阻塞方法:
* 1. 无许可能够获得,则会一直等待,直到获得许可。
* 2. 在释放许可之前,必须先获获得许可。
*/
public void acquire() throws InterruptedException; // 获取一个许可
public void acquire(int permits) throws InterruptedException; // 获取permits个许可
public void release(); // 释放一个许可
public void release(int permits); // 释放permits个许可

/**
* 非阻塞方法:
* 立即获取执行结果
*/
//尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire();
//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
//尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits);
//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;

Semaphore的其他方法

其他方法包括获取当前可用许可数、获取队列正在等待许可的线程数目、是否是公平模式等,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 返回此信号量中当前可用的许可数。
public int availablePermits();
// 返回正在等待获取的线程的估计数目。
public int getQueueLength();
// 获取并返回立即可用的所有许可。
public int drainPermits();
// 查询是否有线程正在等待获取。
public boolean hasQueuedThreads();
// 如果此信号量的公平设置为 true,则返回 true。
public boolean isFair();
// 返回标识此信号量的字符串,以及信号量的状态。
public String toString();

// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads();
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction);

Semaphore的使用

Semaphore 经常用于限制获取某种资源的线程数量。

以请求总数为 6,并发执行的线程数为 2 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SemaphoreDemo {
// 请求总数
public static int clientTotal = 6;
// 并发执行的线程数
public static int threadTotal = 2;
// 随机数
public static Random random = new Random();

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire(1);
System.out.println("处理请求 " + count + " 前,当前许可证数量:" + semaphore.availablePermits());
resolve(count);
System.out.println("处理请求 " + count + " 中,正在等待许可证的请求数量:" + semaphore.getQueueLength());
semaphore.release(1);
System.out.println("处理请求 " + count + " 后,当前许可证数量:" + semaphore.availablePermits());
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}

private static void resolve(int i) throws InterruptedException {
System.out.println("请求 " + i + " 开始处理");
int r = random.nextInt(i + 3) + 2;
System.out.println("请求 " + i + "处理" + r + "秒");
Thread.sleep(1000 * r);
System.out.println("请求 " + i + " 结束处理");
}
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
处理请求 0 前,当前许可证数量:0
处理请求 1 前,当前许可证数量:0
请求 0 开始处理
请求 1 开始处理
请求 0处理4秒
请求 1处理2秒
请求 1 结束处理
处理请求 1 中,正在等待许可证的请求数量:4
处理请求 1 后,当前许可证数量:1
处理请求 2 前,当前许可证数量:0
请求 2 开始处理
请求 2处理5秒
请求 0 结束处理
处理请求 0 中,正在等待许可证的请求数量:3
处理请求 0 后,当前许可证数量:1
处理请求 3 前,当前许可证数量:0
请求 3 开始处理
请求 3处理6秒
请求 2 结束处理
处理请求 2 中,正在等待许可证的请求数量:2
处理请求 2 后,当前许可证数量:1
处理请求 4 前,当前许可证数量:0
请求 4 开始处理
请求 4处理2秒
请求 4 结束处理
处理请求 4 中,正在等待许可证的请求数量:1
处理请求 4 后,当前许可证数量:1
处理请求 5 前,当前许可证数量:0
请求 5 开始处理
请求 5处理7秒
请求 3 结束处理
处理请求 3 中,正在等待许可证的请求数量:0
处理请求 3 后,当前许可证数量:1
请求 5 结束处理
处理请求 5 中,正在等待许可证的请求数量:0
处理请求 5 后,当前许可证数量:2

当释放许可参数变成 2,即从 semaphore.release(1) 变成 semaphore.release(2)。执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
处理请求 0 前,当前许可证数量:0
请求 0 开始处理
请求 0处理4秒
处理请求 1 前,当前许可证数量:0
请求 1 开始处理
请求 1处理3秒
请求 1 结束处理
处理请求 1 中,正在等待许可证的请求数量:4
处理请求 1 后,当前许可证数量:2
处理请求 2 前,当前许可证数量:1
请求 2 开始处理
处理请求 3 前,当前许可证数量:0
请求 3 开始处理
请求 3处理4秒
请求 2处理5秒
请求 0 结束处理
处理请求 0 中,正在等待许可证的请求数量:2
处理请求 0 后,当前许可证数量:2
处理请求 4 前,当前许可证数量:1
请求 4 开始处理
处理请求 5 前,当前许可证数量:0
请求 5 开始处理
请求 5处理8秒
请求 4处理2秒
请求 4 结束处理
处理请求 4 中,正在等待许可证的请求数量:0
处理请求 4 后,当前许可证数量:2
请求 3 结束处理
处理请求 3 中,正在等待许可证的请求数量:0
处理请求 3 后,当前许可证数量:4
请求 2 结束处理
处理请求 2 中,正在等待许可证的请求数量:0
处理请求 2 后,当前许可证数量:6
请求 5 结束处理
处理请求 5 中,正在等待许可证的请求数量:0
处理请求 5 后,当前许可证数量:8