0

0

Java并行处理大型列表:使用CompletableFuture提升性能

霞舞

霞舞

发布时间:2025-07-28 15:02:01

|

743人浏览过

|

来源于php中文网

原创

java并行处理大型列表:使用completablefuture提升性能

本文旨在解决在Java中使用CompletableFuture进行并行处理时常见的性能陷阱。许多开发者尝试通过在流式操作中直接调用CompletableFuture::join来并行化任务,但这往往导致任务实际串行执行。本教程将详细解释这一现象,并提供一种正确的、高效的并行处理策略,通过分离异步任务的创建与结果的聚合,结合CompletableFuture.allOf实现真正的并行计算,最终将分散的结果合并成一个单一的列表。

理解并行处理的常见误区

在处理大量数据时,将耗时操作并行化是提升性能的有效手段。Java 8引入的CompletableFuture为异步编程提供了强大的支持。然而,不恰当的使用方式可能导致预期的并行效果无法实现。

考虑以下场景:有一个包含大量数据(例如50,000条记录)的列表,需要对每个列表项执行一个耗时操作,并将结果映射到Java对象,最终写入CSV文件。如果采用顺序处理,例如:

list.stream()
    .map(listItem -> service.methodA(listItem).map(result -> mapToBean(result, listItem)))
    .flatMap(Optional::stream)
    .collect(Collectors.toList());

当数据量较大时,这种方式可能非常慢,例如处理2,000条数据就需要4小时。为了加速,开发者可能会尝试使用CompletableFuture进行并行化,常见的错误尝试如下:

ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);
Lists.partition(list, 500).stream() // 将大列表分成小块
    .map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service)) // 提交异步任务
    .map(CompletableFuture::join) // 立即等待每个任务完成
    .flatMap(List::stream)
    .collect(Collectors.toList());

尽管代码中使用了CompletableFuture.supplyAsync将任务提交到线程池,但紧随其后的.map(CompletableFuture::join)操作是导致性能问题的关键。CompletableFuture::join是一个阻塞操作,它会暂停当前流的执行,直到对应的CompletableFuture完成并返回结果。这意味着,尽管每个任务可能在不同的线程中执行,但流本身是按顺序处理每个CompletableFuture的,一个任务完成后,流才会处理下一个任务。这实际上将并行执行变成了顺序等待,从而失去了并行化的优势。

立即学习Java免费学习笔记(深入)”;

正确的并行处理策略

要实现真正的并行,核心思想是:先创建并启动所有异步任务,然后统一等待它们完成并收集结果。 避免在创建任务的同一流式管道中立即阻塞等待。

以下是实现这一策略的步骤和示例代码:

AITDK
AITDK

免费AI SEO工具,SEO的AI生成器

下载
  1. 创建并启动所有异步任务: 遍历数据分片,为每个分片创建一个CompletableFuture,并将其提交到ExecutorService中执行。将这些CompletableFuture实例收集到一个列表中。
  2. 统一等待所有任务完成: 使用CompletableFuture.allOf()方法创建一个新的CompletableFuture,它将在所有已提交的任务都完成时才完成。
  3. 聚合所有任务的结果: 当CompletableFuture.allOf()完成时,表明所有子任务都已完成,此时可以安全地对之前收集的CompletableFuture列表调用join()方法,并对结果进行扁平化和收集。
