友情支持

如果您觉得这个笔记对您有所帮助,看在D瓜哥码这么多字的辛苦上,请友情支持一下,D瓜哥感激不尽,😜

支付宝

微信

有些打赏的朋友希望可以加个好友,欢迎关注D 瓜哥的微信公众号,这样就可以通过公众号的回复直接给我发信息。

wx jikerizhi

公众号的微信号是: jikerizhi因为众所周知的原因,有时图片加载不出来。 如果图片加载不出来可以直接通过搜索微信号来查找我的公众号。

48. CountDownLatch

"Count Down" 在英语中意为倒计数,一个典型场景就是火箭🚀发射时的倒计时。它允许一个或多个线程等待其他线程完成操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.diguage.truman.concurrent;

import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * @author D瓜哥, https://www.diguage.com/
 * @since 2020-03-16 17:23
 */
public class CountDownLatchTest {
    private int count = 2;

    @Test
    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(count);
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        for (int i = 0; i < count; i++) {
            executorService.execute(new Task(latch));
        }
        latch.await();
        System.out.println("Fire...");
        executorService.shutdown();
        while (executorService.isTerminated()) {
        }
        System.out.println("All task were done.");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        System.out.println("Terminal at " + LocalDateTime.now());
    }

    static class Task implements Runnable {
        private final CountDownLatch latch;

        public Task(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                int time = new Random().nextInt(5000);
                latch.countDown();
                System.out.println(Thread.currentThread().getId() + " time = " + LocalDateTime.now());
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getId() + " sleep = " + time + ": check finished.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

下面,我们开始看 CountDownLatch 源码:

CountDownLatch 类中存在一个内部类 Sync,继承自 AbstractQueuedSynchronizer,代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

管中窥豹,从这里也可以看出 CountDownLatch 中的等待控制几乎都是依赖 AbstractQueuedSynchronizer 来实现的。

48.1. await()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Causes the current thread to wait until the latch has counted down to
 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
 *
 * <p>If the current count is zero then this method returns immediately.
 *
 * <p>If the current count is greater than zero then the current
 * thread becomes disabled for thread scheduling purposes and lies
 * dormant until one of two things happen:
 * <ul>
 * <li>The count reaches zero due to invocations of the
 * {@link #countDown} method; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread.
 * </ul>
 *
 * <p>If the current thread:
 * <ul>
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread's
 * interrupted status is cleared.
 *
 * @throws InterruptedException if the current thread is interrupted
 *         while waiting
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await() 的处理直接委托给了 syncacquireSharedInterruptibly(1) 方法,当然这个方法是从 AbstractQueuedSynchronizer 继承而来的。来看一下这个方法:

AbstractQueuedSynchronizer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

结合上面提到的 Sync 中的 tryAcquireShared(int acquires) 方法,可以看出,当 getState() 不为零时,就会导致 tryAcquireShared(arg) 结果返回小于零,进而调用 doAcquireSharedInterruptibly(arg),将线程进入排队,然后挂起线程。

CountDownLatch await park
CountDownLatch await unpark

48.2. countDown()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
/**
 * Decrements the count of the latch, releasing all waiting threads if
 * the count reaches zero.
 *
 * <p>If the current count is greater than zero then it is decremented.
 * If the new count is zero then all waiting threads are re-enabled for
 * thread scheduling purposes.
 *
 * <p>If the current count equals zero then nothing happens.
 */
public void countDown() {
    sync.releaseShared(1);
}

这里的 releaseShared(1) 方法是从 AbstractQueuedSynchronizer 继承过来的,来看一下这个方法的实现:

AbstractQueuedSynchronizer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

结合上面提到的 Sync 中的 tryReleaseShared(int releases) 方法,我们可以看出:countDown() 方法直接减少锁存器计数,如果不为零,则无所作为;减少到零,则释放所有上述通过 await() 方法挂起的所有等待线程。

CountDownLatch countDown 1
CountDownLatch countDown 2
  1. CountDownLatch 为什么使用共享锁?

    答:前面我们分析 ReentrantReadWriteLock 的时候学习过AQS的共享锁模式,比如当前锁是由一个线程获取为互斥锁,那么这时候所有需要获取共享锁的线程都要进入AQS队列中进行排队,当这个互斥锁释放的时候,会一个接着一个地唤醒这些连续的排队的等待获取共享锁的线程,注意,这里的用语是“一个接着一个地唤醒”,也就是说这些等待获取共享锁的线程不是一次性唤醒的。