ArrayBlockingQueue解析

前言

ArrayBlockingQueue 是采用数组实现的有界阻塞线程安全队列。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么将导致当前线程阻塞。

ArrayBlockingQueue 继承 AbstractQueue 类,实现 BlockingQueueSerializable接口,关于 AbstractQueueBlockingQueue 的内容可以参考 抽象队列与阻塞队列解析

1
2
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

ArrayBlockingQueue 有两个内部类,分别为 ItrsItr。其中 Itrs 内部类 Node 继承 WeakReference 类。Itr 实现了 Iterator 接口

使用示例

实现简单的生产者消费者模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ArrayBlockingQueueTest {
private static BlockingQueue<Food> queue = new ArrayBlockingQueue<>(2);

// 生产者
class Producer implements Runnable {
@Override
public void run() {
Food food = new Food();
food.setName("Apple");
try {
queue.put(food);
System.out.println(Thread.currentThread().getName() + " provider : " + food);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 消费者
class Consumer implements Runnable {
@Override
public void run() {
try {
Food food = queue.take();
System.out.println(Thread.currentThread().getName() + " consumer : " + food);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 食物
class Food {
private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return "Food {name:" + name + "}";
}
}

public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new ArrayBlockingQueueTest().new Producer()).start();
}

new Thread(new ArrayBlockingQueueTest().new Consumer()).start();
new Thread(new ArrayBlockingQueueTest().new Consumer()).start();
}
}

其中一次执行结果如下:

1
2
3
4
5
6
Thread-4 provider : Food {name:Apple}
Thread-3 provider : Food {name:Apple}
Thread-5 consumer : Food {name:Apple}
Thread-2 provider : Food {name:Apple}
Thread-0 provider : Food {name:Apple}
Thread-6 consumer : Food {name:Apple}

类结构源码分析

类属性

可重入锁的内容可以参考 ReentrantLock解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** 存放队列元素的数组 */
final Object[] items;

/** 取元素的下标索引 */
int takeIndex;

/** 存元素的下标索引 */
int putIndex;

/** 队列中元素的数量 */
int count;

/** 数据访问的重入锁 */
final ReentrantLock lock;

/** 取元素的等待队列条件 */
private final Condition notEmpty;

/** 存元素的等待队列条件 */
private final Condition notFull;

/** 当前活动迭代器的共享状态,如果已知不存在,则为空。允许队列操作更新迭代器状态。*/
transient Itrs itrs = null;

构造器

构造器有三种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 初始化阻塞队列容量,默认是采用不公平重入锁
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

// 使用 capacity 初始化阻塞队列容量,fair 初始化重入锁的公平与否
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

// 在 ArrayBlockingQueue(int capacity, boolean fair) 基础上,将集合中元素加到阻塞队列中
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 采用 ArrayBlockingQueue(int capacity, boolean fair) 初始化阻塞队列
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // 锁仅用于可见性,而不是相互排斥
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e); // 检查非空
items[i++] = e; // 将元素加入阻塞队列
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
// 更新队列中元素的数量
count = i;
// 更新存元素的下标索引,i 和 capacity 相等,则从 0 开始重新计
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

内部类源码分析

Itrs内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class Itrs {
// 弱迭代器引用的链接列表节点
private class Node extends WeakReference<Itr> {
Node next;
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}
/** takeIndex 环绕到0的次数*/
int cycles = 0;
/** 弱迭代器引用的链接列表头节点 */
private Node head;
/** 用于删除过时的迭代器 */
private Node sweeper = null;
// 短扫频探头
private static final int SHORT_SWEEP_PROBES = 4;
// 长扫频探头
private static final int LONG_SWEEP_PROBES = 16;

Itrs(Itr initial) {
register(initial);
}

// 清理迭代器,查找并删除过时的迭代器。仅从迭代线程调用
void doSomeSweeping(boolean tryHarder) {
// assert lock.getHoldCount() == 1;
// assert head != null;
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p;
final Node sweeper = this.sweeper;
boolean passedGo; // to limit search to one full sweep

if (sweeper == null) {
o = null;
p = head;
passedGo = true;
} else {
o = sweeper;
p = o.next;
passedGo = false;
}

for (; probes > 0; probes--) {
if (p == null) {
if (passedGo)
break;
o = null;
p = head;
passedGo = true;
}
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.isDetached()) {
// found a discarded/exhausted iterator
probes = LONG_SWEEP_PROBES; // "try harder"
// unlink p
p.clear();
p.next = null;
if (o == null) {
head = next;
if (next == null) {
// We've run out of iterators to track; retire
itrs = null;
return;
}
}
else
o.next = next;
} else {
o = p;
}
p = next;
}

this.sweeper = (p == null) ? null : o;
}

// 将新迭代器添加到跟踪迭代器的链接列表中。
void register(Itr itr) {
// assert lock.getHoldCount() == 1;
head = new Node(itr, head);
}

// 通知所有迭代器,并删除任何已过时的迭代器。takeIndex 环绕到0时调用
void takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
cycles++;
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.takeIndexWrapped()) {
// unlink p
// assert it == null || it.isDetached();
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}

