原创

Java线程池的使用与ThreadPoolExecutor详解

为什么业务代码不能一直 new Thread()

线程不是便宜资源。

在 Java 里直接 new Thread().start() 看起来简单,但一旦请求量上来,问题会非常直接:

  • 线程创建和销毁有成本
  • 线程数失控后会带来频繁上下文切换
  • 内存占用会明显上升
  • 高并发下可能把机器拖慢,甚至把服务打挂

线程池的核心价值不是“写法更高级”,而是复用线程、控制并发、管理任务队列和拒绝策略

真正有用的地方也在这里:你终于可以明确回答“系统最多同时处理多少异步任务”“队列能堆多少”“堆满后怎么办”。


线程池的核心组成

Java 中最常用的线程池实现是 ThreadPoolExecutor。它的完整构造方法如下:

public ThreadPoolExecutor(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
}

这 7 个参数基本决定了线程池的行为。

1. corePoolSize

核心线程数。

线程池创建后不会立刻把核心线程全部启动,但当任务不断提交时,优先创建线程直到达到 corePoolSize

可以理解为:线程池平时的常驻工作线程数量

2. maximumPoolSize

最大线程数。

当核心线程都在忙,而且队列也放不下新任务时,线程池才会继续创建线程,直到达到 maximumPoolSize

很多人误以为线程池一满就直接扩容到最大线程数。不是这样。 是否继续扩容,取决于队列是否已满。

3. keepAliveTime

非核心线程的空闲存活时间。

超过核心线程数创建出来的那些“临时线程”,如果空闲时间超过 keepAliveTime,就会被回收。

4. workQueue

任务阻塞队列,用来存放还没被线程执行的任务。

常见队列有:

  • ArrayBlockingQueue:有界队列,容量固定
  • LinkedBlockingQueue:可选有界或近似无界,默认容量很大
  • SynchronousQueue:不存储元素,提交一个任务必须马上交给线程处理
  • PriorityBlockingQueue:按优先级排序

这里是线程池行为差异最大的地方之一。

5. threadFactory

线程工厂。

用于自定义线程的创建逻辑,比如:

  • 自定义线程名
  • 设置是否为守护线程
  • 设置优先级
  • 统一异常处理逻辑

生产环境里,给线程起清晰的名字非常重要,排查问题时能省很多时间。

6. RejectedExecutionHandler

拒绝策略。

当线程数达到最大值,队列也满了,再来新任务时,就会触发拒绝策略。

JDK 内置 4 种:

  • AbortPolicy:直接抛异常
  • CallerRunsPolicy:由提交任务的线程自己执行
  • DiscardPolicy:直接丢弃
  • DiscardOldestPolicy:丢弃队列中最老的任务,再尝试提交当前任务

7. 任务提交流程

把上面几个参数串起来,线程池接收任务时大致遵循这个流程:

  1. 当前运行线程数 < corePoolSize,创建核心线程执行任务
  2. 否则,任务进入阻塞队列
  3. 如果队列满了,且运行线程数 < maximumPoolSize,创建非核心线程执行任务
  4. 如果队列也满、线程数也到上限,执行拒绝策略

这个流程不搞清楚,后面很多参数都容易配错。


最基础的线程池使用方式

1. 创建一个线程池

import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(
                4,
                8,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadFactory() {
                    private int count = 1;

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("biz-thread-" + count++);
                        return thread;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy()
        );

        for (int i = 0; i < 10; i++) {
            int taskId = i;
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 执行任务:" + taskId);
            });
        }

        executorService.shutdown();
    }
}

这个例子展示了最基本的几个点:

  • 使用 ThreadPoolExecutor 明确指定参数
  • 使用 execute() 提交无返回值任务
  • 自定义线程名
  • 使用结束后调用 shutdown()

execute()submit() 的区别

这是很常见的面试题,但真正重要的是你要知道在业务里什么时候该用哪个。

execute()

用于提交 Runnable,没有返回值。

executorService.execute(() -> {
    System.out.println("执行异步任务");
});

submit()

可以提交 RunnableCallable,返回 Future,可以获取执行结果。

