友情支持

如果您觉得这个笔记对您有所帮助,看在D瓜哥码这么多字的辛苦上,请友情支持一下,D瓜哥感激不尽,😜

支付宝

微信

有些打赏的朋友希望可以加个好友,欢迎关注D 瓜哥的微信公众号,这样就可以通过公众号的回复直接给我发信息。

wx jikerizhi

公众号的微信号是: jikerizhi因为众所周知的原因,有时图片加载不出来。 如果图片加载不出来可以直接通过搜索微信号来查找我的公众号。

65. ArrayBlockingQueue

BlockingQueue

65.1. 方法组

插入 移除 检查

抛异常

boolean add(E e)

boolean remove(Object o)

E element()

特定值

boolean offer(E e)

E poll()

E peek()

阻塞

void put(E e)

E take()

超时

boolean offer(E e, long timeout, TimeUnit unit)

E poll(long timeout, TimeUnit unit)

四组方法:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。

  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(通常是 true, falsenull)。

  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

ArrayBlockingQueueBlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue 一旦创建,容量不能改变。

ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();
}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}



/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and throwing an
 * {@code IllegalStateException} if this queue is full.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws IllegalStateException if this queue is full
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {
    return super.add(e);
}

/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.  This method is generally preferable to method {@link #add},
 * which can fail to insert an element only by throwing an exception.
 *
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

底层使用数组来实现,长度确定后就不再变化,通过下标循环往复地使用数组,类似与将数组组成了一个圆。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.diguage.truman.concurrent;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * @author D瓜哥, https://www.diguage.com/
 * @since 2020-04-22 15:11
 */
public class ArrayBlockingQueueTest {

    @Test
    public void testTimeoutPoll() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(5);
        executorService.execute(() -> {
            for (long i = 0; i < 5; i++) {
                queue.add(i);
                try {
                    Thread.sleep(2 * 60 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        for (int i = 0; i < 10; i++) {
            final long time = i;
            executorService.execute(() -> {
                try {
                    Long num = queue.poll(time, TimeUnit.MINUTES);
                    System.out.println("poll:" + num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        LockSupport.parkNanos(TimeUnit.MINUTES.toNanos(10));
    }
}

poll(long, java.util.concurrent.TimeUnit) 方法,其实就是使用 Condition notEmpty 对象来调用 ConditionObject.awaitNanos(long) 方法,在其中再调用了 LockSupport.parkNanos(java.lang.Object, long) 方法来实现"休眠等待"。