// 当迭代器被移除时调用,通知所有迭代器,并删除已过时的迭代器。
void removedAt(int removedIndex) {
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.removedAt(removedIndex)) {
// unlink p
// assert it == null || it.isDetached();
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}

// 队列为空时调用,通知所有活动迭代器,队列为空,清除所有弱引用,并取消 itrs 的链接。
void queueIsEmpty() {
// assert lock.getHoldCount() == 1;
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
p.clear();
it.shutdown();
}
}
head = null;
itrs = null;
}

// 每当元素(在 takeIndex 处)退出队列时调用。
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}
}

Itr 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
private class Itr implements Iterator<E> {
/** 游标,下一个next节点对应的下标,到达putIndex结束的位置为NONE */
private int cursor;
/** next节点的元素值 */
private E nextItem;
/** next节点的索引 */
private int nextIndex;
/** 上一个节点的元素值 */
private E lastItem;
/** 上一个节点的索引 */
private int lastRet;
/** 记录 takeIndex */
private int prevTakeIndex;
/** 记录cycles */
private int prevCycles;
/** 元素不可用或未定义时的特殊索引值 */
private static final int NONE = -1;
/** 元素非调用 this.remove() 被移除元素的索引值 */
private static final int REMOVED = -2;
/** detached 模式特殊的 prevTakeIndex 值 */
private static final int DETACHED = -3;

Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
// 在 itrs 中注册 this
itrs.register(this); // in this order
// 清理迭代器,查找并删除过时的迭代器。
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
} finally {
lock.unlock();
}
}
// 是否是 detached 模式
boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}
// 游标处理
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
// 从 0 开始
index = 0;
if (index == putIndex)
// takeIndex == putIndex
index = NONE;
return index;
}
// 校验索引是否有效
private boolean invalidated(int index, int prevTakeIndex,
long dequeues, int length) {
if (index < 0)
return false;
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
// 下标距离的prevTakeIndex元素数量 和 出队元素数量 比较
return dequeues > distance;
}

// 调整索引,合并自上次对此迭代器执行操作以来的所有出列。
private void incorporateDequeues() {
// assert lock.getHoldCount() == 1;
// assert itrs != null;
// assert !isDetached();
// assert count > 0;

final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
// cycles和takeIndex存在不一致,需要修正
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// 计算出队元素的数量
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// 校验下标合法性
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
// 进入detach模式
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}

// 修改detach的标志字段,并且启动itrs的清理逻辑。
private void detach() {
// Switch to detached mode
// assert lock.getHoldCount() == 1;
// assert cursor == NONE;
// assert nextIndex < 0;
// assert lastRet < 0 || nextItem == null;
// assert lastRet < 0 ^ lastItem != null;
if (prevTakeIndex >= 0) {
// assert itrs != null;
prevTakeIndex = DETACHED;
// try to unlink from itrs (but not too hard)
itrs.doSomeSweeping(true);
}
}

// 迭代器的 hasNext 方法
public boolean hasNext() {
// assert lock.getHoldCount() == 0;
if (nextItem != null)
return true;
noNext(); // 对于没有下一个节点的迭代器,需要修正下标属性并进入detach模式。
return false;
}
// 修正下标属性并进入detach模式
private void noNext() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// assert cursor == NONE;
// assert nextIndex == NONE;
if (!isDetached()) {
// assert lastRet >= 0;
incorporateDequeues(); // might update lastRet
if (lastRet >= 0) {
lastItem = itemAt(lastRet);
// assert lastItem != null;
detach();
}
}
// assert isDetached();
// assert lastRet < 0 ^ lastItem != null;
} finally {
lock.unlock();
}
}
// 迭代器的 next() 方法
public E next() {
// assert lock.getHoldCount() == 0;
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
incorporateDequeues();
// assert nextIndex != NONE;
// assert lastItem == null;
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
// assert nextItem != null;
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
// 迭代器的 remove() 方法
public void remove() {
// assert lock.getHoldCount() == 0;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
incorporateDequeues(); // might update lastRet or detach
final int lastRet = this.lastRet;
this.lastRet = NONE;
if (lastRet >= 0) {
if (!isDetached())
removeAt(lastRet);
else {
final E lastItem = this.lastItem;
// assert lastItem != null;
this.lastItem = null;
if (itemAt(lastRet) == lastItem)
removeAt(lastRet);
}
} else if (lastRet == NONE)
throw new IllegalStateException();
// else lastRet == REMOVED and the last returned element was
// previously asynchronously removed via an operation other
// than this.remove(), so nothing to do.

if (cursor < 0 && nextIndex < 0)
detach();
} finally {
lock.unlock();
// assert lastRet == NONE;
// assert lastItem == null;
}
}

