
在 Apache Flink 流处理应用中,处理来自 Kafka、文件或其他数据源的 JSON 格式字符串数据是常见的场景。开发者通常需要将这些原始字符串解析成结构化的 JSONObject 对象,以便进行后续的字段提取、转换或业务逻辑处理。然而,在尝试将解析后的 JSONObject 通过 Flink 的 Collector 发出时,有时会遇到令人困惑的 NullPointerException: Assigned key must not be null! 错误,即使调试显示字符串已成功解析为 JSONObject 实例。本文将深入探讨这一问题,并提供一个可靠的解决方案及最佳实践建议。
考虑以下 Flink 任务代码片段,其目标是将一个包含 JSON 字符串的 DataStream<String> 转换为 DataStream<JSONObject>:
import com.alibaba.fastjson.JSONObject; // 假设使用Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class FlinkJsonParseIssue {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputDS = env.fromElements(
"{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
);
SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
// 尝试使用 JSONObject.parseObject 进行解析
JSONObject jsonObject = JSONObject.parseObject(value);
out.collect(jsonObject); // 在此处抛出异常
}
});
jsonObjDS.print();
env.execute();
}
}当运行上述代码时,尽管在 processElement 内部调试发现 JSONObject.parseObject(value) 确实成功生成了 JSONObject 实例,但在调用 out.collect(jsonObject) 时,程序却抛出了以下运行时异常:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy Caused by: java.lang.RuntimeException: Assigned key must not be null! Caused by: java.lang.NullPointerException: Assigned key must not be null!
这个错误信息 Assigned key must not be null! 通常与 Flink 的状态管理或某些内部序列化机制有关,但在此场景下,它并非直接指向用户代码中显式设置的 Key 为 null。这表明问题可能出在 JSONObject 对象本身的某种特性,或者其与 Flink 内部序列化机制的兼容性上。
经过分析和实践,我们发现问题可能与所使用的 JSON 库及其解析方式有关。如果采用 org.json 库,并使用其构造函数进行解析,可以有效避免上述问题。
首先,确保项目中引入了 org.json 库的依赖:
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version> <!-- 或更高版本 -->
</dependency>然后,修改 processElement 方法中的 JSON 解析逻辑,使用 org.json.JSONObject 的构造函数来创建对象:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.json.JSONObject; // 注意这里导入的是 org.json.JSONObject
public class FlinkJsonParseSolution {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputDS = env.fromElements(
"{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
);
SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
// 使用 org.json.JSONObject 的构造函数进行解析
JSONObject jsonObject = new JSONObject(value);
out.collect(jsonObject); // 现在可以正常收集
}
});
jsonObjDS.print();
env.execute();
}
}修改后,程序将能够顺利运行并打印出解析后的 JSONObject 内容,例如:
{"bill_info":{"ADDER_NAME":"sss","ADDER_NO":"0706","UPDATER_NAME":"ssss","UPDATER_NO":"0706","BILL_ID":"8687b584-038c-498c-8f97-ec1ca197da96","ADD_TIME":"2022-11-12 16:05:28:418","ORDER_ID":"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67","S_USER_ID":"s68","B_USER_ID":"b77","UPDATE_TIME":"2022-11-12 16:05:28:418"}}这表明 org.json.JSONObject 的构造函数方式与 Flink 的内部机制(特别是序列化)具有更好的兼容性,从而避免了 NullPointerException。
尽管上述方法可以解决 JSONObject 收集的问题,但在生产环境中,直接在 Flink 流中传递 JSONObject 实例通常不是最佳实践。
序列化与反序列化开销: JSONObject 对象通常包含复杂的内部结构,其序列化和反序列化成本较高。在 Flink 这种高性能的流处理框架中,频繁地序列化和反序列化 JSONObject 会带来显著的性能损耗,尤其是在数据量大或状态操作多的场景下。
类型安全与可读性: 使用 JSONObject 意味着在运行时通过字符串键访问数据,缺乏编译时检查,容易出错。代码可读性也相对较差。
推荐的最佳实践是:将 JSON 数据反序列化为 Plain Old Java Objects (POJOs)。
POJO 具有以下优势:
示例:将 JSON 反序列化为 POJO
假设我们的 JSON 结构可以映射到一个 BillInfo POJO:
// 定义 POJO 类
public class BillInfo {
private String ADD_TIME;
private String ORDER_ID;
private String ADDER_NO;
private String UPDATER_NO;
private String S_USER_ID;
private String B_USER_ID;
private String BILL_ID;
private String ADDER_NAME;
private String UPDATE_TIME;
private String UPDATER_NAME;
// 必须提供无参构造函数
public BillInfo() {}
// 提供所有字段的 Getter 和 Setter 方法
public String getADD_TIME() { return ADD_TIME; }
public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
// ... 其他字段的 Getter/Setter
public String getORDER_ID() { return ORDER_ID; }
public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
public String getADDER_NO() { return ADDER_NO; }
public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
public String getUPDATER_NO() { return UPDATER_NO; }
public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
public String getS_USER_ID() { return S_USER_ID; }
public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
public String getB_USER_ID() { return B_USER_ID; }
public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
public String getBILL_ID() { return BILL_ID; }
public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
public String getADDER_NAME() { return ADDER_NAME; }
public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
public String getUPDATE_TIME() { return UPDATE_TIME; }
public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
public String getUPDATER_NAME() { return UPDATER_NAME; }
public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }
@Override
public String toString() {
return "BillInfo{" +
"ADD_TIME='" + ADD_TIME + '\'' +
", ORDER_ID='" + ORDER_ID + '\'' +
// ... 其他字段
'}';
}
}
// 在 Flink 任务中使用 Jackson 或 Gson 进行反序列化
import com.fasterxml.jackson.databind.ObjectMapper; // 假设使用Jackson
// 或 import com.google.gson.Gson;
public class FlinkJsonToPojoSolution {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputDS = env.fromElements(
"{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
);
SingleOutputStreamOperator<BillInfo> billInfoDS = inputDS.process(new ProcessFunction<String, BillInfo>() {
// ObjectMapper 是线程安全的,可以作为成员变量
private transient ObjectMapper objectMapper;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
objectMapper = new ObjectMapper();
}
@Override
public void processElement(String value, ProcessFunction<String, BillInfo>.Context ctx, Collector<BillInfo> out) throws Exception {
// 假设 JSON 字符串的根元素是包含 "bill_info" 的对象
// 需要先解析到 JsonNode 或 Map,再提取 "bill_info" 部分
// 或者如果确定字符串直接是 bill_info 的内容,可以直接反序列化
// 这里假设 value 是包含一个根键 "bill_info" 的完整 JSON 字符串
// 先解析为 JsonNode,然后提取 "bill_info" 的内容
com.fasterxml.jackson.databind.JsonNode rootNode = objectMapper.readTree(value);
com.fasterxml.jackson.databind.JsonNode billInfoNode = rootNode.get("bill_info");
if (billInfoNode != null) {
BillInfo billInfo = objectMapper.treeToValue(billInfoNode, BillInfo.class);
out.collect(billInfo);
} else {
// 处理 JSON 格式不符合预期的情况
System.err.println("JSON string missing 'bill_info' key: " + value);
}
}
});
billInfoDS.print();
env.execute();
}
}注意: 如果 JSON 字符串直接是 BillInfo 对象的内容(即没有外层的 {"bill_info": ...}),则 processElement 可以简化为 BillInfo billInfo = objectMapper.readValue(value, BillInfo.class);。上述示例处理了带有嵌套 bill_info 键的情况。
在 Flink 中处理 JSON 字符串时,将字符串解析为 JSONObject 并通过 Collector 发出可能因 JSON 库的选择和使用方式不当而导致 NullPointerException: Assigned key must not be null! 错误。通过使用 org.json 库并采用其构造函数 new JSONObject(value) 的方式,可以有效解决此问题。然而,从性能、类型安全和可维护性角度考虑,更推荐的做法是将 JSON 数据反序列化为 POJO。选择合适的 JSON 处理策略,将有助于构建更健壮、高效的 Flink 流处理应用。
以上就是Flink 中高效解析 JSON 字符串至 JSONObject 的实践指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号