友情支持
如果您觉得这个笔记对您有所帮助,看在D瓜哥码这么多字的辛苦上,请友情支持一下,D瓜哥感激不尽,😜
有些打赏的朋友希望可以加个好友,欢迎关注D 瓜哥的微信公众号,这样就可以通过公众号的回复直接给我发信息。
公众号的微信号是: jikerizhi 。因为众所周知的原因,有时图片加载不出来。 如果图片加载不出来可以直接通过搜索微信号来查找我的公众号。 |
48. CountDownLatch
"Count Down" 在英语中意为倒计数,一个典型场景就是火箭🚀发射时的倒计时。它允许一个或多个线程等待其他线程完成操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.diguage.truman.concurrent;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* @author D瓜哥, https://www.diguage.com/
* @since 2020-03-16 17:23
*/
public class CountDownLatchTest {
private int count = 2;
@Test
public void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
executorService.execute(new Task(latch));
}
latch.await();
System.out.println("Fire...");
executorService.shutdown();
while (executorService.isTerminated()) {
}
System.out.println("All task were done.");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
System.out.println("Terminal at " + LocalDateTime.now());
}
static class Task implements Runnable {
private final CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
int time = new Random().nextInt(5000);
latch.countDown();
System.out.println(Thread.currentThread().getId() + " time = " + LocalDateTime.now());
Thread.sleep(time);
System.out.println(Thread.currentThread().getId() + " sleep = " + time + ": check finished.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
下面,我们开始看 CountDownLatch
源码:
CountDownLatch
类中存在一个内部类 Sync
,继承自 AbstractQueuedSynchronizer
,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
管中窥豹,从这里也可以看出 CountDownLatch
中的等待控制几乎都是依赖 AbstractQueuedSynchronizer
来实现的。
48.1. await()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
对 await()
的处理直接委托给了 sync
的 acquireSharedInterruptibly(1)
方法,当然这个方法是从 AbstractQueuedSynchronizer
继承而来的。来看一下这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
结合上面提到的 Sync
中的 tryAcquireShared(int acquires)
方法,可以看出,当 getState()
不为零时,就会导致 tryAcquireShared(arg)
结果返回小于零,进而调用 doAcquireSharedInterruptibly(arg)
,将线程进入排队,然后挂起线程。


48.2. countDown()
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
这里的 releaseShared(1)
方法是从 AbstractQueuedSynchronizer
继承过来的,来看一下这个方法的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
结合上面提到的 Sync
中的 tryReleaseShared(int releases)
方法,我们可以看出:countDown()
方法直接减少锁存器计数,如果不为零,则无所作为;减少到零,则释放所有上述通过 await()
方法挂起的所有等待线程。


-
CountDownLatch
为什么使用共享锁?答:前面我们分析
ReentrantReadWriteLock
的时候学习过AQS的共享锁模式,比如当前锁是由一个线程获取为互斥锁,那么这时候所有需要获取共享锁的线程都要进入AQS队列中进行排队,当这个互斥锁释放的时候,会一个接着一个地唤醒这些连续的排队的等待获取共享锁的线程,注意,这里的用语是“一个接着一个地唤醒”,也就是说这些等待获取共享锁的线程不是一次性唤醒的。