0

0

Flink 中高效解析 JSON 字符串至 JSONObject 的实践指南

霞舞

霞舞

发布时间:2025-09-25 12:47:00

|

279人浏览过

|

来源于php中文网

原创

flink 中高效解析 json 字符串至 jsonobject 的实践指南

本文探讨了在 Apache Flink 流处理任务中,将 JSON 格式的字符串数据转换为 JSONObject 时可能遇到的 NullPointerException: Assigned key must not be null! 错误。通过分析问题根源,本文提供了一种基于 org.json 库的有效解决方案,并强调了使用 POJO 进行 JSON 反序列化的最佳实践,以提升 Flink 应用的性能和健壮性。

1. 引言与背景

在 Apache Flink 流处理应用中,处理来自 Kafka、文件或其他数据源的 JSON 格式字符串数据是常见的场景。开发者通常需要将这些原始字符串解析成结构化的 JSONObject 对象,以便进行后续的字段提取、转换或业务逻辑处理。然而,在尝试将解析后的 JSONObject 通过 Flink 的 Collector 发出时,有时会遇到令人困惑的 NullPointerException: Assigned key must not be null! 错误,即使调试显示字符串已成功解析为 JSONObject 实例。本文将深入探讨这一问题,并提供一个可靠的解决方案及最佳实践建议。

2. 问题描述:String 到 JSONObject 的转换困境

考虑以下 Flink 任务代码片段,其目标是将一个包含 JSON 字符串的 DataStream<String> 转换为 DataStream<JSONObject>:

import com.alibaba.fastjson.JSONObject; // 假设使用Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonParseIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 尝试使用 JSONObject.parseObject 进行解析
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // 在此处抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

当运行上述代码时,尽管在 processElement 内部调试发现 JSONObject.parseObject(value) 确实成功生成了 JSONObject 实例,但在调用 out.collect(jsonObject) 时,程序却抛出了以下运行时异常:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!

这个错误信息 Assigned key must not be null! 通常与 Flink 的状态管理或某些内部序列化机制有关,但在此场景下,它并非直接指向用户代码中显式设置的 Key 为 null。这表明问题可能出在 JSONObject 对象本身的某种特性,或者其与 Flink 内部序列化机制的兼容性上。

3. 解决方案:使用 org.json 库的正确姿势

经过分析和实践,我们发现问题可能与所使用的 JSON 库及其解析方式有关。如果采用 org.json 库,并使用其构造函数进行解析,可以有效避免上述问题。

首先,确保项目中引入了 org.json 库的依赖:

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version> <!-- 或更高版本 -->
</dependency>

然后,修改 processElement 方法中的 JSON 解析逻辑,使用 org.json.JSONObject 的构造函数来创建对象:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.json.JSONObject; // 注意这里导入的是 org.json.JSONObject

public class FlinkJsonParseSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 使用 org.json.JSONObject 的构造函数进行解析
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // 现在可以正常收集
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

修改后,程序将能够顺利运行并打印出解析后的 JSONObject 内容,例如:

{"bill_info":{"ADDER_NAME":"sss","ADDER_NO":"0706","UPDATER_NAME":"ssss","UPDATER_NO":"0706","BILL_ID":"8687b584-038c-498c-8f97-ec1ca197da96","ADD_TIME":"2022-11-12 16:05:28:418","ORDER_ID":"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67","S_USER_ID":"s68","B_USER_ID":"b77","UPDATE_TIME":"2022-11-12 16:05:28:418"}}

这表明 org.json.JSONObject 的构造函数方式与 Flink 的内部机制(特别是序列化)具有更好的兼容性,从而避免了 NullPointerException。

4. 注意事项与最佳实践

尽管上述方法可以解决 JSONObject 收集的问题,但在生产环境中,直接在 Flink 流中传递 JSONObject 实例通常不是最佳实践。

ModelGate
ModelGate

一站式AI模型管理与调用工具

下载
  1. 序列化与反序列化开销: JSONObject 对象通常包含复杂的内部结构,其序列化和反序列化成本较高。在 Flink 这种高性能的流处理框架中,频繁地序列化和反序列化 JSONObject 会带来显著的性能损耗,尤其是在数据量大或状态操作多的场景下。

  2. 类型安全与可读性: 使用 JSONObject 意味着在运行时通过字符串键访问数据,缺乏编译时检查,容易出错。代码可读性也相对较差。

