forkjoinpool基于工作窃取算法,通过recursivetask和recursiveaction实现分治计算,适用于拆分任务的计算密集型场景,合理设置阈值可提升并行性能。

在Java中,ForkJoinPool 是用于高效执行分治算法的线程池,特别适合可以拆分成多个小任务的计算密集型操作。它基于“工作窃取”(work-stealing)算法,空闲线程可以从其他线程的任务队列中“窃取”任务执行,提高CPU利用率。
1. ForkJoinPool 的基本原理
ForkJoinPool 配合 ForkJoinTask 使用,常见的子类有 RecursiveAction(无返回值)和 RecursiveTask(有返回值)。核心思想是:
- 分解(Fork):将大任务拆成多个子任务,并提交到池中异步执行。
- 合并(Join):等待子任务完成,并合并它们的结果。
2. 使用 RecursiveTask 实现有返回值的任务
假设我们要计算一个数组的和,可以通过分治方式实现:
立即学习“Java免费学习笔记(深入)”;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
<p>public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 拆分阈值
private long[] array;
private int start, end;</p><pre class='brush:java;toolbar:false;'>public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 拆分为两个子任务
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行左任务
right.fork(); // 异步执行右任务
return left.join() + right.join(); // 合并结果
}
}
public static void main(String[] args) {
long[] data = new long[10000];
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(data, 0, data.length);
long result = pool.invoke(task); // 执行任务
System.out.println("总和:" + result);
pool.shutdown();
}}
在现实生活中的购物过程,购物者需要先到商场,找到指定的产品柜台下,查看产品实体以及标价信息,如果产品合适,就将该产品放到购物车中,到收款处付款结算。电子商务网站通过虚拟网页的形式在计算机上摸拟了整个过程,首先电子商务设计人员将产品信息分类显示在网页上,用户查看网页上的产品信息,当用户看到了中意的产品后,可以将该产品添加到购物车,最后使用网上支付工具进行结算,而货物将由公司通过快递等方式发送给购物者
3. 使用 RecursiveAction 处理无返回值任务
如果任务不需要返回结果,比如打印数组元素,可以用 RecursiveAction:
立即学习“Java免费学习笔记(深入)”;
import java.util.concurrent.RecursiveAction;
<p>public class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 50;
private int[] array;
private int start, end;</p><pre class='brush:java;toolbar:false;'>public PrintTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.print(array[i] + " ");
}
} else {
int mid = (start + end) / 2;
PrintTask left = new PrintTask(array, start, mid);
PrintTask right = new PrintTask(array, mid, end);
left.fork();
right.fork();
left.join();
right.join();
}
}}
4. 注意事项与最佳实践
使用 ForkJoinPool 时要注意以下几点:
- 只适用于可拆分的计算任务,不适合IO密集型或阻塞操作。
- 拆分粒度要合理,太细会导致调度开销大,太粗则无法充分利用多核。
- 建议使用 pool.invoke(task) 启动任务,它由工作线程执行,支持工作窃取。
- 通常不需要手动 shutdown,但在长时间运行的应用中应妥善管理生命周期。
基本上就这些。掌握 fork 和 join 的时机,就能有效利用 ForkJoinPool 提升并行计算性能。









