
recursiveaction和recursivetask是java fork/join框架的核心组件,它们专为在forkjoinpool中运行而设计。正如其文档所述,这些抽象基类旨在与forkjoinpool协同工作。forkjoinpool虽然允许通过自定义线程工厂进行配置,但它要求创建的是forkjoinworkerthread实例,而不是标准的thread实例。forkjoinworkerthread是thread的子类,这意味着它们本质上是平台线程。
虚拟线程(Virtual Threads)则由Thread.Builder.OfVirtual或Thread::startVirtualThread等机制创建,它们并不继承自ForkJoinWorkerThread。因此,RecursiveAction和RecursiveTask无法直接利用虚拟线程的轻量级特性,因为它们被设计为在专门的ForkJoinWorkerThread上运行,而这些工作线程是平台线程。试图将RecursiveAction或RecursiveTask与虚拟线程结合使用,从根本上来说是行不通的。
RecursiveAction和RecursiveTask提供的核心价值在于,它们协助开发者将大型任务分解为小任务,并在有限的平台线程池中进行高效的负载均衡。当处理平台线程时,由于资源有限,这些特性至关重要。然而,当引入虚拟线程时,情况发生了根本性变化。虚拟线程是廉价且数量庞大的,其创建和管理成本极低。这意味着传统Fork/Join框架中对线程池的精细管理和工作窃取算法的必要性大大降低。
在虚拟线程环境下,开发者无需担心线程资源的稀缺性,可以将更多精力放在任务本身的分解逻辑上。每个子任务都可以轻松地在一个独立的虚拟线程上执行,而无需复杂的池管理机制。
虽然不能直接使用RecursiveAction,但我们可以利用CompletableFuture和虚拟线程的组合来实现类似的递归任务分解。CompletableFuture提供了一种声明式的方式来组合异步计算,而Thread::startVirtualThread则允许我们轻松地在虚拟线程上启动任务。
立即学习“Java免费学习笔记(深入)”;
以下是一个简单的示例,展示了如何使用CompletableFuture在虚拟线程中实现一个递归任务:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public record PseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
// 使用Thread::startVirtualThread在虚拟线程中执行任务
return CompletableFuture.runAsync(
new PseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
int mid = (from + to) >>> 1;
if (mid == from) {
// 模拟实际的阻塞操作,例如耗时计算或I/O
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
} else {
// 递归地创建并运行子任务
CompletableFuture<Void> sub1 = run(from, mid);
CompletableFuture<Void> sub2 = run(mid, to);
// 等待子任务完成。注意:join()会阻塞当前虚拟线程
sub1.join();
sub2.join();
}
}
public static void main(String[] args) {
System.out.println("Starting recursive task with virtual threads...");
long startTime = System.currentTimeMillis();
PseudoTask.run(0, 1_000).join(); // 启动并等待根任务完成
long endTime = System.currentTimeMillis();
System.out.println("Recursive task completed in " + (endTime - startTime) + " ms");
}
}在这个例子中,PseudoTask将一个范围分解为两半,并在新的虚拟线程上异步执行。虽然这种方法简单有效,但直接使用join()可能会导致当前虚拟线程阻塞,直到子任务完成。对于非常大的任务范围,这可能导致创建过多的虚拟线程。
为了减少虚拟线程的创建数量并优化性能,可以采用类似Fork/Join框架中的“工作窃取”思想,即当前线程处理一部分任务,而将另一部分提交给其他线程。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public record OptimizedPseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
return CompletableFuture.runAsync(
new OptimizedPseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
CompletableFuture<Void> pendingFutures = null;
// 循环处理一部分任务,另一部分提交给新线程
for (int currentFrom = this.from; ; currentFrom = (currentFrom + to) >>> 1) {
int mid = (currentFrom + to) >>> 1;
if (mid == currentFrom) {
// 模拟实际的阻塞操作
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
break; // 达到基本任务单元
} else {
// 将一半任务提交给新的虚拟线程
CompletableFuture<Void> subTaskFuture = run(currentFrom, mid);
if (pendingFutures == null) {
pendingFutures = subTaskFuture;
} else {
pendingFutures = CompletableFuture.allOf(pendingFutures, subTaskFuture);
}
// 当前虚拟线程继续处理另一半任务(通过循环迭代)
}
}
// 等待所有提交的子任务完成
if (pendingFutures != null) {
pendingFutures.join();
}
}
public static void main(String[] args) {
System.out.println("Starting optimized recursive task with virtual threads...");
long startTime = System.currentTimeMillis();
OptimizedPseudoTask.run(0, 1_000_000).join(); // 启动并等待根任务完成
long endTime = System.currentTimeMillis();
System.out.println("Optimized recursive task completed in " + (endTime - startTime) + " ms");
}
}这种优化后的方法在处理大范围任务时,例如OptimizedPseudoTask.run(0, 1_000_000).join();,可以显著减少创建的虚拟线程数量,因为它让当前虚拟线程承担了一部分工作,而不是为每个子任务都创建一个新的虚拟线程。这表明即使在虚拟线程环境中,对任务分解策略的精细控制仍然能够带来性能提升。
Java的孵化器模块中引入了StructuredTaskScope,它旨在提供更高级别的结构化并发支持。StructuredTaskScope允许开发者在一个明确定义的范围内启动多个子任务,并在父任务完成之前等待所有子任务的完成,从而更好地管理并发任务的生命周期和错误处理。
以下是一个使用StructuredTaskScope实现递归任务的示例:
import jdk.incubator.concurrent.StructuredTaskScope; // 注意:这是孵化器API
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public record StructuredPseudoTask(int from, int to) {
public static void run(int from, int to) {
// 使用ShutdownOnFailure策略,任何子任务失败都会导致整个范围关闭
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
new StructuredPseudoTask(from, to).compute(scope);
scope.join(); // 等待所有在scope内fork的子任务完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
throw new IllegalStateException("Task interrupted", e);
}
}
protected Void compute(StructuredTaskScope<Object> scope) {
for (int currentFrom = this.from; ; currentFrom = (currentFrom + to) >>> 1) {
int mid = (currentFrom + to) >>> 1;
if (mid == currentFrom) {
// 模拟实际的阻塞操作
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
break; // 达到以上就是Java虚拟线程下实现递归任务:告别Fork/Join的替代方案与实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号