import com.google.common.collect.Lists; // 假设使用Guava的Lists.partition
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelProcessingExample {

    // 假设这是您的业务逻辑方法,处理列表的一个分片并返回结果列表
    // executeListPart(List<MyItem> partition) 应该返回 List<MyProcessedBean>
    private List<MyProcessedBean> executeListPart(List<MyItem> partition) {
        // 模拟耗时操作
        try {
            Thread.sleep(100); // 假设每个分片处理100ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 实际业务逻辑:处理partition中的每个MyItem,并生成MyProcessedBean
        return partition.stream()
                .map(item -> new MyProcessedBean("Processed_" + item.getId())) // 示例转换
                .collect(Collectors.toList());
    }

    public List<MyProcessedBean> processLargeListInParallel(List<MyItem> largeList, int partitionSize, int threadPoolSize) {
        // 1. 创建并配置线程池
        // 建议线程池大小根据CPU核心数和任务类型(IO密集型/CPU密集型)调整
        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);

        try {
            // 2. 将大列表分成小块,并为每个小块创建异步任务
            // CompletableFuture<List<MyProcessedBean>> 表示每个任务会返回一个MyProcessedBean列表
            List<CompletableFuture<List<MyProcessedBean>>> futures = Lists.partition(largeList, partitionSize).stream()
                    .map(partition -> CompletableFuture.supplyAsync(() -> executeListPart(partition), executorService))
                    .collect(Collectors.toList());

            // 3. 创建一个CompletableFuture,等待所有子任务完成
            CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

            // 4. 当所有子任务完成后,聚合结果
            List<MyProcessedBean> finalResults = allOf.thenApply(v ->
                    futures.stream()
                            .map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的
                            .flatMap(List::stream)       // 扁平化List<List<MyProcessedBean>>为List<MyProcessedBean>
                            .collect(Collectors.toList())
            ).join(); // 阻塞等待最终结果的聚合

            return finalResults;

        } finally {
            // 5. 关闭线程池,释放资源
            executorService.shutdown();
            // 可选:等待线程池终止,确保所有任务都已完成
            // try {
            //     if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            //         executorService.shutdownNow();
            //     }
            // } catch (InterruptedException ex) {
            //     executorService.shutdownNow();
            //     Thread.currentThread().interrupt();
            // }
        }
    }

    // 示例数据类
    static class MyItem {
        private String id;
        public MyItem(String id) { this.id = id; }
        public String getId() { return id; }
    }

    static class MyProcessedBean {
        private String processedId;
        public MyProcessedBean(String processedId) { this.processedId = processedId; }
        public String getProcessedId() { return processedId; }
        @Override
        public String toString() { return "MyProcessedBean{" + "processedId='" + processedId + '\'' + '}'; }
    }

    public static void main(String[] args) {
        ParallelProcessingExample app = new ParallelProcessingExample();

        // 构造一个大型列表
        List<MyItem> largeList = new java.util.ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            largeList.add(new MyItem("item_" + i));
        }

        long startTime = System.currentTimeMillis();
        List<MyProcessedBean> results = app.processLargeListInParallel(largeList, 500, Runtime.getRuntime().availableProcessors() - 1);
        long endTime = System.currentTimeMillis();

        System.out.println("Processed " + results.size() + " items in " + (endTime - startTime) + " ms");
        // System.out.println("First 10 results: " + results.subList(0, Math.min(10, results.size())));
    }
}

注意事项与最佳实践

  1. 线程池管理:

    • ExecutorService是管理线程的关键。对于CPU密集型任务,线程池大小通常设置为Runtime.getRuntime().availableProcessors()或noOfCores - 1。对于IO密集型任务,可以适当增加线程池大小,因为线程在等待IO时不会占用CPU。
    • 在任务完成后,务必调用executorService.shutdown()来优雅地关闭线程池,释放资源。如果线程池是应用程序生命周期内的单例,则可以在应用程序关闭时统一管理。
    • awaitTermination()可以用于等待所有已提交的任务完成,但对于一次性任务聚合,CompletableFuture.allOf().join()通常就足够了。
  2. 列表分片:

    • 将大列表分片(例如使用Guava的Lists.partition)是一个很好的策略。每个分片的大小需要根据任务的粒度和系统资源进行调整。过小的分片会增加任务调度开销,过大的分片可能导致部分线程长时间空闲。
    • 确保executeListPart方法是线程安全的,并且不依赖于共享的可变状态,或者对共享状态进行适当的同步。
  3. 错误处理:

    • CompletableFuture提供了丰富的错误处理机制,例如exceptionally()、handle()、whenComplete()等。在生产环境中,应为异步任务添加健壮的错误处理逻辑,以防止单个任务失败导致整个流程中断。
    • 当使用CompletableFuture.allOf()时,如果任何一个子CompletableFuture异常完成,那么allOf也会异常完成。你可以通过.exceptionally()或.handle()来捕获和处理这些异常。
  4. 结果聚合:

    • CompletableFuture.allOf()返回的是CompletableFuture<Void>,因为它本身不关心子任务的结果,只关心它们是否完成。
    • 要获取所有子任务的结果,需要像示例中那样,在allOf完成后,再次遍历原始的futures列表,并调用join()(此时是非阻塞的),然后进行结果的flatMap和collect。

总结

通过将CompletableFuture的创建和结果的join操作分离,我们能够充分利用多核CPU的优势,实现真正意义上的并行处理。这种模式是处理大量数据或执行耗时操作时提升Java应用程序性能的关键。理解CompletableFuture的非阻塞特性以及如何正确地聚合结果,是编写高效、并发代码的重要一步。

相关文章

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
guava包作用
guava包作用

guava是一个java库,增强了java标准库,提供更有效率和易于使用的集合、实用程序、缓存和并发工具。想了解更多guava的相关内容,可以阅读本专题下面的文章。

271

2024.05.29

javascriptvoid(o)怎么解决
javascriptvoid(o)怎么解决

javascriptvoid(o)的解决办法:1、检查语法错误;2、确保正确的执行环境;3、检查其他代码的冲突;4、使用事件委托;5、使用其他绑定方式;6、检查外部资源等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

186

2023.11.23

java中void的含义
java中void的含义

本专题整合了Java中void的相关内容,阅读专题下面的文章了解更多详细内容。

134

2025.11.27

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

77

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

40

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

67

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

47

2025.11.27

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

26

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号