Future<Integer> future = executorService.submit(() -> {
    Thread.sleep(1000);
    return 200;
});

try {
    Integer result = future.get();
    System.out.println("任务返回结果:" + result);
} catch (Exception e) {
    e.printStackTrace();
}

区别不只是“有没有返回值”

更关键的一点是:异常处理表现不同

execute() 提交的任务抛异常

executorService.execute(() -> {
    throw new RuntimeException("execute exception");
});

异常会直接交给线程的 UncaughtExceptionHandler,默认情况下会打印堆栈。

submit() 提交的任务抛异常

Future<?> future = executorService.submit(() -> {
    throw new RuntimeException("submit exception");
});

future.get();

异常不会立即打印,而是被封装进 Future,只有在调用 get() 时才会抛出。

这意味着一个很现实的问题:

很多项目里用 submit() 提交任务,但从来不调用 get(),结果任务异常被静默吞掉了。

这是线上排查异步任务失败时非常常见的坑。


为什么不建议直接使用 Executors 快速创建线程池

很多教程会这样写:

ExecutorService executorService = Executors.newFixedThreadPool(10);

或者:

ExecutorService executorService = Executors.newCachedThreadPool();

代码确实短,但在生产环境里通常不建议直接这么用。

1. newFixedThreadPool

底层使用的是近似无界的 LinkedBlockingQueue

这意味着线程数虽然固定,但队列可能无限堆积。 如果任务生产速度长期大于消费速度,内存压力会不断升高,最后可能触发 OOM。

2. newCachedThreadPool

底层使用 SynchronousQueue,并且最大线程数非常大。

这意味着只要任务来得快、执行得慢,线程池就会疯狂创建线程。 它不太会把任务压在队列里,而是倾向于不断扩线程。

结果往往不是吞吐变高,而是线程数暴涨,CPU 上下文切换加剧。

3. 官方和实践里的建议

实际项目中更稳妥的方式是:

  • 直接使用 ThreadPoolExecutor
  • 明确指定核心线程数、最大线程数、队列容量和拒绝策略
  • 根据任务类型做不同线程池隔离

很多团队规范之所以强调这一点,不是为了“代码风格统一”,而是为了避免失控。


