应按批次(如500条)而非单条提交任务,避免线程池队列堆积和oom;每批独立try-catch+completablefuture隔离异常;db写入时每批串行、禁用共享事务,配合连接池与数据库上限匹配。

为什么不能直接用 ExecutorService 提交上万个小任务
直接把 10 万条数据拆成 10 万个 Runnable 丢给 Executors.newFixedThreadPool(10),看似合理,实则危险:线程池队列会堆积大量待执行任务,内存暴涨,GC 压力大,甚至 OutOfMemoryError: GC overhead limit exceeded。这不是并发问题,是资源误配。
真正可行的思路是「控制并发粒度」——不是每条记录一个任务,而是每批(如 500 条)为一个任务单元。
- 批大小建议在
100–2000之间,具体看单条处理耗时和内存占用;IO 密集型可稍大,CPU 密集型宜小 - 用
Lists.partition(list, batchSize)(来自 Guava)或手写循环切分,避免中间生成大量子列表对象 - 提交的是
Runnable或Callable<list>></list>,不是单个元素
如何安全地切分并提交批次任务(含异常隔离)
切分本身不难,但关键在于:某一批失败,不能导致整个流程中断;也不能让异常吞没,丢失上下文。
推荐用 CompletableFuture + 批次封装,每个批次独立 try-catch:
立即学习“Java免费学习笔记(深入)”;
List<List<Data>> batches = Lists.partition(dataList, 500);
List<CompletableFuture<List<Result>>> futures = new ArrayList<>();
for (List<Data> batch : batches) {
futures.add(CompletableFuture.supplyAsync(() -> {
try {
return processBatch(batch); // 自定义处理逻辑
} catch (Exception e) {
log.error("batch failed, size={}", batch.size(), e);
throw new RuntimeException("batch execution failed", e); // 不吞异常
}
}, executor));
}
// 等待全部完成(或超时),聚合结果
List<Result> allResults = futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList());
- 不要用
future.get(),它会阻塞并抛出ExecutionException,包装层级深;join()更简洁,异常原样抛出 - 如果某批失败,
join()会直接抛异常,可在外层捕获并决定是否继续(比如跳过该批、记录失败 ID、重试等) - 务必传入自定义
executor,别用默认 ForkJoinPool,避免干扰主线程池
数据库写入场景下怎么避免连接/事务爆掉
多线程并发写 DB 是高频雷区:连接池耗尽、死锁、唯一约束冲突、事务过长回滚慢。
核心原则:每个批次内部串行执行 DB 操作,批次之间可并行;禁止单批次开多个事务。
- 每个
processBatch()方法内,复用同一个Connection或JdbcTemplate,用批量 API:jdbcTemplate.batchUpdate(sql, batchArgs) - Spring 环境下,确保该方法**不在 @Transactional 注解的方法内部被调用**,否则所有批次共享同一事务,极易超时或锁表
- 若必须事务控制,改为每个批次单独声明事务:
@Transactional(propagation = Propagation.REQUIRES_NEW),但注意传播行为对性能的影响 - PostgreSQL/MySQL 要留意
max_connections和连接池maxActive,线程数 × 单批次最大连接数 ≤ 数据库允许上限
怎么监控进度和应对中途失败
跑 2 小时的大任务,没人想黑屏等结果。进度不可见 + 失败无恢复点 = 运维噩梦。
最轻量做法:用 AtomicInteger 记录已完成批次,配合日志输出:
AtomicInteger completed = new AtomicInteger(0);
for (List<Data> batch : batches) {
futures.add(CompletableFuture.runAsync(() -> {
processBatch(batch);
int curr = completed.incrementAndGet();
if (curr % 10 == 0) { // 每 10 批打一次日志
log.info("Progress: {}/{}", curr, batches.size());
}
}, executor));
}
- 别用
System.out.println,它不是线程安全的,且无法被日志系统捕获 - 如果需要断点续跑,得把「已处理批次的起始索引」持久化到 DB 或文件,重启时跳过已成功部分;注意幂等性设计,避免重复写入
- 慎用
CountDownLatch做进度同步——它只适合等待结束,不反映中间状态;真要实时进度,考虑暴露一个 HTTP 接口返回completed.get()
批次划分和异常边界对齐,比线程数调优重要得多;很多人卡在“为什么加了 20 个线程反而更慢”,其实问题从来不在并发数,而在任务粒度和资源争用没理清。










