0

0

Java Stream API:重构批量数据处理以避免共享可变性

DDD

DDD

发布时间:2025-10-01 11:07:36

|

798人浏览过

|

来源于php中文网

原创

java stream api:重构批量数据处理以避免共享可变性

本文探讨了在Java中处理批量数据库查询时如何通过重构代码来避免共享可变性。通过利用Java Stream API的map、flatMap和collect操作,可以消除对外部集合的副作用,从而实现更纯粹、更易于维护和并发友好的数据处理模式。

1. 批量数据处理中的共享可变性问题

在企业级应用中,从数据库批量获取数据是常见需求。然而,数据库通常对单次查询接受的参数数量有限制(例如,SQL IN 子句的参数数量)。因此,我们经常需要将一个大的键列表分割成多个小批次,然后对每个批次执行查询。

原始代码示例展示了这种场景,其中一个包含5000个数字的列表被分割成多个大小为500的子列表,然后对每个子列表执行数据库查询。

AtomicInteger counter = new AtomicInteger();
List catList = new ArrayList<>(); // 外部可变列表
List dogList = new ArrayList<>(); // 外部可变列表
List numbers = Stream.iterate(1, e -> e + 1)
    .limit(5000)
    .collect(Collectors.toList());

Collection> partitionedListOfNumbers = numbers.stream()
    .collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
    .values(); // 将列表分割成大小为500的子列表

partitionedListOfNumbers.stream()
    .forEach(list -> {
        List interimCatList = catRepo.fetchCats(list); // 从数据库获取Cat
        catList.addAll(interimCatList); // 修改外部 catList
        List interimDogList = dogRepo.fetchDogs(list); // 从数据库获取Dog
        dogList.addAll(interimDogList); // 修改外部 dogList
    });

上述代码的核心问题在于其使用了forEach操作,并在其中通过catList.addAll(interimCatList)和dogList.addAll(interimDogList)直接修改了外部的catList和dogList。这种模式被称为“共享可变性”(Shared Mutability)。

共享可变性带来了多方面的问题:

立即学习Java免费学习笔记(深入)”;

  • 线程安全隐患: 在多线程环境下,如果多个线程同时访问并修改catList或dogList,可能导致数据不一致或运行时错误。
  • 可读性和可维护性降低: 外部状态的修改使得代码难以理解,因为一个方法的行为不仅取决于其输入,还取决于其外部环境的状态。
  • 不符合函数式编程范式: 函数式编程鼓励纯函数,即没有副作用的函数。修改外部状态是典型的副作用。

为了构建更健壮、更易于测试和并发友好的代码,我们应该尽量避免共享可变性。

2. 利用 Java Stream API 实现不可变数据处理

Java 8 引入的 Stream API 提供了一种声明式、函数式的方式来处理集合数据。通过利用 map、flatMap 和 collect 等操作,我们可以在不修改外部状态的情况下转换和聚合数据。

Type
Type

生成草稿,转换文本,获得写作帮助-等等。

下载
  • map: 将流中的每个元素转换成另一个元素,生成一个新的流。例如,将一个批次的键列表转换为一个List
  • flatMap: 将流中的每个元素转换成一个流,然后将这些流扁平化为一个单一的流。这在处理“流的流”时非常有用,例如将List>扁平化为List
  • collect: 将流中的元素聚合成一个结果容器(例如List、Set或Map)。这是创建最终不可变结果的关键。

3. 重构方案详解与示例代码

为了消除共享可变性,我们将重构代码,使其不再使用forEach来修改外部列表,而是利用Stream的管道操作来生成新的结果列表。

核心思路:

  1. 分批处理: 保持原有的分批逻辑,将大的键列表分割成多个子列表。
  2. 映射批次到结果: 对每个子列表,执行数据库查询,将其映射成一个结果列表(例如List)。
  3. 扁平化结果列表: 将所有批次的结果列表(List>)扁平化成一个单一的结果流。
  4. 收集最终结果: 将扁平化后的结果流收集到一个新的List中。

以下是重构后的代码示例:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// 模拟数据库仓库接口和实体类
class Cat {
    private int id;
    private String name;
    public Cat(int id, String name) { this.id = id; this.name = name; }
    @Override public String toString() { return "Cat{id=" + id + ", name='" + name + "'}"; }
}

class Dog {
    private int id;
    private String name;
    public Dog(int id, String name) { this.id = id; this.name = name; }
    @Override public String toString() { return "Dog{id=" + id + ", name='" + name + "'}"; }
}

class CatRepository {
    public List fetchCats(List keys) {
        // 模拟数据库查询
        System.out.println("Fetching Cats for keys: " + keys.size() + " elements, first: " + keys.get(0));
        return keys.stream()
                   .map(id -> new Cat(id, "Cat_" + id))
                   .collect(Collectors.toList());
    }
}

