ForkJoinPool适用于分治算法,基于工作窃取机制提升CPU利用率。1. 核心为ForkJoinTask,RecursiveTask有返回值,RecursiveAction无返回值,通过fork()异步执行,join()等待结果。2. 实现并行计算需拆分任务,如SumTask在数据量小于阈值时直接求和,否则分为左右子任务,左任务fork()提交,右任务compute()执行,再join()合并结果。3. 关键点:合理设置THRESHOLD避免过度拆分;避免阻塞操作;推荐使用ForkJoinPool.commonPool()减少资源开销;注意异常在join()时抛出。4. 可简化为commonPool.invoke()调用。适用于归并排序、矩阵运算等可递归拆解的计算密集型任务。

在Java中,ForkJoinPool 是专为分治(divide-and-conquer)算法设计的线程池,适合将大任务拆成小任务并行执行,最后合并结果。它基于“工作窃取”(work-stealing)算法,空闲线程可以从其他线程的任务队列中“窃取”任务,提高CPU利用率。
1. ForkJoinPool 核心概念
ForkJoinPool 配合 ForkJoinTask 使用,常用子类有:
- RecursiveTask:有返回值的任务,适用于需要合并结果的场景。
- RecursiveAction:无返回值的任务。
任务通过 fork() 提交异步执行,通过 join() 等待结果。
2. 实现分治并行计算的步骤
以计算数组元素和为例,展示如何使用 RecursiveTask 实现并行求和:
立即学习“Java免费学习笔记(深入)”;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000; // 拆分阈值
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 如果任务足够小,直接计算
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 否则拆分为两个子任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// 异步执行左任务
leftTask.fork();
// 当前线程处理右任务
long rightResult = rightTask.compute();
// 等待左任务结果并合并
long leftResult = leftTask.join();
return leftResult + rightResult;
}
public static void main(String[] args) {
long[] data = new long[100_000];
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(data, 0, data.length);
long result = pool.invoke(task);
System.out.println("Sum: " + result);
pool.shutdown();
}
}3. 关键点说明与建议
使用 ForkJoinPool 时注意以下几点:
-
合理设置阈值:任务拆分不能太细,否则调度开销会超过并行收益。根据数据规模和任务复杂度调整
THRESHOLD。 - 避免阻塞操作:ForkJoinPool 的工作线程数量通常等于CPU核心数,阻塞会导致性能下降。
-
优先使用 commonPool:多数情况下可直接使用
ForkJoinPool.commonPool(),避免创建过多线程池。 -
异常处理:任务中抛出的异常会被封装,调用
join()或get()时会重新抛出。
4. 使用 commonPool 的简化方式
可以直接使用全局的公共池:
```java long result = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length)); ```这种方式更简洁,适合大多数场景。
基本上就这些。ForkJoinPool 特别适合递归型、可拆解的任务,比如归并排序、矩阵运算、树遍历等。掌握好拆分策略和阈值控制,就能有效提升计算密集型任务的性能。










