Phaser的使用

本文代码基于Java8

前言

Phaser 移相器是一个可重用的同步屏障,功能上与 CyclicBarrierCountDownLatch 类似,但是支持更灵活的使用,用来解决控制多个线程分阶段共同完成任务的情景问题。

Phaser类结构

Phaser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Phaser {
/**
* 主要状态, 占用4个字节:
* unarrived: 还未到达的参与者数目 bits 0-15
* parties: 当前阶段总的参与者数目 bits 16-31
* phase: 屏障所处的阶段 bits 32-62
* terminated: 屏障是否终止 bit 63 / sign)
*/
private volatile long state;
// 最大的参与者数目
private static final int MAX_PARTIES = 0xffff;
// 最大的阶段值
private static final int MAX_PHASE = Integer.MAX_VALUE;
// 参与者移位
private static final int PARTIES_SHIFT = 16;
// 阶段移位
private static final int PHASE_SHIFT = 32;
// 未到达参与者数掩码
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
// 总参与者数掩码
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK = 0xffffffffL;
// 终止位
private static final long TERMINATION_BIT = 1L << 63;
// 一个到达者
private static final int ONE_ARRIVAL = 1;
// 一个参与者
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
// 撤销一个参与者
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
// 0 个参与者,1 个达到者
private static final int EMPTY = 1;
/** 父 Phaser */
private final Phaser parent;
/** Phaser树的root */
private final Phaser root;
// 阶段值为偶数时,Treiber 栈节点,阻塞线程驻留在节点上
private final AtomicReference<QNode> evenQ;
// 阶段值为奇数时,Treiber 栈节点,阻塞线程驻留在节点上
private final AtomicReference<QNode> oddQ;
}

构造函数

创建具有父 Phaser 和参与数的新 Phaser。当给定的父 Phaser 非空且给定的参与数大于零时,该子 Phaser 将注册到其父 Phaser。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* @param parent 父Phaser
* @param parties 需要推到下一阶段的参与数
* @throws IllegalArgumentException 假如 parties 小于0或者大于所支持的最大参与数
*/
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0) // 校验参与数
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}

/** 父 Phaser 为 null,且需要推到下一阶段的参与数为0 */
public Phaser() {
this(null, 0);
}

/** 父 Phaser 为 null */
public Phaser(int parties) {
this(null, parties);
}

/** 需要推到下一阶段的参与数为0 */
public Phaser(Phaser parent) {
this(parent, 0);
}

使用示例

