Java并发利器:ThreadPoolExecutor 高效使用指南
Java并发利器:ThreadPoolExecutor 高效使用指南
dong4j有时候,系统需要处理非常多的执行时间很短的请求,如果每一个请求都开启一个新线程的话,系统就要不断的进行线程的创建和销毁,有时花在创建和销毁线程上的时间会比线程真正执行的时间还长。而且当线程数量太多时,系统不一定能受得了。
使用线程池主要为了解决一下几个问题:
- 通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销。
- 对线程进行一些维护和管理,比如定时开始,周期执行,并发数控制等等。
Executor
Executor 是一个接口,跟线程池有关的基本都要跟他打交道。下面是常用的 ThreadPoolExecutor 的关系。
Executor 接口很简单,只有一个 execute 方法。
ExecutorService 是 Executor 的子接口,增加了一些常用的对线程的控制方法,之后使用线程池主要也是使用这些方法。
AbstractExecutorService 是一个抽象类。ThreadPoolExecutor 就是实现了这个类。
ThreadPoolExecutor
构造方法
ThreadPoolExecutor 是线程池的真正实现,他通过构造方法的一系列参数,来构成不同配置的线程池。常用的构造方法有下面四个:
1 | ThreadPoolExecutor(int corePoolSize, |
1 | ThreadPoolExecutor(int corePoolSize, |
1 | ThreadPoolExecutor(int corePoolSize, |
1 | ThreadPoolExecutor(int corePoolSize, |
构造方法参数说明
corePoolSize
核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存
keepAliveTime
限制。除非将allowCoreThreadTimeOut
设置为true
。maximumPoolSize
线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的 LinkedBlockingDeque 时,这个值无效。
keepAliveTime
非核心线程的闲置超时时间,超过这个时间就会被回收。
unit
指定
keepAliveTime
的单位,如TimeUnit.SECONDS
。当将allowCoreThreadTimeOut
设置为true
时对 corePoolSize 生效。workQueue
线程池中的任务队列.
常用的有三种队列,
SynchronousQueue
,LinkedBlockingDeque
,ArrayBlockingQueue
。threadFactory
线程工厂,提供创建新线程的功能。ThreadFactory 是一个接口,只有一个方法
1
2
3public interface ThreadFactory {
Thread newThread(Runnable r);
}通过线程工厂可以对线程的一些属性进行定制。
默认的工厂:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager var1 = System.getSecurityManager();
this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable var1) {
Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
if(var2.isDaemon()) {
var2.setDaemon(false);
}
if(var2.getPriority() != 5) {
var2.setPriority(5);
}
return var2;
}
}RejectedExecutionHandler
RejectedExecutionHandler
也是一个接口,只有一个方法1
2
3public interface RejectedExecutionHandler {
void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用 RejectedExecutionHandler 的 rejectedExecution 方法。
线程池规则
线程池的线程执行规则跟任务队列有很大的关系。
下面都假设任务队列没有大小限制:
- 如果线程数量核心线程数,但核心线程数,但核心线程数,并且 > 最大线程数,当任务队列是 LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是
LinkedBlockingDeque 并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。 - 如果线程数量 > 核心线程数,并且 > 最大线程数,当任务队列是 SynchronousQueue 的时候,会因为线程池拒绝添加任务而抛出异常。
- 如果线程数量核心线程数,但核心线程数,但核心线程数,并且 > 最大线程数,当任务队列是 LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是
任务队列大小有限时
- 当 LinkedBlockingDeque 塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
- SynchronousQueue 没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。
规则验证
前提
所有的任务都是下面这样的,睡眠两秒后打印一行日志:
1 | Runnable myRunnable = new Runnable() { |
所有验证过程都是下面这样,先执行三个,再执行三个,8 秒后,各看一次信息
1 | executor.execute(myRunnable); |
验证 1
核心线程数为 6,最大线程数为 10。超时时间为 5 秒
1
ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 10, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18--- 先开三个 ---
核心线程数 6
线程池线程数 3
队列任务数 0
--- 再开三个 ---
核心线程数 6
线程池线程数 6
队列任务数 0
pool-1-thread-1 run
pool-1-thread-6 run
pool-1-thread-5 run
pool-1-thread-3 run
pool-1-thread-4 run
pool-1-thread-2 run
----8 秒之后 ----
核心线程数 6
线程池线程数 6
队列任务数 0
可以看到每个任务都是是直接启动一个核心线程来执行任务,一共创建了 6 个线程,不会放入队列中。8 秒后线程池还是 6 个线程,核心线程默认情况下不会被回收,不收超时时间限制。
验证 2
核心线程数为 3,最大线程数为 6。超时时间为 5 秒, 队列是 LinkedBlockingDeque
1
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18--- 先开三个 ---
核心线程数 3
线程池线程数 3
队列任务数 0
--- 再开三个 ---
核心线程数 3
线程池线程数 3
队列任务数 3
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
----8 秒之后 ----
核心线程数 3
线程池线程数 3
队列任务数 0当任务数超过核心线程数时,会将超出的任务放在队列中,只会创建 3 个线程重复利用。
验证 3
- 核心线程数为 3,最大线程数为 6。超时时间为 5 秒, 队列是 SynchronousQueue
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); |
1 | --- 先开三个 --- |
当队列是 SynchronousQueue 时,超出核心线程的任务会创建新的线程来执行,看到一共有 6 个线程。但是这些线程是费核心线程,收超时时间限制,在任务完成后限制超过
5 秒就会被回收。所以最后看到线程池还是只有三个线程。
验证 4
核心线程数是 3,最大线程数是 4,队列是 LinkedBlockingDeque
1
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
1 | --- 先开三个 --- |
LinkedBlockingDeque 根本不受最大线程数影响。
但是当 LinkedBlockingDeque 有大小限制时就会受最大线程数影响了
4.1 比如下面,将队列大小设置为 2.
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(2)); |
1 | --- 先开三个 --- |
首先为三个任务开启了三个核心线程 1,2,3,然后第四个任务和第五个任务加入到队列中,第六个任务因为队列满了,就直接创建一个新线程
4,这是一共有四个线程,没有超过最大线程数。8 秒后,非核心线程收超时时间影响回收了,因此线程池只剩 3 个线程了。
4.2 将队列大小设置为 1
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1)); |
1 | Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.sunlinlin.threaddemo.Main$1@677327b6 rejected from java.util.concurrent.ThreadPoolExecutor@14ae5a5[Running, pool size = 4, active threads = 4, queued tasks = 1, completed tasks = 0] |
直接出错在第 6 个 execute 方法上。因为核心线程是 3 个,当加入第四个任务的时候,就把第四个放在队列中。加入第五个任务时,因为队列满了,就创建新线程执行,创建了线程
4。当加入第六个线程时,也会尝试创建线程,但是因为已经达到了线程池最大线程数,所以直接抛异常了。
验证 5
核心线程数是 3 ,最大线程数是 4,队列是 SynchronousQueue
1
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
1 | Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.sunlinlin.threaddemo.Main$1@14ae5a5 rejected from java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0] |
这次在添加第五个任务时就报错了,因为 SynchronousQueue 各奔不保存任务,收到一个任务就去创建新线程。所以第五个就会抛异常了。