生产环境更推荐的线程池写法

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadPoolExample {

    private static final ExecutorService ORDER_EXECUTOR = new ThreadPoolExecutor(
            4,
            8,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(200),
            new ThreadFactory() {
                private final AtomicInteger count = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("order-pool-" + count.getAndIncrement());
                    thread.setDaemon(false);
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            int orderId = i;
            ORDER_EXECUTOR.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 处理订单:" + orderId);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        ORDER_EXECUTOR.shutdown();
    }
}

这个写法比 Executors.newXXX() 多几行,但它至少把这些关键事情说清楚了:

  • 最大承载能力是多少
  • 队列能堆多少任务
  • 满了以后怎么处理
  • 线程名字是什么
  • 这是哪个业务的线程池

这些信息对线上运维和问题排查非常关键。


线程池参数应该怎么配

没有一套参数适合所有场景。

真正应该先判断的是:你的任务是 CPU 密集型,还是 IO 密集型。

1. CPU 密集型任务

比如:

  • 大量计算
  • 加密解密
  • 图像处理
  • 复杂规则运算

这类任务主要消耗 CPU,线程数一般不宜太多。 经验上通常设置为:

线程数 ≈ CPU 核心数 或 CPU 核心数 + 1

线程太多通常没有好处,反而会增加上下文切换。

2. IO 密集型任务

比如:

  • 调用数据库
  • 远程 RPC
  • 读写文件
  • 网络请求

这类任务大部分时间线程都在等待,所以线程数可以适当调高。

常见经验公式:

线程数 ≈ CPU 核心数 * 2

或者更细一点:

线程数 = CPU核心数 * (1 + 等待时间 / 计算时间)

但这只是起点,不是标准答案。 最终还是要结合压测、响应时间、机器配置和任务耗时分布来调。

3. 队列容量怎么配

队列不是越大越好。

队列太大意味着:

  • 问题不容易暴露
  • 请求延迟会变长
  • 高峰期任务大量堆积
  • 系统看似没挂,实际上已经开始雪崩前排队

很多系统最危险的状态不是“立刻失败”,而是“慢慢堆死”。

所以业务里一般更推荐:

  • 有界队列
  • 给出明确容量
  • 配合合适的拒绝策略
  • 在监控里观察活跃线程数、队列长度、任务耗时和拒绝次数

四种拒绝策略怎么选

1. AbortPolicy

new ThreadPoolExecutor.AbortPolicy()

默认策略,直接抛 RejectedExecutionException

适合对任务不允许悄悄丢失的场景。 比如订单、支付、库存变更等核心流程。

优点是问题暴露快。 缺点是如果上层没有处理好异常,用户请求会直接失败。

2. CallerRunsPolicy

new ThreadPoolExecutor.CallerRunsPolicy()

由提交任务的线程自己执行任务。

这个策略的特点是“削峰限流”。 因为提交线程被迫去干活,提交速度自然会慢下来。

适合可以接受调用方变慢,但不希望直接丢任务的场景。

不过它也有副作用: 如果提交线程本身是 Tomcat 请求线程、Netty 事件线程或者消息消费线程,可能会把上游链路一起拖慢。

3. DiscardPolicy

new ThreadPoolExecutor.DiscardPolicy()

直接丢弃任务,不报错。

除非任务本身允许丢失,比如部分日志、统计、低价值埋点,否则不建议用。

最糟糕的地方在于: 任务没执行,但系统表面上很安静。

4. DiscardOldestPolicy

new ThreadPoolExecutor.DiscardOldestPolicy()

丢弃队列里最早的任务,再尝试提交当前任务。

适合某些“最新数据比旧数据更重要”的场景,但用得并不多。 因为很多业务并不真的能接受“悄悄把老任务丢了”。


一个带返回值的线程池示例

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(
                3,
                6,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        List<Future<String>> futures = new ArrayList<>();

        for (int i = 1; i <= 5; i++) {
            int taskId = i;
            futures.add(executorService.submit(() -> {
                Thread.sleep(1000);
                return "任务" + taskId + "执行完成";
            }));
        }

        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                System.out.println("任务执行异常:" + e.getCause().getMessage());
            }
        }

        executorService.shutdown();
    }
}

这个例子里 submit() 更合适,因为需要拿到结果。

但在批量任务场景里,future.get() 如果一个个阻塞等待,吞吐并不一定好。 更进一步通常会用:

  • CompletionService
  • CompletableFuture
  • 并行编排框架

线程池只是底层执行器,不等于完整的异步编排方案。


线程池关闭方式

线程池用完不关,是另一个常见问题。

1. shutdown()

平滑关闭。

  • 不再接收新任务
  • 会继续执行队列中的已提交任务
executorService.shutdown();

2. shutdownNow()

尝试立刻关闭。

  • 不再接收新任务
  • 尝试中断正在执行的线程
  • 返回等待中的任务列表
List<Runnable> notExecutedTasks = executorService.shutdownNow();

但要注意,shutdownNow() 只是发出中断信号,不是保证立刻停掉。 如果任务代码没有正确处理中断,线程可能还是停不下来。

3. 标准关闭写法

executorService.shutdown();
try {
    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
}

这类写法更完整,也更适合生产环境。


使用线程池时最常见的坑

1. 一个项目里只有一个大线程池

这是很多系统初期常见的设计。

看起来省事,实际上问题很大:

  • 订单异步任务堵住了消息消费任务
  • 文件导出把通知发送拖慢
  • 一个慢任务把整个异步系统都拖住

更合理的方式是按业务隔离线程池,至少把高优先级和低优先级任务拆开。


2. 队列设成无界,表面稳定,实际危险

无界队列的问题不是马上报错,而是慢慢积压。

短时间内你会觉得系统“挺稳定”,因为没有拒绝、没有异常。 但实际上请求延迟在拉长,内存占用在上涨,最终出问题时往往已经不是一个简单的参数调整能解决的。


