深度剖析:线程池运行原理及常见面试题,看完这篇文章彻底懂了!

你好,我是猿java。

今天我们开门见山,直奔主题:线程池。

申明:本文基于 jdk-11.0.15

为什么有线程还需要线程池

关于 Java线程,有兴趣可以参考小编过往的文章:深度剖析:Java线程运行机制,程序员必看的知识点!

我们先看下线程的几个缺陷:

  • Java的线程最终是由操作系统创建的,因此线程的创建,调度都受限于操作系统的处理能力;
  • 线程在 Java中是一种宝贵的资源,如果程序创建线程时控制不当,可能导致资源耗尽的风险;
  • 互联网的快速发张,很多业务场景会涉及到多线程;

鉴于上面的

内容大纲:
img.png

线程池是什么

线程池(Thread Pool)是 JDK 1.5版本开始引入,由大牛 Doug Lea实现,本文的线程池是指J.U.C提供的 java.util.concurrent.ThreadPoolExecutor 类。 它是一种基于池化思想管理线程的工具,更直白的说,线程池就是用来帮助程序员更优的管理线程的工具。

线程池核心属性

下面给出一张创建线程池最底层构造器源码截图(其他方式创建线程池最终都是调用该构造器):

img.png

通过 ThreadPoolExecutor源码截图我们可以得出线程池中几个核心的概念:corePoolSize、maximumPoolSize、workQueue、keepAliveTime、threadFactory、handler。

理解了这几个核心概念,线程池基本掌握了50%,下面分别对每个属性进行讲解。

corePoolSize

核心线程数,当提交新任务,如果线程池中的线程数量小于 corePoolSize,即便其它线程处于空闲状态,线程池也会创建一个新线程来处理该任务;

maximumPoolSize

最大线程数,线程池中允许创建的最大线程数阈值。当新任务被提交时:

  • 如果线程池中的线程数 countWorker大于 corePoolSize并且小于 maximumPoolSize,任务会被放入队列;
  • 如果线程池中的线程数 countWorker大于 maximumPoolSize,假如任务能够放入堵塞队列则放入,否则执行拒绝策略;

workQueue

工作队列,它是 BlockingQueue 堵塞类型,用来传输和保存由 execute()方法提交的 Runnable任务,当线程池中的线程大于等于
corePoolSize 而小于
maximumPoolSize时,任务会被优先放入队列,任务放入队列有 3种策略:

  1. 直接处理:当 workQueue为
    SynchronousQueue(同步队列),则新任务会立即被交给线程池中已有的线程进行处理,如果无法立即获取可用线程,则认为任务放入队列失败,需要重新生成一个线程来处理该任务(如果此时线程数大于
    maximumPoolSize,任务被拒绝),为了避免任务被拒绝,一般会把 maximumPoolSize设置成无限大,带来的问题是线程数疯长,有资源耗尽的风险,不过该策略可以避免死锁。
  2. 无界队列:当 workQueue为无界队列,任务会被放入队列中,等待空闲的线程来处理。如果任务之间是相互独立的,适合使用这种队列,但是当线程处理的速度小于新任务放入的速度,队列会无线封装,依然存在资源耗尽的风险。
  3. 有界队列:当 workQueue为有界队列,可以规避无界队列资源耗尽的风险,但是这个边界该设置多大比较难把控,如果设置大队列小线程池,虽然可以降低
    CPU使用率和线程切换的开销,但是可能导致低吞吐量。

keepAliveTime

保留存活时间,当线程数大于 corePoolSize时,假如线程池无需处理任务,超出 corePoolSize的空闲线程会保留 keepAliveTime时长后被
terminated,如果设置了 allowCoreThreadTimeOut(true),核心线程也会保留 keepAliveTime时长后会被
terminated,keepAliveTime的值可以通过
setKeepAliveTime(long, TimeUnit)方法动态设置;

threadFactory

线程工厂,线程池中的所有线程都是通过 threadFactory进行创建;使用线程工厂可以避免 new Thread()这种硬码创建新线程,我们可以在
threadFactory中使用特殊的线程子类、优先级等功能。