// 通知迭代器队列为空,或队列已无望地落在后面
void shutdown() {
// assert lock.getHoldCount() == 1;
cursor = NONE;
if (nextIndex >= 0)
nextIndex = REMOVED;
if (lastRet >= 0) {
lastRet = REMOVED;
lastItem = null;
}
prevTakeIndex = DETACHED;
// Don't set nextItem to null because we must continue to be
// able to return it on next().
//
// Caller will unlink from itrs when convenient.
}

private int distance(int index, int prevTakeIndex, int length) {
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return distance;
}

// 所有队列移除takeIndex下标处元素的方法, 都会调用迭代器的removeAt方法,以通知其修正下标索引值。
boolean removedAt(int removedIndex) {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;

final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
final int len = items.length;
int cycleDiff = cycles - prevCycles;
if (removedIndex < takeIndex)
cycleDiff++;
final int removedDistance =
(cycleDiff * len) + (removedIndex - prevTakeIndex);
// assert removedDistance >= 0;
int cursor = this.cursor;
if (cursor >= 0) {
int x = distance(cursor, prevTakeIndex, len);
if (x == removedDistance) {
if (cursor == putIndex)
this.cursor = cursor = NONE;
}
else if (x > removedDistance) {
// assert cursor != prevTakeIndex;
this.cursor = cursor = dec(cursor);
}
}
int lastRet = this.lastRet;
if (lastRet >= 0) {
int x = distance(lastRet, prevTakeIndex, len);
if (x == removedDistance)
this.lastRet = lastRet = REMOVED;
else if (x > removedDistance)
this.lastRet = lastRet = dec(lastRet);
}
int nextIndex = this.nextIndex;
if (nextIndex >= 0) {
int x = distance(nextIndex, prevTakeIndex, len);
if (x == removedDistance)
this.nextIndex = nextIndex = REMOVED;
else if (x > removedDistance)
this.nextIndex = nextIndex = dec(nextIndex);
}
else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
this.prevTakeIndex = DETACHED;
return true;
}
return false;
}

// takeIndex每次循环到0时会调用该方法。
// cycle计数增加,遍历链表检查并清理过期的无效节点。
boolean takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;
if (itrs.cycles - prevCycles > 1) {
// All the elements that existed at the time of the last
// operation are gone, so abandon further iteration.
shutdown();
return true;
}
return false;
}

// /** 取消调试注释. */
// public String toString() {
// return ("cursor=" + cursor + " " +
// "nextIndex=" + nextIndex + " " +
// "lastRet=" + lastRet + " " +
// "nextItem=" + nextItem + " " +
// "lastItem=" + lastItem + " " +
// "prevCycles=" + prevCycles + " " +
// "prevTakeIndex=" + prevTakeIndex + " " +
// "size()=" + size() + " " +
// "remainingCapacity()=" + remainingCapacity());
// }
}

进入detach模式的关键有3种情况:

  1. cursor == putIndex,这时候 cursor = NONE
  2. 空队列
  3. cycle - preCycles > 1

常用操作源码分析

入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 调用抽象队列的 add 方法
public boolean add(E e) {
return super.add(e);
}

// 将元素 e 插入队列尾部,不感知中断
public boolean offer(E e) {
checkNotNull(e); // 检查元素 e 是否为空
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁,不感知中断
try {
if (count == items.length)
// 队列满了,返回 false
return false;
else {
// 入队,返回 true
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

// 将元素 e 插入队列尾部,如果队列慢了则一直等待,感知中断。
public void put(E e) throws InterruptedException {
checkNotNull(e); // 检查元素 e 是否为空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,感知中断
try {
while (count == items.length)
// 队列满了,等待
notFull.await();
// 入队
enqueue(e);
} finally {
lock.unlock();
}
}

// put(E e) 超时版本,如果入队成功,则返回 true
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,感知中断
try {
while (count == items.length) {
if (nanos <= 0)
return false;
// notFull 的超时等待
nanos = notFull.awaitNanos(nanos);
}
// 入队
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// 遍历并移除队列头部元素,不感知中断
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 出队:队列为空则返回 null。
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

// 遍历并移除队列头部元素,感知中断。如果队列为空则一直等待。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,感知中断
try {
while (count == 0)
// 队列为空,等待
notEmpty.await();
// 出队:遍历并移除队列头部元素,并返回头部元素
return dequeue();
} finally {
lock.unlock();
}
}

// take() 的超时版本
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
// notEmpty 的超时等待
nanos = notEmpty.awaitNanos(nanos);
}
// 出队:遍历并移除队列头部元素,并返回头部元素
return dequeue();
} finally {
lock.unlock();
}
}