3. 线程池大小拍脑袋设置

设置成 200、500、1000,并不代表处理能力更强。

如果机器只有 4 核,任务又是 CPU 密集型,开 500 个线程通常只会更糟。

线程池参数不是越大越保险,而是越接近业务特征越有效。


4. 异步任务里异常没人处理

尤其是 submit()

executorService.submit(() -> {
    int x = 1 / 0;
    return "ok";
});

如果不上报异常、不调用 get()、不记录日志,这类失败很容易被忽略。

建议至少做到其中一项:

  • Future 统一收集异常
  • 在线程工厂中设置异常处理器
  • 任务内部做好日志与告警
  • 使用更适合异步编排的工具做统一异常处理

5. 忽略线程池监控

线程池不是配完就结束了。

应该至少监控这些指标:

  • 当前线程池大小
  • 活跃线程数
  • 队列长度
  • 已完成任务数
  • 拒绝任务次数
  • 任务平均耗时 / 最大耗时

没有监控,线程池问题通常只能等用户投诉才暴露。


实际项目中的建议

1. 优先使用 ThreadPoolExecutor 显式创建线程池

不要为了省几行代码,把最关键的容量控制交给默认实现。

2. 使用有界队列

让系统容量边界可控,不要无限堆积。

3. 线程池按业务隔离

不同业务的任务类型、优先级、耗时差异都可能很大,混在一个池里通常不是好主意。

4. 给线程命名

例如:

  • order-pool-1
  • mail-pool-2
  • export-pool-3

排查日志、线程 dump、监控面板时会非常直观。

5. 明确拒绝策略

不能默认“先跑着看”。

你需要提前想清楚: 任务满了以后,是失败、降级、阻塞调用方,还是允许丢弃。

6. 对中断做正确处理

线程池关闭、任务取消时都依赖中断机制。 捕获 InterruptedException 后不要直接吞掉,通常应该这样写:

catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

否则中断语义就断了。


一个更接近业务场景的例子

下面模拟“下单成功后异步发送短信”的场景:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderService {

    private static final ExecutorService SMS_EXECUTOR = new ThreadPoolExecutor(
            2,
            4,
            30L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(20),
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("sms-pool-" + counter.getAndIncrement());
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public void createOrder(String orderNo) {
        System.out.println("订单创建成功:" + orderNo);

        SMS_EXECUTOR.execute(() -> sendSms(orderNo));
    }

    private void sendSms(String orderNo) {
        try {
            System.out.println(Thread.currentThread().getName() + " 发送短信,订单号:" + orderNo);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("短信发送任务被中断,订单号:" + orderNo);
        } catch (Exception e) {
            System.out.println("短信发送失败,订单号:" + orderNo + ",原因:" + e.getMessage());
        }
    }

    public static void main(String[] args) {
        OrderService orderService = new OrderService();
        for (int i = 1; i <= 10; i++) {
            orderService.createOrder("ORDER_" + i);
        }

        SMS_EXECUTOR.shutdown();
    }
}

这里有几个值得注意的点:

  • 发短信这种逻辑适合异步化,但不应该和订单主流程共用线程池
  • 使用 CallerRunsPolicy,在流量高峰时可以自然给调用方施加反压
  • 任务内部要处理异常,不能指望线程池替你兜底
  • 队列容量是明确可控的,不会无限堆任务

总结

Java 线程池真正要解决的,不是“怎么异步执行一段代码”,而是怎么在有限资源下稳定地执行大量任务

写线程池代码时,重点不要只盯着 API,而要盯着这几个问题:

  • 最大并发是多少
  • 队列能积压多少
  • 满了之后怎么办
  • 任务异常怎么处理
  • 不同业务是否需要隔离
  • 运行状态有没有监控

线程池用得好,是系统的缓冲层。 线程池用不好,也可能成为最难定位的问题源头之一。

很多线上故障并不是因为不会写线程池,而是因为把它当成了一个“默认安全”的工具。它从来不是。

正文到此结束
评论插件初始化中...
Loading...