handler

拒绝策略,当 Executor关闭时,以及 Executor对最大线程和工作队列容量都使用有限的界限,并且已经饱和时,在方法 execute(Runnable)
中提交的新任务将被拒绝。线程池提供了 4种拒绝策略:

  • AbortPolicy:默认的拒绝策略。直接丢弃任务并抛出 RejectedExecutionException这样一个运行时错误;
  • CallerRunsPolicy:任务会被转交给调用 execute()方法的线程处理;
  • DiscardPolicy:直接丢弃了无法执行的任务;
  • DiscardOldestPolicy:如果线程池没有被关闭,则丢弃工作队列头部的任务(最早放入的任务),然后重试执行(可能再次失败,导致重复此操作);

在实际生产中,我们需要根据具体的业务来自定义拒绝策略:

比如:对于一些不能丢失的数据,可以写数据库、写log或者放入MQ中,当线程池恢复处理能力时,再取出数据进行处理。

比如:在数据库中数据有软状态,可以直接丢弃,当线程池恢复处理能力时,再从数据库中获取数据进行处理。

线程池的运行状态

线程池的运行状态主要有 5种:

  • RUNNING:接受新任务并处理排队任务;
  • SHUTDOWN:不接受新任务,但处理排队任务;
  • STOP:不接受新任务,不处理排队任务,并中断正在进行的任务;
  • TIDYING:所有任务都已终止,workerCount 为零,转换到状态 TIDYING的线程将运行 terminate()钩子方法;
  • TERMINATED:当 terminate()完成,在 awaitTermination()中等待的线程将达到 TERMINATED状态;

5种运行状态的转换关系如下图:

img.png

线程池运行状态是通过 ctl变量进行存储的,ctl包装了 workerCount 和 runState两个字段,共 32bit,源码如下:

1
2
3
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
}

workerCount:指的是线程池中的工作线程数;

runState:指的是线程池的运行状态,包含 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 5种;

ctl 结构如下图:

img.png

线程池工作机制

下面整理了一张线程池工作的核心流程图:

img.png

线程池源码分析

execute()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {

if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
/**
* 步骤1:workerCount < corePoolSize,尝试创建一个新线程执行任务
* workerCountOf()方法取出 ctl变量低 29位的值,表示当前活动的线程数;
* 如果当前活动的线程数小于corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中
*/
if (workerCountOf(c) < corePoolSize) {
/**
* addWorker中的第二个参数表示限制添加线程的数量 是根据据corePoolSize 来判断还是maximumPoolSize来判断;
* 如果是ture,根据corePoolSize判断
* 如果是false,根据maximumPoolSize判断
*/
if (addWorker(command, true))
return;

// 如果 addWorker()添加失败,则重新获取 ctl值
c = ctl.get();
}
/**
* 步骤2:当步骤1的 addWorker()失败,并且线程池是RUNNING,并且任务成功接入阻塞队列
*/
if (isRunning(c) && workQueue.offer(command)) {
//double-check,重新获取ctl的值
int recheck = ctl.get();
/**
* 因为线程池的状态一直在变化,所以再次判断线程池的状态,如果不是运行状态,并且移之前已添加到阻塞队列中command
* 然后进入拒绝策略,流程终止
*/
if (!isRunning(recheck) && remove(command))
reject(command);
/**
* 线程池为Running状态,获取线程池中的总线程数,如果数量是0,则执行 addWorker()方法;
* 第一个参数为null,表示在线程池中创建一个线程,但不去启动
* 第二个参数为false,将线程池的线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
/**
* 步骤3:步骤1的 addWorker()失败并且无法进入步骤2,有两种情况:
* 1、线程池的状态不是RUNNING;
* 2、线程池状态是RUNNING,但是workerCount >= corePoolSize, workerQueue已满
* 再次调用addWorker方法,第二个参数传 false,将线程池的线程数上限设置为 maximumPoolSize;
* 如果失败则执行拒绝策略;
*/
} else if (!addWorker(command, false))
reject(command);
}
}

