0

0

Flink 中 String 到 JSONObject 转换的陷阱与优化实践

霞舞

霞舞

发布时间:2025-09-25 14:46:40

|

819人浏览过

|

来源于php中文网

原创

Flink 中 String 到 JSONObject 转换的陷阱与优化实践

本文深入探讨了在 Apache Flink 中将 JSON 字符串转换为 JSONObject 时遇到的常见问题,特别是由不当的 JSON 库使用和对象实例化方式导致的 NullPointerException。文章详细阐述了如何通过正确使用 org.json 库的 JSONObject 构造函数来解决此问题,并提供了代码示例。此外,还强调了在 Flink 应用中直接传递 JSONObject 的性能劣势,并推荐了使用 POJO 进行高效序列化和反序列化的最佳实践。

1. 问题背景:Flink 中 JSON 字符串转换的挑战

apache flink 流处理应用中,经常需要从数据源(如 kafka、文件等)接收 json 格式的字符串数据,并将其解析为可操作的 jsonobject 对象。然而,这一看似简单的转换过程有时会遇到意想不到的错误。一个常见的场景是,当尝试将解析后的 jsonobject 通过 collector 发送时,flink 作业会抛出 java.lang.nullpointerexception: assigned key must not be null! 异常,导致作业失败。尽管调试显示字符串已成功解析为 jsonobject 实例,但 flink 的内部机制却无法正确处理这些对象。

2. 错误示例与问题分析

让我们首先审视导致此问题的典型代码结构。以下是一个尝试在 Flink ProcessFunction 中将 String 转换为 JSONObject 的示例:

import com.alibaba.fastjson.JSONObject; // 假设使用了FastJSON库的JSONObject

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator jsonObjDS = inputDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 问题所在:可能使用了不兼容的JSONObject解析方法
                JSONObject jsonObject = JSONObject.parseObject(value); // 假设这里使用了FastJSON的parseObject
                out.collect(jsonObject); // 在此处抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

运行上述代码时,可能会遇到以下异常信息:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!

这个 NullPointerException: Assigned key must not be null! 异常通常与 Flink 内部的状态管理或数据序列化机制有关。尽管 JSONObject.parseObject(value) 可能成功将字符串转换为 JSONObject 对象,但如果这个 JSONObject 对象(例如,来自 com.alibaba.fastjson 库)与 Flink 内部默认期望的 JSONObject 类型(例如,来自 org.json 库)不兼容,或者其内部结构不符合 Flink 序列化器的预期,就可能导致在 out.collect() 时出现问题。

3. 正确的转换方法与代码实现

解决这个问题的关键在于确保 JSONObject 对象的正确实例化,并与 Flink 环境中使用的 JSON 库保持一致。如果目标是使用 org.json 库提供的 JSONObject 类型,那么正确的做法是利用其构造函数来解析 JSON 字符串。

3.1 引入正确的依赖

首先,请确保您的 pom.xml 文件中包含了 org.json 库的依赖:


    org.json
    json
    20180130 

3.2 修正代码逻辑

将 ProcessFunction 中的 JSONObject 实例化方式从 JSONObject.parseObject(value) 修改为 new JSONObject(value):

import org.json.JSONObject; // 确保导入的是org.json库的JSONObject

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator jsonObjDS = inputDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
                // 正确的做法:使用org.json.JSONObject的构造函数
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject);
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

通过这一修改,Flink 作业将能够成功运行并打印出解析后的 JSONObject,例如:

{"bill_info":{"ADDER_NAME":"sss","ADDER_NO":"0706","UPDATER_NAME":"ssss","UPDATER_NO":"0706","BILL_ID":"8687b584-038c-498c-8f97-ec1ca197da96","ADD_TIME":"2022-11-12 16:05:28:418","ORDER_ID":"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67","S_USER_ID":"s68","B_USER_ID":"b77","UPDATE_TIME":"2022-11-12 16:05:28:418"}}

这个解决方案的关键在于,org.json.JSONObject 的构造函数 new JSONObject(String source) 能够直接从 JSON 字符串构建一个 JSONObject 实例,并且这个实例与 Flink 内部默认的序列化器对 org.json.JSONObject 的处理方式是兼容的。而如果混合使用不同 JSON 库的 API(例如,使用 com.alibaba.fastjson.JSONObject.parseObject 来创建对象,但 Flink 的类型信息或序列化器期望的是 org.json.JSONObject),就可能导致上述的 NullPointerException。