Phaser 替代 CyclicBarrier 比较简单,CyclicBarrierawait() 方法可以直接用 PhaserarriveAndAwaitAdvance() 方法替代。CyclicBarrier 只适用于固定数量的参与者,而 Phaser 适用于可变数目的屏障。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class PhaserDemo {
private final static int NUM = 3;
private static final Random random = new Random(10);
public static void main(String[] args) {
final Phaser phaser = new Phaser(NUM); // 使用CyclicBarrier
for (int i = 0; i < NUM; i++) {
new Task(phaser).start();
}
}

static class Task extends Thread {
private Phaser barrier;

public Task(Phaser barrier) {
this.barrier = barrier;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 出发!");
try {
Thread.sleep(1000 * random.nextInt(5));
System.out.println(Thread.currentThread().getName() + " 已经上车,等待其他线程上车!");
barrier.arriveAndAwaitAdvance(); // 使用await()
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

常用方法

phaser.getPhase() 初始值为0,如果全部线程执行完则+1,如果 phaser.getPhase() 达到 Integer 的最大值,这重新清空为0。
phaser.arriveAndDeregister() 表示线程到达后离开。
phaser.arriveAndAwaitAdvance() 表示线程在等待其他线程。
phaser.bulkRegister(friendNum) 表示临时加进来几个线程。

这些方法的具体实现相对复杂,了解这些方法的作用会熟练使用即可。

有个例子很好的解释了这些方法的作用,更多可以参考 java多线程之Phaser

假如有这么一个场景,在旅游过程中,有可能很凑巧遇到几个朋友,然后他们听说你们在旅游,所以想要加入一起继续接下来的旅游。也有可能,在旅游过程中,突然其中有某几个人临时有事,想退出这次旅游了。在自由行的旅游,这是很常见的一些事情。如果现在我们使用 CyclicBarrier 这个类来实现,我们发现是实现不了,但是用Phaser就可实现这个功能。
旅游类 TourismRunnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
public class TourismRunnable implements Runnable {
Phaser phaser;
Random random;
/**
* 每个线程保存一个朋友计数器,比如小红第一次遇到一个朋友,则取名`小红的朋友0号`,
* 然后旅游到其他景点的时候,如果小红又遇到一个朋友,这取名为`小红的朋友1号`
*/
AtomicInteger frientCount = new AtomicInteger();

public TourismRunnable(Phaser phaser) {
this.phaser = phaser;
this.random = new Random();
}

@Override
public void run() {
tourism();
}

/**
* 旅游过程
*/
private void tourism() {
switch (phaser.getPhase()) {
case 0:
if (!goToStartingPoint()) break;
case 1:
if (!goToHotel()) break;
case 2:
if (!goToTourismPoint1()) break;
case 3:
if (!goToTourismPoint2()) break;
case 4:
if (!goToTourismPoint3()) break;
case 5:
if (!goToEndPoint()) break;
}
}

/**
* 准备返程
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToEndPoint() {
return goToPoint("飞机场,准备登机回家");
}

/**
* 到达旅游点3
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToTourismPoint3() {
return goToPoint("旅游点3");
}

/**
* 到达旅游点2
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToTourismPoint2() {
return goToPoint("旅游点2");
}

/**
* 到达旅游点1
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToTourismPoint1() {
return goToPoint("旅游点1");
}

/**
* 入住酒店
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToHotel() {
return goToPoint("酒店");
}

/**
* 出发点集合
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToStartingPoint() {
return goToPoint("出发点");
}

private int getRandomTime() throws InterruptedException {
int time = random.nextInt(400) + 100;
Thread.sleep(time);
return time;
}

/**
* @param point 集合点
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean goToPoint(String point) {
try {
if (!randomEvent()) { // 到达后有事离开
phaser.arriveAndDeregister();
return false;
}
String name = Thread.currentThread().getName();
System.out.println(name + " 花了 " + getRandomTime() + " 时间才到了" + point);
phaser.arriveAndAwaitAdvance(); // 等待其他人
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

/**
* 随机事件
*
* @return 返回true, 说明还要继续旅游, 否则就临时退出了
*/
private boolean randomEvent() {
int r = random.nextInt(100);
String name = Thread.currentThread().getName();
if (r < 10) {
int friendNum = 1;
System.out.println(name + ":在这里竟然遇到了" + friendNum + "个朋友,他们说要一起去旅游...");
phaser.bulkRegister(friendNum); // 临时加入
for (int i = 0; i < friendNum; i++) {
new Thread(new TourismRunnable(phaser), name + "的朋友" + frientCount.getAndAdd(1) + "号").start();
}
} else if (r > 90) {
System.out.println(name + ":突然有事要离开一下,不和他们继续旅游了....");
return false;
}
return true;
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

public class PhaserDemo {
public static void main(String[] args) {
String name = "明刚红丽黑白";
Phaser phaser = new SubPhaser(name.length());
List<Thread> tourismThread = new ArrayList<>();
for (char ch : name.toCharArray()) {
tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch));
}
for (Thread thread : tourismThread) {
thread.start();
}
}

public static class SubPhaser extends Phaser {
public SubPhaser(int parties) {
super(parties);
}

/** onAdvance 可重写,表示是否终止的条件 */
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() + ":全部" + getArrivedParties() + "个人都到齐了,现在是第" + (phase + 1)
+ "次集合准备去下一个地方..................\n");
return super.onAdvance(phase, registeredParties);
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
小刚:突然有事要离开一下,不和他们继续旅游了....
小白:在这里竟然遇到了1个朋友,他们说要一起去旅游...
小红:在这里竟然遇到了1个朋友,他们说要一起去旅游...
小红 花了 207 时间才到了出发点
小黑 花了 365 时间才到了出发点
小红的朋友0号 花了 367 时间才到了出发点
小明 花了 371 时间才到了出发点
小白的朋友0号 花了 379 时间才到了出发点
小白 花了 402 时间才到了出发点
小丽 花了 460 时间才到了出发点
小丽:全部7个人都到齐了,现在是第1次集合准备去下一个地方..................

小红 花了 130 时间才到了酒店
小红的朋友0号 花了 168 时间才到了酒店
小黑 花了 187 时间才到了酒店
小明 花了 328 时间才到了酒店
小白 花了 380 时间才到了酒店
小白的朋友0号 花了 438 时间才到了酒店
小丽 花了 464 时间才到了酒店
小丽:全部7个人都到齐了,现在是第2次集合准备去下一个地方..................

小丽:在这里竟然遇到了1个朋友,他们说要一起去旅游...
小红:在这里竟然遇到了1个朋友,他们说要一起去旅游...
小黑:突然有事要离开一下,不和他们继续旅游了....
小白:突然有事要离开一下,不和他们继续旅游了....
小明 花了 103 时间才到了旅游点1
小红的朋友1号 花了 241 时间才到了旅游点1
小丽的朋友0号 花了 317 时间才到了旅游点1
小红的朋友0号 花了 365 时间才到了旅游点1
小丽 花了 383 时间才到了旅游点1
小红 花了 433 时间才到了旅游点1
小白的朋友0号 花了 460 时间才到了旅游点1
小白的朋友0号:全部7个人都到齐了,现在是第3次集合准备去下一个地方..................

小明:在这里竟然遇到了1个朋友,他们说要一起去旅游...
小明 花了 121 时间才到了旅游点2
小白的朋友0号 花了 203 时间才到了旅游点2
小红 花了 261 时间才到了旅游点2
小红的朋友1号 花了 286 时间才到了旅游点2
小丽的朋友0号 花了 306 时间才到了旅游点2
小红的朋友0号 花了 432 时间才到了旅游点2
小明的朋友0号 花了 478 时间才到了旅游点2
小丽 花了 491 时间才到了旅游点2
小丽:全部8个人都到齐了,现在是第4次集合准备去下一个地方..................

小丽:突然有事要离开一下,不和他们继续旅游了....
小丽的朋友0号:突然有事要离开一下,不和他们继续旅游了....
小红的朋友0号 花了 155 时间才到了旅游点3
小白的朋友0号 花了 216 时间才到了旅游点3
小红 花了 226 时间才到了旅游点3
小明的朋友0号 花了 268 时间才到了旅游点3
小红的朋友1号 花了 364 时间才到了旅游点3
小明 花了 373 时间才到了旅游点3
小明:全部6个人都到齐了,现在是第5次集合准备去下一个地方..................

小明 花了 189 时间才到了飞机场,准备登机回家
小白的朋友0号 花了 235 时间才到了飞机场,准备登机回家
小红的朋友0号 花了 245 时间才到了飞机场,准备登机回家
小明的朋友0号 花了 323 时间才到了飞机场,准备登机回家
小红的朋友1号 花了 418 时间才到了飞机场,准备登机回家
小红 花了 450 时间才到了飞机场,准备登机回家
小红:全部6个人都到齐了,现在是第6次集合准备去下一个地方..................