// 返回数组中下标索引 takeIndex 对应的元素,不进行弹出。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // 队列为空返回 null
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}

// 从队列中移除元素,不感知中断
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列非空
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
// takeIndex 对应的元素和 o 一致则移除 takeIndex 对应的元素,返回 true
removeAt(i);
return true;
}
if (++i == items.length)
// ++i 和数组长度相等,i 从 0 开始计
i = 0;
} while (i != putIndex); // i == putIndex 则终止循环
}
// 移除失败
return false;
} finally {
lock.unlock();
}
}

// 清空阻塞队列
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
// 通知所有活动迭代器,队列为空,清除所有弱引用,并取消 itrs 的链接。
itrs.queueIsEmpty();
// 查询是否有因为给定条件 notFull 正在等待的线程
for (; k > 0 && lock.hasWaiters(notFull); k--)
// 唤醒因为给定条件 notFull 正在等待的线程
notFull.signal();
}
} finally {
lock.unlock();
}
}
// 通知所有活动迭代器,队列为空,清除所有弱引用,并取消 itrs 的链接。
void queueIsEmpty() {
// assert lock.getHoldCount() == 1;
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
p.clear();
it.shutdown();
}
}
head = null;
itrs = null;
}

转数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 将队列转成 Object 数组
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex;
if (count <= n) // putIndex 大于等于 takeIndex
System.arraycopy(items, takeIndex, a, 0, count);
else { // putIndex 小于 takeIndex
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
} finally {
lock.unlock();
}
return a;
}

// 将队列转成泛型数组
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
if (len < count)
// 泛型数组 a 长度不够,重新创建一个长度为 count 的数组
// 注意:泛型数组不能通过 T[] a = new T[count] 方式进行创建,只能通过反射进行创建
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
if (count <= n) // putIndex 大于等于 takeIndex
System.arraycopy(items, takeIndex, a, 0, count);
else { // putIndex 小于 takeIndex
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
if (len > count) // 泛型数组 a 长度大于阻塞队列元素个数, 则第 count 位清空
a[count] = null;
} finally {
lock.unlock();
}
return a;
}

其他操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 获取阻塞队列元素数量
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

// 获取阻塞队列剩余容量
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}

// 判断阻塞队列是否包含元素 o
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do { // 处理类似 remove 方法
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

// toString 方法,结果类似 [e1.toString(), e2.toString()……]
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k == 0)
return "[]";

final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}

特殊方法

将队列元素添加到集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 将队列元素添加到集合 c,最大数量为 Integer.MAX_VALUE
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

// 将队列元素添加到集合 c,最大数量为 maxElements
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c); // 检查集合是否非null
if (c == this) // c 是当前队列本身
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 取 maxElements 和队列元素数量最小值
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) { // 从队列中循环取出元素并加入集合 c 中
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// 恢复不变量,即使调用 c.add() 抛出异常
if (i > 0) {
count -= i; // 移除并成功添加到集合后,队列剩余元素数量
takeIndex = take; // 更新 takeIndex
if (itrs != null) {
if (count == 0)
// 通知所有活动迭代器,队列为空,清除所有弱引用,并取消 itrs 的链接。
itrs.queueIsEmpty();
else if (i > take) // 移除并成功添加到集合的元素个数大于 take
// 每当 takeIndex 为0时调用。通知所有迭代器,并删除任何已过时的迭代器。
itrs.takeIndexWrapped(); // 参考内部类 Itrs 的 takeIndexWrapped 实现
}
// 查询是否有因为给定条件 notFull 正在等待的线程
for (; i > 0 && lock.hasWaiters(notFull); i--)
// 唤醒因为给定条件 notFull 正在等待的线程
notFull.signal();
}
}
} finally {
lock.unlock();
}
}

出队与入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 入队
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒因为给定条件 notEmpty 正在等待的线程
notEmpty.signal();
}

// 出队
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒因为给定条件 notFull 正在等待的线程
notFull.signal();
return x;
}
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
// 通知所有活动迭代器,队列为空,清除所有弱引用,并取消 itrs 的链接。
queueIsEmpty();
else if (takeIndex == 0)
// 通知所有迭代器,并删除任何已过时的迭代器。
takeIndexWrapped();
}

分割迭代器

1
2
3
4
5
6
7
// jdk8新增,可用来并行遍历元素的一个迭代器
public Spliterator<E> spliterator() {
// 有序、非空、线程安全
return Spliterators.spliterator
(this, Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT);
}

总结

ArrayBlockingQueue 采用数组实现的有界阻塞线程安全队列,是规定大小的 BlockingQueue,其构造必须指定大小。其所含的对象是 FIFO 顺序排序的。