CyclicBarrier解析

本文代码基于Java8

前言

CyclicBarrier 字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。

CyclicBarrier类结构

CyclicBarrier 底层使用重写锁和 Generation 实现,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CyclicBarrier {
/**
* barrier的每次使用都表示为一个Generation实例。当barrier被释放或重置时,generation会发生变化。
* 可以有许多generation实例与使用屏障的线程关联。——因为锁可能被分配给等待的线程的方式是不确定的,
* 但是一次只能有一个线程处于活动状态(应用{@code count}的线程),其余的线程要么被破坏,要么被触发。
* 如果发生中断,但没有后续重置,则不需要激活generation。
*/
private static class Generation {
boolean broken = false;
}

/** 防护barrier入口的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 待跳闸条件 */
private final Condition trip = lock.newCondition();
/** 线程数量 */
private final int parties;
/* 跳闸时运行的命令 */
private final Runnable barrierCommand;
/** 当前 generation */
private Generation generation = new Generation();

/**
* 仍然在等待的线程数量。每当仍然在等待的线程数量到0或崩溃时,CyclicBarrier会重置为每一代的线程数量。
*/
private int count;
}

使用示例

一群学生准备乘车出去玩,约定明天在学校门口集合上车出发。现在陆陆续续的一个一个同学上车了,如果还有学生没到,其它人都必须等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  private final static int NUM = 3;
private static final Random random = new Random(10);
public static void main(String[] args) {
Runnable barrierAction =
() -> { System.out.println("线程到齐,发车!"); };
final CyclicBarrier barrier = new CyclicBarrier(NUM, barrierAction);

for (int i = 0; i < NUM; i++) {
new Task(barrier).start();
}
}

static class Task extends Thread {
private CyclicBarrier barrier;

public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 出发!");
try {
Thread.sleep(1000 * random.nextInt(5));
System.out.println(Thread.currentThread().getName() + " 已经上车,等待其他线程上车!");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

执行结果如下:

1
2
3
4
5
6
7
Thread-0 出发!
Thread-2 出发!
Thread-1 出发!
Thread-2 已经上车,等待其他线程上车!
Thread-1 已经上车,等待其他线程上车!
Thread-0 已经上车,等待其他线程上车!
线程到齐,发车!

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** 
* 创建一个新的CyclicBarrier对象,当给定的线程都到达这个临界点等待(即调用await方法),则开启barrier。
* 当开启barrier时,它将执行 barrierAction
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

/**
* 创建一个新的CyclicBarrier对象,当给定的线程都到达这个临界点等待(即调用await方法),则开启barrier。
* 当开启barrier时并没有任何预先定义的action需要执行。
*/
public CyclicBarrier(int parties) {
this(parties, null);
}

await 方法

等待直到在这个 barrier 中有 parties 个线程调用了 await 方法。如果当前线程不是最后一个到达的,则由于线程调度的原因被禁用,sleep 直至以下事情发生:

  1. 最后一个线程的到达;
  2. 其它的线程中断了当前线程;
  3. 其它的线程中断了和一样正在等待的线程;
  4. 其它线程等待barrier超时;
  5. 其它线程在这个barrier处调用了reset方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L); // 无超时时间
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

/** 指定等待超时时间 */
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

此方法是本质上是调用 dowait 方法来完成,dowait 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 获取重写锁
try {
final Generation g = generation;
if (g.broken) // broken 则抛异常
throw new BrokenBarrierException();

if (Thread.interrupted()) { // 线程中断
// 将 barrier 当前generation 的 broken字段设为 true,并唤醒所有线程。
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 执行barrierCommand
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 循环直至发生 tripped, broken, interrupted, or timed out
for (;;) {
try {
/** 超时设置 */
if (!timed) // 未设置超时时间,则一直等待,直到其它线程唤醒
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
/** 发生中断 */
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
// broken
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
// timed out
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

breakBarrier()nextGeneration() 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 设置当前的generation状态为broken且唤醒所有正在等待的线程。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
// 更新barrier的状态为重复利用做准备并且唤醒所有正在等待的线程
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

总结

  1. CyclicBarrier 是可重用的,CountDownLatch 是不可重用的。
  2. CyclicBarrier 的用途是让一组线程互相等待,直到到达某个公共屏障点才开始继续工作。
  3. CyclicBarrier 指定的任务是进入 barrier 最后一个线程来调用的,如果在执行这个任务发生异常时,则会传播到此线程,其它线程不受影响继续正常运行。
  4. 在等待的只要有一个线程发生中断,则其它线程就会被唤醒继续正常运行。