0

0

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

霞舞

霞舞

发布时间:2025-09-25 12:02:19

|

507人浏览过

|

来源于php中文网

原创

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

在Apache Flink中处理JSON字符串时,开发者常遇到将String类型数据转换为JSONObject的需求。然而,直接使用JSONObject.parseObject()可能导致NullPointerException,即使字符串已正确解析。本文将揭示这一常见问题的原因,提供使用org.json库中new JSONObject(value)的正确解决方案,并强调出于性能和类型安全考虑,在生产环境中优先使用POJO进行JSON反序列化的最佳实践。

Flink中JSON字符串转换的常见问题

apache flink流处理应用中,从数据源(如kafka、文件等)获取的原始数据通常是json格式的字符串。为了进一步解析和处理这些结构化数据,我们通常需要将其转换为jsonobject对象。然而,许多开发者在尝试将string类型的数据通过processfunction或其他算子转换为jsonobject并收集输出时,可能会遇到如下异常:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!

尽管在调试过程中发现String已经成功解析成了JSONObject实例,但当尝试通过Collector收集这些对象时,作业却失败了。这表明问题并非出在JSON解析本身,而可能与JSONObject对象的特定实现、其与Flink内部序列化机制的兼容性,或者其内部状态有关。

原始的错误代码片段如下:

import com.alibaba.fastjson.JSONObject; // 假设使用了Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 尝试使用JSONObject.parseObject()
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // 在这里抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

解决方案:使用org.json库的JSONObject构造函数

解决上述NullPointerException的关键在于选择正确的JSONObject实现及其初始化方式。经过验证,使用org.json库提供的JSONObject,并通过其构造函数直接传入JSON字符串可以避免此问题。

首先,确保您的项目中引入了org.json库的依赖:

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version> <!-- 可以根据需要选择最新稳定版本 -->
</dependency>

然后,在ProcessFunction中将JSONObject.parseObject(value)替换为new JSONObject(value):

import org.json.JSONObject; // 注意这里引入的是org.json.JSONObject
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 使用org.json.JSONObject的构造函数
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // 现在可以正常收集
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}

此更改后,Flink作业将能够正常运行并打印出转换后的JSONObject。这表明org.json库的JSONObject实现与Flink的内部机制(尤其是其类型序列化器)兼容性更好,或者其内部状态在被序列化和反序列化时能够保持完整性,从而避免了NullPointerException。

最佳实践:优先使用POJO进行JSON反序列化

尽管上述方法能够解决String到JSONObject的转换问题,但在实际生产环境中,特别是在处理大量数据或对性能有较高要求的Flink应用中,直接传递和处理JSONObject通常不是最佳实践。

原因如下:

Magic AI Avatars
Magic AI Avatars

神奇的AI头像,获得200多个由AI制作的自定义头像。

下载
  1. 序列化与反序列化开销大: JSONObject是一个通用的Map结构,其内部字段类型不固定,这使得Flink在序列化和反序列化JSONObject时,需要进行更多的元数据处理和类型推断,导致额外的CPU和内存开销。相比之下,POJO(Plain Old Java Object)具有固定的结构和明确的字段类型,Flink可以利用Kyro等高效序列化器进行快速、紧凑的序列化。
  2. 缺乏类型安全: JSONObject的操作通常基于字符串键值对,容易出现拼写错误或类型转换错误,且这些错误通常在运行时才能发现。而POJO提供了编译时类型检查,能够有效减少运行时错误。
  3. 可读性和可维护性差: 使用JSONObject意味着需要通过getString("key")、getInt("key")等方法手动提取字段,代码冗长且不易阅读。POJO则允许直接通过属性访问数据,代码更简洁、更具可读性。
  4. Schema演进: 随着业务发展,JSON数据的Schema可能会发生变化。POJO可以更优雅地处理Schema演进,例如通过添加新字段或使用@JsonIgnoreProperties(ignoreUnknown = true)注解忽略未知字段。

推荐做法:将JSON字符串反序列化为POJO

在Flink中,最佳实践是将JSON字符串反序列化为定义好的POJO类。这通常通过自定义DeserializationSchema或使用Flink提供的JSON格式(如JsonRowSerializationSchema)来实现。

例如,对于上述JSON数据,我们可以定义一个POJO类:

import java.io.Serializable;

public class BillInfo implements Serializable {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // 必须有无参构造函数
    public BillInfo() {}

    // Getter和Setter方法
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
               "ADD_TIME='" + ADD_TIME + '\'' +
               ", ORDER_ID='" + ORDER_ID + '\'' +
               ", ADDER_NO='" + ADDER_NO + '\'' +
               ", UPDATER_NO='" + UPDATER_NO + '\'' +
               ", S_USER_ID='" + S_USER_ID + '\'' +
               ", B_USER_ID='" + B_USER_ID + '\'' +
               ", BILL_ID='" + BILL_ID + '\'' +
               ", ADDER_NAME='" + ADDER_NAME + '\'' +
               ", UPDATE_TIME='" + UPDATE_TIME + '\'' +
               ", UPDATER_NAME='" + UPDATER_NAME + '\'' +
               '}';
    }
}

// 如果JSON结构更复杂,包含嵌套对象,则需要定义相应的嵌套POJO
public class RootData implements Serializable {
    private BillInfo bill_info;

    public RootData() {}

    public BillInfo getBill_info() { return bill_info; }
    public void setBill_info(BillInfo bill_info) { this.bill_info = bill_info; }

    @Override
    public String toString() {
        return "RootData{" +
               "bill_info=" + bill_info +
               '}';
    }
}

然后,可以使用Jackson或Gson等库在ProcessFunction中将字符串反序列化为POJO:

import com.fasterxml.jackson.databind.ObjectMapper; // 引入Jackson库
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkPojoProcessing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<RootData> pojoDS = inputDS.process(new ProcessFunction<String, RootData>() {
            // ObjectMapper 是线程安全的,可以作为类成员或静态成员
            private transient ObjectMapper objectMapper; 

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction<String, RootData>.Context ctx, Collector<RootData> out) throws Exception {
                RootData rootData = objectMapper.readValue(value, RootData.class);
                out.collect(rootData);
            }
        });
        pojoDS.print();

        env.execute();
    }
}

为了使用Jackson,需要添加以下依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.4</version> <!-- 根据需要选择最新稳定版本 -->
</dependency>

总结

在Flink中将JSON字符串转换为JSONObject时,如果遇到NullPointerException,尝试使用org.json库的new JSONObject(value)构造函数通常可以解决问题。然而,从长期维护和性能优化的角度来看,强烈建议将JSON字符串反序列化为POJO。POJO不仅提供了更好的类型安全和代码可读性,还能显著提高Flink应用的序列化和反序列化效率,是构建健壮、高性能流处理应用的基石。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

457

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

547

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

335

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

82

2025.09.10

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

175

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

159

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

207

2024.02.23

Java 流式处理与 Apache Kafka 实战
Java 流式处理与 Apache Kafka 实战

本专题专注讲解 Java 在流式数据处理与消息队列系统中的应用,系统讲解 Apache Kafka 的基础概念、生产者与消费者模型、Kafka Streams 与 KSQL 流式处理框架、实时数据分析与监控,结合实际业务场景,帮助开发者构建 高吞吐量、低延迟的实时数据流管道,实现高效的数据流转与处理。

173

2026.02.04

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 4.4万人学习

C# 教程
C# 教程

共94课时 | 11.2万人学习

Java 教程
Java 教程

共578课时 | 81.4万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号