Java ThreadPoolExecutor 队列策略

错误复现

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testThreadPool() {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, queue);
for (int i = 0; i < 100; i++) {
final int j = i;
executor.submit(() -> {
System.out.println("task-" + j);
});
}
}
1
2
3
4
5
6
7
8
9
10
11
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@e50a6f6 rejected from java.util.concurrent.ThreadPoolExecutor@14ec4505[Running, pool size = 10, active threads = 10, queued tasks = 3, completed tasks = 27]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at tech.valuesimplex.workweixin.TestSdk.testThreadPool(TestSdk.java:37)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)

参数介绍

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
序号参数含义备注
1corePoolSize  核心线程数量,线程池初始化时设定corePoolSize 大小和 maximumPoolSize 大小一致的话 线程池中的线程将不会空闲、 keepAliveTime 和 timeUnit 就不会再起作用
2maximumPoolSize线程池最大线程数(非核心线程) 核心线程 和  非核心线程 共同使用线程池、但是核心线程是不会被回收的、回收条件是线程池中的线程数量大于核心线程数
3keepAliveTime  如果当前线程池中线程数大于 corePoolSize。多余的线程、在等待 keepAliveTime 时间后如果还没有新的线程任务指派给它、它就会被回收
4unit   等待时间 keepAliveTime 的单位
5workQueue  等待队列默认 SynchronousQueue 一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程;
可以使用无界队 LinkedBlockingQueue;
有界队列 ArrayBlockingQueue;
以及优先级队列 PriorityBlockingQueue 
6RejectedExecutionHandler饱和策略(分 4 种)AbortPolicy(默认):直接抛弃
CallerRunsPolicy:用调用者的线程执行任务(始终此种方式,可以避免线程被线程池拒绝的情况)
DiscardOldestPolicy:抛弃队列中最久的任务
DiscardPolicy:抛弃当前任务

原因分析

当线程池里的线程都繁忙的时候,新任务会被提交给阻塞队列保存,当提交给阻塞队列的任务,超出了该队列的最大容量时。线程池就会拒绝接收新任务,随即抛出异常

RejectedExecutionHandler 的四种饱和策略

序号策略名称含义备注
1AbortPolicy终止策略是默认的饱和策略,当队列满时,会抛出一个 RejectExecutionException 异常,客户可以捕获这个异常,根据需求编写自己的处理代码不是解决问题的根本
2DiscardPolicy策略会悄悄抛弃该任务。建议最好不用,水太深
3DiscardOldestPolicy策略将会抛弃下一个将要执行的任务,如果此策略配合优先队列 PriorityBlockingQueue,该策略将会抛弃优先级最高的任务
4CallerRunsPolicy调用者运行策略,该策略不会抛出异常,不会抛弃任务,而是将任务回退给调用者线程执行(调用 execute 方法的线程),由于任务需要执行一段时间,所以在此期间不能提交任务,从而使工作线程有时间执行正在执行的任务。非常适合我们的业务场景

解决

1
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, queue, new CallerRunsPolicy());

其他几种线程池

序号线程池名称规则问题点
1FixedThreadPool允许的请求队列长度为 Integer.MAX_VALUE会堆积大量的请求,从而导致 OOM
2SingleThreadPool 
3CachedThreadPool 允许的创建线程数量为 Integer.MAX_VALUE会创建大量的线程,从而导致 OOM
4ScheduledThreadPool