
本教程深入探讨如何使用 java stream api 处理复杂的数据聚合需求,特别是针对多条件分组、求和以及去重计数。通过构建自定义统计模型和巧妙运用 `collectors.groupingby` 结合 `collectors.reducing`,文章展示了如何高效且准确地从嵌套集合中提取所需数据,解决按月份统计总值和独立人数的常见挑战。
1. 数据模型定义
在进行数据处理之前,首先需要明确数据结构。本教程将围绕 Person 对象进行聚合操作,并期望将结果封装到 DTO 对象中。为了代码的简洁性,我们使用 Java 14 引入的 record 类型定义数据模型。
import java.time.LocalDate;
import java.math.BigDecimal; // DTO中使用BigDecimal
// 定义事件状态枚举
enum Statement {
STATUS1, STATUS2, STATUS3, STATUS4 // 示例中可能包含更多状态
}
// Person 记录,代表一个事件或一个人的某次记录
record Person(String id,
Statement event,
LocalDate eventDate,
int value) {} // 简化 value 类型为 int
// 最终结果的 DTO
record DTO(int month,
BigDecimal totalSum,
int totalPersons) {}初始数据可能以 Map
2. 聚合挑战分析
我们的目标是:
- 按月份(eventDate 的月份)进行分组。
- 计算每个月份所有 Person 记录中 value 的总和 (totalSum)。
- 计算每个月份的独立 Person 数量 (totalPersons),即同一个月份内,具有相同 id 的 Person 仅计为一人。
原始尝试中,常见的错误是直接对每个 Person 记录进行计数,导致 totalPersons 统计的是事件数量而非独立个体数量。例如,如果 per1 在同一个月内有两次记录,我们希望它只被计数一次。
立即学习“Java免费学习笔记(深入)”;
3. 构建自定义聚合器:PersonGroupMetric
为了同时处理求和与去重计数,并确保聚合逻辑的清晰与可复用性,我们引入一个自定义的聚合器 PersonGroupMetric。这个 record 将存储每个分组的中间统计结果。
record PersonGroupMetric(int count, int sum) {
// 定义一个空的度量值,作为 reducing 操作的初始值
public static final PersonGroupMetric EMPTY = new PersonGroupMetric(0, 0);
// 构造函数:将一个 Person 对象映射为初始的 PersonGroupMetric
// 注意:此处的 count 统计的是事件数量,后续会处理去重
public PersonGroupMetric(Person p) {
this(1, p.value());
}
// 合并方法:定义如何将两个 PersonGroupMetric 实例合并
public PersonGroupMetric add(PersonGroupMetric other) {
return new PersonGroupMetric(
this.count + other.count, // 累加事件计数
this.sum + other.sum // 累加值总和
);
}
}关键点: PersonGroupMetric 的 count 字段在 add 方法中累加的是事件数量,而不是独立人数。这是 Collectors.reducing 的一个特性,它会根据 mapper 将每个元素转换为 PersonGroupMetric,然后通过 combiner 累加。要实现独立人数计数,我们需要在 PersonGroupMetric 中额外存储 id,或者在聚合的后期进行处理。考虑到问题描述中 Person Count 的期望结果(月1为3人,月3为2人),这暗示了我们需要对 id 进行去重。
4. 核心聚合逻辑:使用 groupingBy 和 reducing
为了实现按月份分组,并计算总和与去重人数,我们将结合使用 Collectors.groupingBy 和 Collectors.reducing。然而,reducing 本身难以直接处理去重计数。因此,我们将采取两步走的策略:
第一步:按月份分组,计算总和,并收集所有 Person 的 id。
import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import static java.util.stream.Collectors.*; // 假设 employees 是 Map> // 为了简化示例,我们直接从 List src 开始 var src = List.of( new Person("per1", Statement.STATUS1, LocalDate.of(2022, 1, 10), 1), new Person("per2", Statement.STATUS2, LocalDate.of(2022, 1, 10), 2), new Person("per3", Statement.STATUS3, LocalDate.of(2022, 1, 10), 3), new Person("per1", Statement.STATUS4, LocalDate.of(2022, 1, 10), 1), // per1 在月1有重复记录 new Person("per1", Statement.STATUS1, LocalDate.of(2022, 2, 10), 1), new Person("per2", Statement.STATUS1, LocalDate.of(2022, 3, 10), 1), new Person("per3", Statement.STATUS2, LocalDate.of(2022, 3, 10), 2) ); // 定义一个临时的聚合结果类,用于在中间阶段存储总和和所有ID record TempMonthMetric(int sum, Set uniqueIds) { public static final TempMonthMetric EMPTY = new TempMonthMetric(0, ConcurrentHashMap.newKeySet()); // 使用并发Set以防并行流 public TempMonthMetric(Person p) { this(p.value(), Set.of(p.id())); } public TempMonthMetric add(TempMonthMetric other) { Set combinedIds = ConcurrentHashMap.newKeySet(); combinedIds.addAll(this.uniqueIds); combinedIds.addAll(other.uniqueIds); return new TempMonthMetric(this.sum + other.sum, combinedIds); } } Map monthlyAggregations = src.stream() .collect(groupingBy( p -> p.eventDate().getMonthValue(), reducing( TempMonthMetric.EMPTY, TempMonthMetric::new, TempMonthMetric::add ) ));
代码解析:
- 我们创建了一个新的 TempMonthMetric 记录,它不仅存储 sum,还存储一个 Set
来收集每个 Person 的 id。 - TempMonthMetric 的构造函数将一个 Person 映射为包含其 value 和 id 的初始度量。
- add 方法负责合并两个 TempMonthMetric:累加 sum,并合并 uniqueIds 集合。Set 的特性天然地保证了 id 的去重。
- reducing 收集器将每个 Person 转换为 TempMonthMetric,然后通过 add 方法逐步合并,最终得到按月份分组的 Map
。
第二步:将中间结果映射到最终的 DTO。
import java.util.Comparator; import java.math.BigDecimal; Listfin = monthlyAggregations.entrySet().stream() .map(entry -> new DTO( entry.getKey(), // 月份 new BigDecimal(entry.getValue().sum()), // 总和 entry.getValue().uniqueIds().size() // 独立人数 = 集合大小 )) .sorted(Comparator.comparing(DTO::month)) // 按月份排序 .collect(toList()); // 打印结果 fin.forEach(System.out::println);
预期输出:
DTO[month=1, totalSum=7, totalPersons=3] DTO[month=2, totalSum=1, totalPersons=1] DTO[month=3, totalSum=3, totalPersons=2]
这与问题中期望的 Person Count 结果一致。
5. 完整示例代码
为了方便读者理解和运行,以下是包含所有必要类和逻辑的完整代码示例:
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.math.BigDecimal;
import java.util.Comparator;
import static java.util.stream.Collectors.*;
public class StreamAggregationTutorial {
// 定义事件状态枚举
enum Statement {
STATUS1, STATUS2, STATUS3, STATUS4
}
// Person 记录,代表一个事件或一个人的某次记录
record Person(String id,
Statement event,
LocalDate eventDate,
int value) {}
// 最终结果的 DTO
record DTO(int month,
BigDecimal totalSum,
int totalPersons) {}
// 临时的聚合结果类,用于在中间阶段存储总和和所有ID
record TempMonthMetric(int sum, Set uniqueIds) {
// 定义一个空的度量值,作为 reducing 操作的初始值
// 使用 ConcurrentHashMap.newKeySet() 以支持并行流的安全集合操作
public static final TempMonthMetric EMPTY = new TempMonthMetric(0, ConcurrentHashMap.newKeySet());
// 构造函数:将一个 Person 对象映射为初始的 TempMonthMetric
public TempMonthMetric(Person p) {
this(p.value(), Set.of(p.id()));
}
// 合并方法:定义如何将两个 TempMonthMetric 实例合并
public TempMonthMetric add(TempMonthMetric other) {
Set combinedIds = ConcurrentHashMap.newKeySet();
combinedIds.addAll(this.uniqueIds);
combinedIds.addAll(other.uniqueIds);
return new TempMonthMetric(this.sum + other.sum, combinedIds);
}
}
public static void main(String[] args) {
// 示例数据
var src = List.of(
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 1, 10), 1),
new Person("per2", Statement.STATUS2, LocalDate.of(2022, 1, 10), 2),
new Person("per3", Statement.STATUS3, LocalDate.of(2022, 1, 10), 3),
new Person("per1", Statement.STATUS4, LocalDate.of(2022, 1, 10), 1), // per1 在月1有重复记录
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 2, 10), 1),
new Person("per2", Statement.STATUS1, LocalDate.of(2022, 3, 10), 1),
new Person("per3", Statement.STATUS2, LocalDate.of(2022, 3, 10), 2)
);
// 第一步:按月份分组,计算总和,并收集所有 Person 的 id
Map monthlyAggregations = src.stream()
.collect(groupingBy(
p -> p.eventDate().getMonthValue(),
reducing(
TempMonthMetric.EMPTY,
TempMonthMetric::new,
TempMonthMetric::add
)
));
// 第二步:将中间结果映射到最终的 DTO
List result = monthlyAggregations.entrySet().stream()
.map(entry -> new DTO(
entry.getKey(), // 月份
new BigDecimal(entry.getValue().sum()), // 总和
entry.getValue().uniqueIds().size() // 独立人数 = 集合大小
))
.sorted(Comparator.comparing(DTO::month)) // 按月份排序
.collect(toList());
// 打印最终结果
System.out.println("--- 最终聚合结果 ---");
result.forEach(System.out::println);
}
} 6. 注意事项与最佳实践
- 数据类型选择: 原始问题中 Person.value 为 Object,在实际聚合中通常需要转换为具体的数值类型(如 int, double, BigDecimal)。本教程为简化示例,将其假定为 int。对于货币或其他高精度计算,BigDecimal 是更好的选择。
- 记录 (Records) 的优势: Java 14 引入的 record 类型极大地简化了数据载体类的定义,减少了样板代码,并默认提供了 equals(), hashCode(), toString() 方法,非常适合作为聚合过程中的中间数据结构。
- Collectors.reducing 的使用场景: reducing 收集器适用于需要将流中的元素逐步合并成单个结果的场景。它需要一个初始值、一个将流元素映射到中间结果的函数,以及一个合并两个中间结果的函数。这对于自定义聚合逻辑非常强大。
- 去重计数: 当需要在分组聚合中进行去重计数时,将所有需要去重的标识符收集到一个 Set 中,然后取 Set 的大小,是实现此目标的一种有效且简洁的方法。
- 并行流安全性: 在 TempMonthMetric 中,如果考虑使用并行流 (.parallelStream()),Set 的实现需要是线程安全的,例如使用 ConcurrentHashMap.newKeySet()。在单线程流中,普通的 HashSet 即可。
- 代码可读性: 尽管 Stream API 提供了强大的功能,但过于复杂的链式操作可能会降低代码可读性。通过定义清晰的中间数据模型(如 TempMonthMetric),可以将复杂的聚合逻辑分解为更易于理解的步骤。
7. 总结
本教程详细展示了如何利用 Java Stream API 的 Collectors.groupingBy 和 Collectors.reducing 来解决复杂的数据聚合问题,特别是涉及多条件分组、求和以及去重计数的需求。通过创建自定义的中间聚合器 (TempMonthMetric),我们能够灵活地在流处理过程中捕获和合并所需的所有统计信息,最终精确地计算出按月份分组的总和与独立人数。这种模式对于处理类似的数据分析和报表生成任务具有很高的参考价值。










