0

0

Kafka Streams 实现按字段分组并生成组内所有两两组合的完整教程

心靈之曲

心靈之曲

发布时间:2026-02-25 09:26:19

|

238人浏览过

|

来源于php中文网

原创

Kafka Streams 实现按字段分组并生成组内所有两两组合的完整教程

本文详解如何使用 kafka streams 对消息按指定字段(如 groupid)分组,并为每个组生成所有可能的两两 id 组合,重点介绍基于 key 重映射 + groupbykey 的标准流式方案及 aggregate 的替代实现。

本文详解如何使用 kafka streams 对消息按指定字段(如 groupid)分组,并为每个组生成所有可能的两两 id 组合,重点介绍基于 key 重映射 + groupbykey 的标准流式方案及 aggregate 的替代实现。

在 Kafka Streams 应用中,常需对事件流进行逻辑分组后执行组合计算(如生成配对、关联分析、协同过滤候选集等)。典型场景是:原始消息以 JSON 形式存储于 compact topic,结构为 {"id": 1, "groupId": 1},目标是将相同 groupId 的记录聚合成一组,并输出该组内所有无序、不重复的两两 id 组合(即 C(n,2) 组合数),例如 groupId=1 包含 id 1/2/3,则输出三对:"1-2"、"1-3"、"2-3"。

✅ 推荐方案:Key 重映射 + groupByKey + mapValues(流式、可扩展、状态轻量)

核心思路是将业务分组键(groupId)提升为 Kafka 消息的 record key,从而利用 Kafka Streams 内置的分区与聚合语义,确保同组数据路由至同一 task 并自然完成分组:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import java.util.*;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("input-topic", 
    Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String> groupedCombinations = source
    // Step 1: 解析 JSON,提取 groupId 作为新 key,保留原值
    .map((key, value) -> {
        try {
            Map<String, Object> json = new ObjectMapper().readValue(value, Map.class);
            String groupId = String.valueOf(json.get("groupId"));
            return KeyValue.pair(groupId, value);
        } catch (Exception e) {
            throw new RuntimeException("Invalid JSON: " + value, e);
        }
    })
    // Step 2: 按新 key(groupId)分组 → 转为 KGroupedStream
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    // Step 3: 对每组 value 列表执行两两组合生成
    .aggregate(
        ArrayList::new,
        (groupId, value, list) -> {
            try {
                Map<String, Object> json = new ObjectMapper().readValue(value, Map.class);
                Integer id = ((Number) json.get("id")).intValue();
                list.add(id);
                return list;
            } catch (Exception e) {
                return list; // 跳过解析失败项
            }
        },
        Materialized.<String, List<Integer>>as("group-id-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(List.class)))
    )
    .toStream()
    .flatMapValues((groupId, ids) -> {
        List<String> combinations = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            for (int j = i + 1; j < ids.size(); j++) {
                combinations.add(ids.get(i) + "-" + ids.get(j));
            }
        }
        return combinations;
    });

groupedCombinations.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

优势说明

Solvely
Solvely

AI学习伴侣,数学解体,作业助手,家教辅导

下载
  • 状态仅维护每个 groupId 对应的 List(内存可控);
  • 利用 RocksDB state store 实现容错与恢复;
  • 支持 exactly-once 处理语义(启用 processing.guarantee=exactly_once_v2);
  • 可水平扩展:不同 groupId 自动分布到不同分区。

⚠️ 注意事项与最佳实践

  • 避免在 map 中做复杂计算:map 阶段仅做轻量 key 提取,组合逻辑必须放在 aggregate 或 transform 后续阶段,防止阻塞流处理;
  • JSON 解析性能:生产环境建议复用 ObjectMapper 实例(static final),禁用动态类型解析(如 enableDefaultTyping);
  • 空组/单元素组处理:上述代码中 i+1
  • 状态大小监控:若某 groupId 关联数万条记录,List 可能引发 OOM —— 此时应改用 windowed aggregation 或引入外部存储(如 Redis)缓存中间状态;
  • compact topic 兼容性:因输入 topic 已 compact,aggregate 的状态更新会自动跟随 log compaction 语义,旧 groupId 记录被清理后,对应状态也会被 evict(需配合 retention.ms 合理配置)。

? 替代方案:纯 transform() + Processor API(高灵活性,低抽象)

若需更精细控制(如增量更新组合、去重合并、带时间窗口),可使用 transform() 注册自定义 Transformer,内部维护 Map>,并在 punctuate() 中批量输出。但此方式丧失 DSL 的可读性与优化能力,仅建议在 DSL 无法满足需求时采用。

综上,“重设 key → groupByKey → aggregate 构建集合 → flatMapValues 生成组合” 是最符合 Kafka Streams 设计哲学、兼具正确性、可维护性与扩展性的标准解法。开发者应优先采用该模式,并结合监控与压测验证其在目标数据规模下的稳定性。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

智谱清言 - 免费全能的AI助手
智谱清言 - 免费全能的AI助手

智谱清言 - 免费全能的AI助手

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

448

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

544

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

323

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

81

2025.09.10

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

175

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

156

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

206

2024.02.23

Java 流式处理与 Apache Kafka 实战
Java 流式处理与 Apache Kafka 实战

本专题专注讲解 Java 在流式数据处理与消息队列系统中的应用,系统讲解 Apache Kafka 的基础概念、生产者与消费者模型、Kafka Streams 与 KSQL 流式处理框架、实时数据分析与监控,结合实际业务场景,帮助开发者构建 高吞吐量、低延迟的实时数据流管道,实现高效的数据流转与处理。

117

2026.02.04

Golang 生态工具与框架:扩展开发能力
Golang 生态工具与框架:扩展开发能力

《Golang 生态工具与框架》系统梳理 Go 语言在实际工程中的主流工具链与框架选型思路,涵盖 Web 框架、RPC 通信、依赖管理、测试工具、代码生成与项目结构设计等内容。通过真实项目场景解析不同工具的适用边界与组合方式,帮助开发者构建高效、可维护的 Go 工程体系,并提升团队协作与交付效率。

18

2026.02.24

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
极客学院Java8新特性视频教程
极客学院Java8新特性视频教程

共17课时 | 3.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号