4. 关键点与注意事项

4.1 JSON 库的选择与统一

Java 生态系统中有多种 JSON 处理库,如 org.json、FastJSON、Jackson、Gson 等。在 Flink 应用中,选择一个库并保持其在整个项目中的一致性至关重要。如果您的项目依赖了多个 JSON 库,请务必明确您正在使用的 JSONObject 类是来自哪个库,并使用该库提供的正确 API 进行解析和操作。本教程中的解决方案是针对 org.json 库的。

玄鲸Timeline
玄鲸Timeline

一个AI驱动的历史时间线生成平台

下载

4.2 依赖管理

确保 Maven 或 Gradle 依赖中只包含您实际使用的 JSON 库,并避免版本冲突。不必要的依赖或冲突可能导致类加载问题,进而引发运行时错误。

4.3 性能考量与最佳实践:优先使用 POJO

尽管将 JSON 字符串直接转换为 JSONObject 看起来很方便,但在 Flink 这样的高性能流处理框架中,直接在数据流中传递 JSONObject 通常不是最佳实践。原因如下:

  • 序列化/反序列化开销大: JSONObject 是一种通用容器,其内部结构复杂,字段类型不固定。这使得 Flink 在序列化和反序列化这些对象时需要进行更多的反射和类型推断,导致显著的性能开销。
  • 缺乏类型安全: 直接操作 JSONObject 需要通过字符串键来访问字段,容易出错且缺乏编译时检查。
  • 代码可读性与维护性: 使用 POJO 可以使代码更清晰、更易读,因为它明确定义了数据的结构。

推荐的做法是: 将 JSON 字符串反序列化为 POJO(Plain Old Java Object)。POJO 具有明确的结构和类型,Fink 可以对其进行高效的序列化(如 Kryo 序列化器),从而显著提升性能并增强类型安全性。

示例:将 JSON 转换为 POJO

  1. 定义 POJO 类:

    public class BillInfo {
        public String ADD_TIME;
        public String ORDER_ID;
        public String ADDER_NO;
        public String UPDATER_NO;
        public String S_USER_ID;
        public String B_USER_ID;
        public String BILL_ID;
        public String ADDER_NAME;
        public String UPDATE_TIME;
        public String UPDATER_NAME;
    
        // 默认构造函数是必需的
        public BillInfo() {}
    
        // 可以在这里添加getter/setter或其他业务逻辑
    }
    
    public class BillData {
        public BillInfo bill_info;
    
        public BillData() {}
    }
  2. 使用 MapFunction 或 ProcessFunction 结合 JSON 库(如 Jackson 或 Gson)进行反序列化:

    import com.fasterxml.jackson.databind.ObjectMapper; // 示例使用Jackson
    
    // ... Flink setup code ...
    
    SingleOutputStreamOperator billDataDS = inputDS.map(jsonString -> {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(jsonString, BillData.class);
    });
    
    billDataDS.print();
    env.execute();

    或者,对于更复杂的场景,可以实现一个自定义的 DeserializationSchema。

5. 总结

在 Flink 中处理 JSON 字符串时,正确选择和使用 JSON 库的 API 是避免 NullPointerException 的关键。特别是当使用 org.json 库时,应通过 new JSONObject(String source) 构造函数来实例化对象。然而,从性能和类型安全的角度考虑,将 JSON 字符串反序列化为 POJO 是更推荐的实践。通过采用 POJO,您可以构建更健壮、高效且易于维护的 Flink 流处理应用。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

420

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

536

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

311

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

77

2025.09.10

Java Maven专题
Java Maven专题

本专题聚焦 Java 主流构建工具 Maven 的学习与应用,系统讲解项目结构、依赖管理、插件使用、生命周期与多模块项目配置。通过企业管理系统、Web 应用与微服务项目实战,帮助学员全面掌握 Maven 在 Java 项目构建与团队协作中的核心技能。

0

2025.09.15

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

168

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

151

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

9

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3万人学习

C# 教程
C# 教程

共94课时 | 8万人学习

Java 教程
Java 教程

共578课时 | 53.5万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号