execute(Runnable command) 方法总结:

  1. 如果 workerCount < corePoolSize,则尝试创建并启动一个新线程来执行新提交的任务;
  2. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添 加到该阻塞队列中;
  3. 如果 corePoolSize <= workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则尝试创建并启动一个新线程来执行新提交的任务;
  4. 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的
    AbortPolicy策略是直接抛异常;

addWorker()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 给线程池增加新的 worker
* @param firstTask 当前任务
* @param core 创建新worker是以 corePoolSize还是 maximumPoolSize作为最大阈值
* @return 返回增加新的 worker的结果,true or false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// retry 作用域为 for循环
retry:
// 自旋
for (int c = ctl.get(); ; ) {
// Check if queue empty only if necessary.
/**
* 这里的判断比较绕,直白的说:线程池已经关闭 或者 有资格关闭,则无法 addWorker并且返回false;
* 如果线程池当前的 runState为 SHUTDOWN、STOP、TIDYING、TERMINATED 其中一种;并且下面的结果
* 三个条件 通过 || (或) 连接,只要有一个为 true,就返回 true;
* 1. 线程池当前的 runState为 STOP、TIDYING、TERMINATED 其中一种;表示关闭状态,不再接收提交的任务,但却可以继续处理阻塞队列中已经保存的任务;
* 2. firstTask 不为空
* 3. Check if queue empty only if necessary.阻塞队列为空
*/
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (; ; ) {
// workerCount 大于最大线程阈值,无法 addWorker,返回false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过 CAS 方式来增加线程数,如果成功,则跳出最外层 for
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态为 非 RUNNING,则重新进入 for循环
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

// addWorker() 的核心流程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**
* 根据firstTask来创建Worker对象,每个Worker对象里面会封装一个通过 ThreadFactory创建的线程
* Worker(Runnable firstTask) {
* setState(-1); // inhibit interrupts until runWorker
* this.firstTask = firstTask;
* this.thread = getThreadFactory().newThread(this);
* }
*/

w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取可重入锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 如果线程池的runState为 RUNNING状态,或者 runState < STOP(RUNNING或者SHUTDOWN)并且任务为空
// 如果线程不为NEW,则抛异常
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,Worker implements Runnable,是一个线程类,此时t.start()会调用 Worker的run()方法,从而调用 runWorker()方法
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}

addWorker() 方法总结:

  • addWorker()方法的主要作用是在线程池中创建一个新的线程并执行任务;
  • firstTask参数用于指定新增的线程执行的第一个任务,
  • core参数用来限制创建新线程,是以 corePoolSize还是以 maximumPoolSize作为线程数最大阈值。
  • 需要获取 mainLock可重入锁成功才能将 worker加入 HashSet中
  • worker添加成功后,调用线程的t.start() 启动线程执行任务;

Worker类分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
/**
* 把state设置为 -1,阻止中断直到调用runWorker方法;
* 因为 AQS默认 state是 0,刚创建一个 Worker对象,在没有执行任务时,不应该被中断
*/
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
/**
* 创建一个线程,newThread()方法入参是 this对象,也就是 Worker对象,
* 因为 Worker implements Runnable,Worker 是一个线程类;
* 所以 Worker对象中的 Thread.start()会间接调用 Worker类中 run()方法
*/
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker.
* 将主运行循环委托给外部 runWorker
*/
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

/** 获取锁
* CAS 方式修改 state,不可重入;
* state根据 0 来判断,所以 Worker构造方法中state=-1,就是为了禁止在执行任务前对线程进行中断;
* 因此,在 runWorker()方法中会先调用Worker对象的 unlock()方法将 state设置为 0
*/
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() {
acquire(1);
}

public boolean tryLock() {
return tryAcquire(1);
}

public void unlock() {
release(1);
}

public boolean isLocked() {
return isHeldExclusively();
}

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

Worker类总结:

  • Worker类实现了 Runnable接口,因此是一个线程类。
  • Worker类继承了AQS,使用了 AQS实现独占锁的功能。
  • 为什么不使用 ReentrantLock可重入锁来实现?
    从源码的 tryAcquire()方法可以看出它是不允许重入的,而ReentrantLock是允许可重入的:
  1. lock方法一旦获取独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲状态,说明它没有处理任务,这时可以对该线程进行中断;
  4. 线程池中执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入的,是因为在任务调用setCorePoolSize这类线程池控制的方法时,不会中断正在运行的线程所以,Worker继承自AQS,用于判断线程是否空闲以及是否处于被中断。

runWorker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}