class DogRepository {
    public List fetchDogs(List keys) {
        // 模拟数据库查询
        System.out.println("Fetching Dogs for keys: " + keys.size() + " elements, first: " + keys.get(0));
        return keys.stream()
                   .map(id -> new Dog(id, "Dog_" + id))
                   .collect(Collectors.toList());
    }
}

public class BatchProcessingRefactor {

    public static void main(String[] args) {
        CatRepository catRepo = new CatRepository();
        DogRepository dogRepo = new DogRepository();

        int totalNumbers = 5000;
        int batchSize = 500;

        // AtomicInteger 用于在 groupingBy 中生成分组键
        // 它本身是可变的,但其作用是帮助创建不可变的子集合
        AtomicInteger counter = new AtomicInteger();

        // 1. 数据分批:将 1 到 5000 的数字分割成大小为 500 的子列表
        // IntStream.rangeClosed(1, totalNumbers) 生成一个从1到totalNumbers的整数流
        // boxed() 将 IntStream 转换为 Stream
        Collection> partitionedListOfNumbers = IntStream.rangeClosed(1, totalNumbers)
            .boxed()
            .collect(Collectors.groupingBy(num -> counter.getAndIncrement() / batchSize))
            .values();

        System.out.println("Total partitions: " + partitionedListOfNumbers.size());

        // 2. 处理 Cat 数据:使用 Stream API 避免共享可变性
        // partitionedListOfNumbers.stream() 创建一个包含 List 的流
        // .map(catRepo::fetchCats) 将每个 List 映射为一个 List
        //   此时流的类型是 Stream>
        // .flatMap(List::stream) 将 Stream> 扁平化为 Stream
        //   即将所有 List 中的 Cat 对象合并到一个单一的流中
        // .collect(Collectors.toList()) 将 Stream 中的所有 Cat 对象收集到一个新的 List 中
        List catList = partitionedListOfNumbers.stream()
            .map(catRepo::fetchCats)
            .flatMap(List::stream)
            .collect(Collectors.toList());

        // 3. 处理 Dog 数据:同样的方式
        List dogList = partitionedListOfNumbers.stream()
            .map(dogRepo::fetchDogs)
            .flatMap(List::stream)
            .collect(Collectors.toList());

        System.out.println("Fetched " + catList.size() + " cats.");
        System.out.println("Fetched " + dogList.size() + " dogs.");

        // 验证部分数据
        // catList.stream().limit(5).forEach(System.out::println);
        // dogList.stream().skip(4995).forEach(System.out::println);
    }
}

4. 优势与注意事项

优势:

  • 避免共享可变性: catList 和 dogList 是通过 collect(Collectors.toList()) 操作创建的新列表,它们在创建过程中没有被外部修改,从而消除了副作用和线程安全问题。
  • 函数式编程风格: 代码更具声明性,清晰地表达了“转换”和“聚合”的意图,而不是“迭代”和“修改”。
  • 并发友好: 由于消除了共享可变状态,这种模式更容易适应并行流 (parallelStream()),从而在多核处理器上获得性能提升,而无需担心复杂的同步机制
  • 可维护性与可测试性: 减少了副作用,使得代码逻辑更纯粹,更容易进行单元测试和推理。

注意事项:

  • AtomicInteger 的使用: 在 groupingBy 操作中,AtomicInteger 被用来生成不重复的分组键。虽然 AtomicInteger 本身是可变的,但它在这里的作用是辅助流操作生成不可变的子集合,而不是用于累积最终结果,因此它不会引入共享可变性问题到最终结果列表中。
  • 代码重复的抽象: 示例中 catList 和 dogList 的获取逻辑存在相似性。在更复杂的场景中,可以考虑将这部分逻辑抽象为一个通用方法,接受一个函数作为参数来执行具体的数据库查询,从而进一步减少代码重复。
  • 错误处理: 在实际应用中,数据库查询可能会抛出异常。Stream API 提供了 try-catch 或 Optional 等机制来处理这些情况,但需要额外设计。

5. 总结

通过采用 Java Stream API 的 map、flatMap 和 collect 等操作,我们可以有效地重构批量数据处理代码,从而避免共享可变性。这种方法不仅提升了代码的线程安全性、可读性和可维护性,还使其更符合现代函数式编程的理念。在处理大量数据或构建高并发系统时,优先考虑这种不可变的数据处理模式将带来显著的优势。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

728

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

328

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

350

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

1263

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

360

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

841

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

581

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

423

2024.04.29

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

1

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3万人学习

C# 教程
C# 教程

共94课时 | 7.9万人学习

Java 教程
Java 教程

共578课时 | 53.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号