友情支持
如果您觉得这个笔记对您有所帮助,看在D瓜哥码这么多字的辛苦上,请友情支持一下,D瓜哥感激不尽,😜
有些打赏的朋友希望可以加个好友,欢迎关注D 瓜哥的微信公众号,这样就可以通过公众号的回复直接给我发信息。
公众号的微信号是: jikerizhi 。因为众所周知的原因,有时图片加载不出来。 如果图片加载不出来可以直接通过搜索微信号来查找我的公众号。 |
65. ArrayBlockingQueue

65.1. 方法组
插入 | 移除 | 检查 | |
---|---|---|---|
抛异常 |
|
|
|
特定值 |
|
|
|
阻塞 |
|
|
|
超时 |
|
|
四组方法:
-
抛异常:如果试图的操作无法立即执行,抛一个异常。
-
特定值:如果试图的操作无法立即执行,返回一个特定的值(通常是
true
,false
或null
)。 -
阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
-
超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
ArrayBlockingQueue
是 BlockingQueue
接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue
一旦创建,容量不能改变。
ArrayBlockingQueue
默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue
。而非公平性则是指访问 ArrayBlockingQueue
的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue
可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue
。如果保证公平性,通常会降低吞吐量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and throwing an
* {@code IllegalStateException} if this queue is full.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return super.add(e);
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
底层使用数组来实现,长度确定后就不再变化,通过下标循环往复地使用数组,类似与将数组组成了一个圆。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.diguage.truman.concurrent;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* @author D瓜哥, https://www.diguage.com/
* @since 2020-04-22 15:11
*/
public class ArrayBlockingQueueTest {
@Test
public void testTimeoutPoll() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(5);
executorService.execute(() -> {
for (long i = 0; i < 5; i++) {
queue.add(i);
try {
Thread.sleep(2 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
for (int i = 0; i < 10; i++) {
final long time = i;
executorService.execute(() -> {
try {
Long num = queue.poll(time, TimeUnit.MINUTES);
System.out.println("poll:" + num);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
LockSupport.parkNanos(TimeUnit.MINUTES.toNanos(10));
}
}
poll(long, java.util.concurrent.TimeUnit)
方法,其实就是使用 Condition notEmpty
对象来调用 ConditionObject.awaitNanos(long)
方法,在其中再调用了 LockSupport.parkNanos(java.lang.Object, long)
方法来实现"休眠等待"。