runWorker()方法总结:

  1. while 死循环通过getTask()方法从阻塞队列中获取任务,当getTask()为null时跳出 while()循环;
  2. 正常进入 while死循,先加锁,然后进入任务处理逻辑;
  3. 如果线程池正在停止,则中断当前线程,调用 processWorkerExit()方法,runWorker()结束;
  4. 调用 task.run() 执行任务;
  5. while 循环不论何种原因跳出,最终都需要执行 processWorkerExit()方法;
  6. runWorker()方法执行成功,意味着 Worker中的 run()方法执行成功,则释放锁,销毁当前线程;

getTask()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
 public class ThreadPoolExecutor extends AbstractExecutorService {

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (; ; ) {
int c = ctl.get();

/**
* 再次判断线程池的 runState,决定能够获取到 task
*/
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

/**
* timed 变量用于判断是否需要进行 keepAliveTime超时处理;
* allowCoreThreadTimeOut 默认是 false,代表核心线程不允许超时;
* wc > corePoolSize,表示当前线程数大于核心线程数;
* 对于超出 corePoolSize的线程,需要 keepAliveTime超时处理;
*/
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/**
* wc > maximumPoolSize 产生的原因是:此方法执行阶段同时执行了 setMaximumPoolSize()方法;
* timed && timedOut 如果为true,表示当前操作需要进行获取任务超时处理,并且上次从阻塞队列中获取任务发生了超时;
* 如果池中的线程数大于 1,或者 workQueue为空,则尝试将 workCount减1;如果减1失败,则返回重试;
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

/**
* 从工作队列中获取任务,根据 timed来决定采用超时获取task()还是堵塞获取task
* timed为 true,则通过workQueue的poll方法进行超时控制,如果在keepAliveTime时间内没有获取任务,则返回null;
* 否则通过take方法,如果队列为空,则take方法会阻塞直到队列中不为空;
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}

getTask()方法总结:

  • 判断线程池的 runState决定是否能返回任务;
  • 判断是否需要对超出 corePoolSize的线程最超时推出操作;
  • 根据 timed来判断 采用什么方式从阻塞队列中获取任务;

processWorkerExit()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 public class ThreadPoolExecutor extends AbstractExecutorService {

private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为 true,则说明线程执行时出现异常,需要将 workerCount 减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
// 已完成 taskCount计数加1
completedTaskCount += w.completedTasks;
// 从 workers hashSet中移除当前 worker
workers.remove(w);
} finally {
mainLock.unlock();
}

// 钩子函数,根据线程池的状态来判断是否结束线程池
tryTerminate();

int c = ctl.get();
/**
* 当前线程 runState为 RUNNING或 SHUTDOWN 时,如果 worker是异常结束,会执行 addWorker()方法;
* 如果allowCoreThreadTimeOut=true,那么等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
}

到此,线程池的核心方法已经分析完毕,那么,我们需要如何创建线程池呢?

线程池创建方式

手动创建

参数按需设置,比如下面代码就是自己创建的一个线程池:

1
2
3
4
5
6
7
ThreadPoolExecutor executor=new ThreadPoolExecutor(2,
4,
4,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

使用 Executors提供的方法

newFixedThreadPool

固定线程数的线程池,corePoolSize = maximumPoolSize,keepAliveTime = 0,工作队列为无界堵塞队列 LinkedBlockingQueue。

  • 优点:线程数是固定的,不会无限增长。
  • 缺点:线程数是固定的,无法动态调整;工作队列是无界的,如果任务堆积过多可能导致内存溢出;不支持自定义拒绝策略

newSingleThreadExecutor

只有一个线程的线程池,corePoolSize = maximumPoolSize = 1,keepAliveTime = 0,工作队列为无界堵塞队列 LinkedBlockingQueue。

  • 优点:可以顺序执行任务
  • 缺点:线程数为1,不支持并发,无法动态调整;工作队列是无界的,如果任务堆积过多可能导致内存溢出;不支持自定义拒绝策略

newCachedThreadPool

按需要创建新线程的线程池,核心线程数为0,最大线程数为 Integer.MAX_VALUE,keepAliveTime = 60s,工作队列使用同步SynchronousQueue。

  • 优点:可以无线添加新的线程,当需求降低时会自动回收空闲线程。适用于执行很多的短期异步任务,或者是负载较轻的服务器。
  • 缺点:不支持自定义拒绝策略

newScheduledThreadPool

创建一个以延迟或定时的方式来执行任务的线程池,堵塞队列为 DelayedWorkQueue。

  • 优点:适用于需要多个后台线程执行周期任务
  • 缺点:任务是单线程方式执行,一旦一个任务失败其他任务也受影响;不支持自定义拒绝策略

newWorkStealingPool

窃取(抢占式操作)线程池,JDK 1.8 新增,底层使用 ForkJoinPool 实现,1个任务拆分成多个”小任务”,”小任务”被分发到多个线程上执行,”小任务” 都执行完成后,再将结果合并。每一个线程都有一个自己的队列,当线程发现自己的队列没有任务了,就会到其它线程的队列里获取任务执行,这个过程理解为”窃取”。
和上面 4种线程池有很明显的区别,前 4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题,任务的执行是无序的,哪个线程抢到任务。

  • 优点:每个线程有自己的队列,可以减少队列的争用,适合并发量大的任务
  • 缺点:不会保证任务的顺序执行,不支持自定义拒绝策略

线程池终止方式

终止线程池主要有两种方式:

  • shutdown():不接受新任务,但是在关闭前会将之前提交的任务处理完毕。
  • shutdownNow():直接关闭线程池,通过 Thread.interrupt()方法终止所有线程,不会等待之前提交的任务执行完毕,但是会返回队列中未处理的任务。

如何配置线程池

线程池参数的配置,会依据服务器的工作类型来设置,参考意见如下:

  • 计算密集型,设置 线程数 = CPU数 + 1;

  • I/O密集型,线程数 = CPU数 * 2;

但是在实际开发中,服务器很难完全区分是哪一种类型,因此一个比较合理的设置是:

线程数 = CPU数 * CPU利用率 * (任务等待时间 / 任务计算时间 + 1)

比如,程序部署在 4核的服务器上用于任务计算,假设任务计算时长是 100ms,等待 I/O操作为 400ms,则线程数约为:4 * 1 * (1 + 400 / 100) = 20个

不过,上面的都是参考值,在实际生产中的做法是:经过压测得出最佳线程数,并且把 corePoolSize 和 maximumPoolSize
设计成可配置化参数,一旦需要调整大小,可以直接通过配置中心来完成,以免重新发布代码。

常见面试题

注意:在面试中回答线程池的问题,一定要先说明 jdk版本,因为不同的版本实现可能有差异(jdk修复bug或者性能优化导致的)。有了上面对线程池原理的分析,下面的面试题可以很容易的回答。

1.为什么有线程还需要线程池呢?

线程在正常执行或者异常中断时会被销毁,如果频繁的创建很多线程,不仅会消耗系统资源,还会降低系统的稳定性,一不小心把系统搞崩了。

使用线程池可以带来以下几个好处:

  • 线程池内部的线程数是可控的;
  • 可以灵活的设置参数;
  • 线程池内会保留部分线程,当提交新的任务可以直接运行;
  • 方便内部线程资源的管理,调优和监控;

2.能讲下线程池中几个核心属性吗?

参考上述 核心属性 章节

3.能讲下线程池的运行原理吗?

参考上述 运行原理 章节

4.线程池中的核心线程会被销毁吗?

默认情况下不会,但是如果设置了 allowCoreThreadTimeOut(true),核心线程也会保留 keepAliveTime时长后会被 terminated。

5.能讲下线程池的运行状态吗?他们之间是如何转换的?

参考上述 线程池的运行 章节

6.能聊聊线程池的拒绝策略吗?

参考上述 拒绝策略 章节

7.能讲讲 ctl的实现机制吗?

ctl将 runState 和 workerCount 封装成了一个原子 AtomicInteger操作。因为 runState 和 workerCount 是线程池正常运行的2个最重要属性。

因此无论是查询还是修改,我们必须保证对这2个属性的操作是属于“同一时刻”的,也就是原子操作,否则就会出现错乱的情况。如果我们使用2个变量来分别存储,要保证原子性则需要额外进行加锁操作,这显然会带来额外的开销,而将这2个变量封装成1个
AtomicInteger 则不会带来额外的加锁开销,而且只需使用简单的位操作就能分别得到 runState 和 workerCount。

由于这个设计,workerCount 的上限 CAPACITY = (1 << 29) - 1,对应的二进制原码为:0001 1111 1111 1111 1111 1111 1111
1111(29个1)。

通过 ctl得到 runState:runState = ctl & CAPACITY。( 按位取反),于是”~CAPACITY”的值为:1110 0000 0000 0000 0000 0000 0000
0000,只有高3位为1,与 ctl 进行 & 操作,结果为 ctl
高3位的值,也就是 runState。

通过 ctl得到 workerCount:workerCount = c & CAPACITY(位操作)。

8.Executors提供了哪些创建线程池的方法?

参考上述 线程池创建方法 章节

9.线程池中的核心线程数和最大线程数该如何设置呢?

10.如何终止线程池?

参考上述 终止线程池 章节

11.为什么线程池中要使用堵塞队列?

  • 主要原因:线程从阻塞队列取任务时,如果阻塞队列不为空则立即返回,如果为空,则线程会被阻塞,一直等待,直到队列中有新的任务,这样就充分的利用了阻塞队列
    堵塞和通知线程的功能。如果采用非堵塞队列,则需要写额外的机制来实现通知线程获取任务。
  • 次要原因:当核心线程处理不过来,可以通过堵塞队列进行任务堆积

12. 非核心线程能成为核心线程吗?

核心线程和非核心线程是理解上的区分,线程池内部并不区分核心线程和非核心线程的,会根据池的工作线程数 countWorker和 corePoolSize 进行调整,

  • 当 countWorker <= corePoolSize,则 countWorker这些线程被理解成核心线程;
  • 当 corePoolSize < countWorker < maximumPoolSize,则 countWorker - corePoolSize 这部分线程被理解成非核心线程;

13. 线程池中的线程在什么时候创建?

默认情况下,即使是核心线程也只能在新任务到达时才创建和启动。但是可以使用 prestartCoreThread 或 prestartAllCoreThreads 方法来提前启动核心线程,即创建线程池的时候就会创建对应数量的线程。

14. 有界队列和无解队列有什么风险?

  • 有界队列,当线程池的队列满了后,被拒绝的任务如何处理;
  • 无界队列,当任务的提交速度大于线程池的处理速度,可能会导致内存溢出;

1.5 为什么不推荐使用Executors包装的线程池?

参考上述 线程池创建方式 章节

总结

到此,我们对线程池进行了全面的分析,也列举了面试中常见的问题,对于线程池的学习,个人极力推荐优先阅读源码,毕竟是原滋原味,然后结合一些优秀的文章,比如本文😁,对照着理解,这样的话就能快速的掌握理论,
但是,光有理论还不行,还应该多参与一些关于线程池的开发工作,也可以多参与一些生产环境线程池的监控和排错任务,加强实战水平,这样才能在工作和面试中游刃有余。

文章总结不易,看到的这里的小伙伴,还请帮忙点赞,关注哦!

学习交流

如果你觉得本文章对你有帮助,感谢转发给更多的好友,关注我:猿java,为你呈现更多的硬核文章。

drawing