推荐的最佳实践是:将 JSON 数据反序列化为 Plain Old Java Objects (POJOs)。

POJO 具有以下优势:

  • 性能优化: Flink 对 POJO 有着优秀的序列化支持(特别是 Kryo 序列化),通常比通用 JSON 对象更高效。
  • 类型安全: POJO 定义了明确的字段类型,可以在编译时捕获类型错误。
  • 代码可读性与维护性: 业务逻辑可以直接操作 POJO 字段,代码更清晰、更易于理解和维护。

示例:将 JSON 反序列化为 POJO

假设我们的 JSON 结构可以映射到一个 BillInfo POJO:

// 定义 POJO 类
public class BillInfo {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // 必须提供无参构造函数
    public BillInfo() {}

    // 提供所有字段的 Getter 和 Setter 方法
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    // ... 其他字段的 Getter/Setter
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
               "ADD_TIME='" + ADD_TIME + '\'' +
               ", ORDER_ID='" + ORDER_ID + '\'' +
               // ... 其他字段
               '}';
    }
}

// 在 Flink 任务中使用 Jackson 或 Gson 进行反序列化
import com.fasterxml.jackson.databind.ObjectMapper; // 假设使用Jackson
// 或 import com.google.gson.Gson;

public class FlinkJsonToPojoSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<BillInfo> billInfoDS = inputDS.process(new ProcessFunction<String, BillInfo>() {
            // ObjectMapper 是线程安全的,可以作为成员变量
            private transient ObjectMapper objectMapper;

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction<String, BillInfo>.Context ctx, Collector<BillInfo> out) throws Exception {
                // 假设 JSON 字符串的根元素是包含 "bill_info" 的对象
                // 需要先解析到 JsonNode 或 Map,再提取 "bill_info" 部分
                // 或者如果确定字符串直接是 bill_info 的内容,可以直接反序列化

                // 这里假设 value 是包含一个根键 "bill_info" 的完整 JSON 字符串
                // 先解析为 JsonNode,然后提取 "bill_info" 的内容
                com.fasterxml.jackson.databind.JsonNode rootNode = objectMapper.readTree(value);
                com.fasterxml.jackson.databind.JsonNode billInfoNode = rootNode.get("bill_info");

                if (billInfoNode != null) {
                    BillInfo billInfo = objectMapper.treeToValue(billInfoNode, BillInfo.class);
                    out.collect(billInfo);
                } else {
                    // 处理 JSON 格式不符合预期的情况
                    System.err.println("JSON string missing 'bill_info' key: " + value);
                }
            }
        });
        billInfoDS.print();

        env.execute();
    }
}

注意: 如果 JSON 字符串直接是 BillInfo 对象的内容(即没有外层的 {"bill_info": ...}),则 processElement 可以简化为 BillInfo billInfo = objectMapper.readValue(value, BillInfo.class);。上述示例处理了带有嵌套 bill_info 键的情况。

5. 总结

在 Flink 中处理 JSON 字符串时,将字符串解析为 JSONObject 并通过 Collector 发出可能因 JSON 库的选择和使用方式不当而导致 NullPointerException: Assigned key must not be null! 错误。通过使用 org.json 库并采用其构造函数 new JSONObject(value) 的方式,可以有效解决此问题。然而,从性能、类型安全和可维护性角度考虑,更推荐的做法是将 JSON 数据反序列化为 POJO。选择合适的 JSON 处理策略,将有助于构建更健壮、高效的 Flink 流处理应用。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

457

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

549

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

337

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

82

2025.09.10

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

175

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

159

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

207

2024.02.23

Java 流式处理与 Apache Kafka 实战
Java 流式处理与 Apache Kafka 实战

本专题专注讲解 Java 在流式数据处理与消息队列系统中的应用,系统讲解 Apache Kafka 的基础概念、生产者与消费者模型、Kafka Streams 与 KSQL 流式处理框架、实时数据分析与监控,结合实际业务场景,帮助开发者构建 高吞吐量、低延迟的实时数据流管道,实现高效的数据流转与处理。

177

2026.02.04

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

26

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 4.4万人学习

C# 教程
C# 教程

共94课时 | 11.3万人学习

Java 教程
Java 教程

共578课时 | 81.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号