友情支持
如果您觉得这个笔记对您有所帮助,看在D瓜哥码这么多字的辛苦上,请友情支持一下,D瓜哥感激不尽,😜
有些打赏的朋友希望可以加个好友,欢迎关注D 瓜哥的微信公众号,这样就可以通过公众号的回复直接给我发信息。
公众号的微信号是: jikerizhi 。因为众所周知的原因,有时图片加载不出来。 如果图片加载不出来可以直接通过搜索微信号来查找我的公众号。 |
58. ThreadPoolExecutor 源码分析
状态名称 | 比特位 | 十进制 | 描述 |
---|---|---|---|
RUNNING |
111-00000000000000000000000000000 |
-536870912 |
运行中状态,可以接收新的任务和执行任务队列中的任务 |
SHUTDOWN |
000-00000000000000000000000000000 |
0 |
shutdown状态,不再接收新的任务,但是会执行任务队列中的任务 |
STOP |
001-00000000000000000000000000000 |
536870912 |
停止状态,不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务 |
TIDYING |
010-00000000000000000000000000000 |
1073741824 |
整理中状态,所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法 |
TERMINATED |
011-00000000000000000000000000000 |
1610612736 |
终结状态,钩子方法 |
由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl进行比较,或者使用ctl和线程池状态常量进行比较)来比较和判断线程池的状态:工作线程数为0的前提下:RUNNING(-536870912)
< SHUTDOWN(0)
< STOP(536870912)
< TIDYING(1073741824)
< TERMINATED(1610612736)
。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果当前工作线程数小于核心线程数,则创建新的线程并执行传入的任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 创建核心线程成功则直接返回
return;
// 创建核心线程失败,则在其他任务提交时,已经创建了足够多的线程数
// 或者线程池关闭等等,总之线程池状态已经发生变化,
// 则更新 ctl 的临时变量
c = ctl.get();
}
// 运行到这里说明创建核心线程失败,则当前工作线程已经大于等于 corePoolSize
// 判断线程池是否运行并且尝试用非阻塞方法向任务队列中添加任务(失败则返回 false)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 向任务队列投放任务成功,对线程池状态做二次检查
// 如果线程池状态不是运行中,则从任务队列中移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略 -- 结束
reject(command);
// 走到下面的 else if 分支,则说明
// 0、线程池可能是 RUNNING 状态
// 1、任务移除失败(失败原因可能是任务已经被执行)
// 如果当前线程数为0,则创建一个非核心线程并传入任务为 null -- 结束
// 创建的线程不会马上执行任务,而是等待获取任务队列中的任务去执行
// 如果当前线程数不为0,则什么也不做。因为任务已经成功加入队列,总会执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 执行到这里说明:
// 0、线程池中的工作线程总数已经大于等于 corePoolSize
// 1、线程池可能不是 RUNNING 状态
// 2、线程池可能是 RUNNING 状态同时任务队列已经满了
// 如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行
// 创建非核心线程失败,此时需要拒绝执行任务
else if (!addWorker(command, false))
// 执行拒绝策略 -- 结束
reject(command);
}
为什么需要二次检查线程池的运行状态,当前工作线程数量为 0
,尝试创建一个非核心线程并且传入的任务对象为 null
?这个可以看API注释:

runWorker()
方法的核心流程:

58.1. 大纲
-
基本使用
-
使用
Executors
创建线程池 -
自定义任务,并提交任务
-
获取返回结果
-
线程池的类图结构
-
创建执行线程
-
取出任务执行
-
如何实现
invokeAny(Collection<? extends Callable<T>> tasks)
? -
如何实现
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
? -
如何实现
invokeAll(Collection<? extends Callable<T>> tasks)
? -
如何实现
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
? -
如何判断线程超时?以及超时后如何杀掉线程?
-
如何终止任务?温柔终止?或者野蛮终止?
-
线程池在jDK5、6、7中有哪些升级变化?
-
拒绝策略
58.2. 核心点
-
关键参数
-
corePoolSize
-
maximumPoolSize
-
BlockingQueue
-
RejectedExecutionHandler
-
keepAliveTime
-
threadFactory
-
-
RejectedExecutionHandler
-
AbortPolicy
-
CallerRunsPolicy
-
DiscardPolicy
-
DiscardOldestPolicy
-
在生产环境,为了避免首次调用超时,可以调用 executor.prestartAllCoreThreads()
预创建所有 core
线程,避免来一个创一个带来首次调用慢的问题。
58.3. 问题
-
任务添加后,如何执行?
-
一个任务执行完成后,如何在同一个线程执行下一个任务?
-
在
corePoolSize
比maximumPoolSize
小的情况下,如何判定一个线程是否超时?并且如何删除一个线程? -
任务添加后,
-
如何返回任务执行的结果?
-
这个线程池还有哪些可以改进的地方?比如 Guava 中提供了哪些线程池?
-
如何改造成添加任务,如果没有达到
maxPoolSize
则先创建线程?
58.3.1. Tomcat 改进
可不可以自己封装一个Queue,在插入时增加以下逻辑呢?
-
如果当前有空闲线程等待接客,则把任务加入队列让孩儿们去抢。
-
如果没有空闲的了,总线程数又没到达max,那就返回false,让Executor去创建线程。
-
如果总线程数已达到max,则继续把任务加入队列缓冲一下。
-
如果缓冲队列也满了,抛出拒绝异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private transient volatile ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
}
如何判断当前有没有空闲的线程等待接客?Tomcat 则靠扩展 Executor
,增加一个当前请求数的计数器,在 execute()
方法前加1,再重载 afterExecute()
方法减1,然后判断当前线程总数是否大于当前请求总数就知道有咩有围观群众。
58.4. 需要注意的点
-
线程池如何初始化?
-
任务如何添加?
-
任务如何执行?
-
任务如何终止?
-
遇到异常如何处理?
-
线程池队列已满,如何拒绝?
-
任务执行过程中出现异常,如何处理?关闭该线程,重启一个吗?
-
??
-
任务如何存放?
-
任务存放后,如何取出来?
-
如何做到不断地一个一个执行下去?
-
为什么
Worker
继承AbstractQueuedSynchronizer
?AQS起什么作用?是否需要先研究一下?
58.5. 收获
-
可以继承
ThreadPoolExecutor
,实现beforeExecute()
和afterExecute()
等方法,来加入执行时的回调。类似的回调,还有terminated()
-
添加任务时,
execute()
方法的第二种情况,为什么还有再次检查?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.diguage.truman.concurrent;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author D瓜哥, https://www.diguage.com/
* @since 2020-03-10 10:50
*/
public class ThreadPoolExecutorTest {
@Test
public void testStatus() {
int COUNT_BITS = Integer.SIZE - 3;
int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
int RUNNING = -1 << COUNT_BITS;
int SHUTDOWN = 0 << COUNT_BITS;
int STOP = 1 << COUNT_BITS;
int TIDYING = 2 << COUNT_BITS;
int TERMINATED = 3 << COUNT_BITS;
System.out.printf("%32s // %d%n", Integer.toBinaryString(RUNNING), RUNNING);
System.out.printf("%32s // %d%n", Integer.toBinaryString(SHUTDOWN), SHUTDOWN);
System.out.printf("%32s // %d%n", Integer.toBinaryString(STOP), STOP);
System.out.printf("%32s // %d%n", Integer.toBinaryString(TIDYING), TIDYING);
System.out.printf("%32s // %d%n", Integer.toBinaryString(TERMINATED), TERMINATED);
}
@Test
public void testPoolSize() {
ThreadPoolExecutor executorService
= new ThreadPoolExecutor(2, 4, 1L,
TimeUnit.MINUTES, new LinkedBlockingQueue<>(6));
for (int i = 0; i < 10; i++) {
executorService.execute(new Task("Task-" + i));
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
System.out.println("Finish all thread...");
}
@Test
public void testCallable() {
ThreadPoolExecutor executorService
= new ThreadPoolExecutor(2, 4, 1L,
TimeUnit.MINUTES, new LinkedBlockingQueue<>(6));
List<Future<String>> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
Future<String> future = executorService.submit(
new CallableTask("CallableTask-" + i));
futures.add(future);
}
for (Future<String> future : futures) {
try {
System.out.println(LocalDateTime.now() + "::" + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
while (!executorService.isTerminated()) {
}
System.out.println("Finish all thread...");
}
@Test
public void testComplete() {
ThreadPoolExecutor executorService
= new ThreadPoolExecutor(2, 4, 1L,
TimeUnit.MINUTES, new LinkedBlockingQueue<>(6));
List<Future<String>> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
Future<String> future = executorService.submit(
new CallableTask("CallableTask-" + i));
futures.add(future);
}
for (Future<String> future : futures) {
try {
System.out.println(LocalDateTime.now() + "::" + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
while (!executorService.isTerminated()) {
}
System.out.println("Finish all thread...");
}
public static class CallableTask implements Callable<String> {
private final String name;
public CallableTask(String name) {
this.name = name;
}
@Override
public String call() throws Exception {
Thread.sleep(1000);
//返回执行当前 Callable 的线程名字
return Thread.currentThread().getName();
}
}
public static class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+ " Start. Time = " + LocalDateTime.now());
processCommand();
System.out.println(Thread.currentThread().getName()
+ " End. Time = " + LocalDateTime.now());
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return this.name;
}
}
}
58.7. 参考资料
-
http://www.throwable.club/2019/07/15/java-concurrency-thread-pool-executor/[JUC线程池ThreadPoolExecutor源码分析 - Throwable’s Blog
-
Java线程池架构原理和源码解析(ThreadPoolExecutor) - xieyuooo的专栏 - 博客频道 - CSDN.NET
-
Java多线程系列目录(共43篇) - 如果天空不死 - 博客园
-
搞清楚了
ctl
的含义,高三位是状态,低29位是线程数 -
主要属性的含义,主要方法的实现,任务添加后,三种不同的处理方式
-
线程池状态变换
-
线程池拒绝策略的实现
-
带返回值的任务的实现方式,
Callable
,Future
-