0

0

在 Apache Flink 中高效读取带键(Keyed)的 Kafka 记录

聖光之護

聖光之護

发布时间:2025-11-05 14:35:02

|

191人浏览过

|

来源于php中文网

原创

在 Apache Flink 中高效读取带键(Keyed)的 Kafka 记录

本教程详细阐述了如何在 apache flink 中使用 `kafkasource` 读取带键(keyed)的 kafka 记录。通过实现自定义的 `kafkarecorddeserializationschema`,用户可以灵活地访问 kafka `consumerrecord` 中的键、值、时间戳及其他元数据,从而构建更丰富的数据处理逻辑,克服了默认 `valueonly` 模式的局限性。

当从 Apache Kafka 消费数据时,生产者通常会为记录同时指定键(Key)和值(Value),尤其是在需要进行日志压缩、状态管理或基于键的路由等场景中。Apache Flink 的 KafkaSource 是一个强大的连接器,用于与 Kafka 进行集成。然而,默认的反序列化策略,例如 KafkaRecordDeserializationSchema.valueOnly(),仅提取记录的值,使得键和其他重要的元数据无法直接访问。为了在 Flink 中充分利用带键的 Kafka 记录,需要采用一种自定义的反序列化方法。

理解 KafkaRecordDeserializationSchema 的作用

在 Flink 中读取带键的 Kafka 记录的核心在于实现一个自定义的 KafkaRecordDeserializationSchema。这个接口提供了一个 deserialize 方法,它接收一个 ConsumerRecord<byte[], byte[]> 对象。这个 ConsumerRecord 对象封装了 Kafka 记录的所有组件,包括原始的键字节数组、值字节数组、时间戳、主题、分区和偏移量等。通过实现此接口,您可以定义如何将这些原始字节数据转换为 Flink 应用程序所需的特定数据类型。

实现自定义的 Kafka 记录反序列化

以下步骤将指导您如何创建一个自定义的反序列化器,以从带键的 Kafka 记录中提取键、值和时间戳,并在 Flink DataStream 中进行处理。

1. 定义数据传输对象 (DTO)

首先,我们需要一个 Java 类来封装从 Kafka 记录中提取的键、值和时间戳。这个类通常被称为 POJO (Plain Old Java Object),并应遵循 Flink 的 POJO 规则(例如,所有字段都必须是 public 或有 getter/setter 方法,并且必须有一个无参构造函数)。

import java.io.Serializable;

public class KeyedKafkaRecord implements Serializable {
    private String key;
    private String value;
    private long timestamp;
    // 可根据需要添加其他元数据,例如 topic, partition, offset

    public KeyedKafkaRecord() {} // Flink POJO 要求无参构造函数

    public KeyedKafkaRecord(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "KeyedKafkaRecord{" +
               "key='" + key + '\'' +
               ", value='" + value + '\'' +
               ", timestamp=" + timestamp +
               '}';
    }
}

2. 实现自定义 KafkaRecordDeserializationSchema

接下来,创建一个实现了 KafkaRecordDeserializationSchema<KeyedKafkaRecord> 接口的类。在这个类的 deserialize 方法中,我们将使用 Kafka 内置的 StringDeserializer 来解析 ConsumerRecord 中的键和值字节数组。

wifi优化大师app v1.0.1 安卓版
wifi优化大师app v1.0.1 安卓版

Wifi优化大师最新版是一款免费的手机应用程序,专为优化 Wi-Fi 体验而设计。它提供以下功能: 增强信号:提高 Wi-Fi 信号强度,防止网络中断。 加速 Wi-Fi:提升上网速度,带来更流畅的体验。 Wi-Fi 安检:检测同时在线设备,防止蹭网。 硬件加速:优化硬件传输性能,提升连接效率。 网速测试:实时监控网络速度,轻松获取网络状态。 Wifi优化大师还支持一键连接、密码记录和上网安全测试,为用户提供全面的 Wi-Fi 管理体验。

下载
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;

public class CustomKeyedKafkaDeserializationSchema implements KafkaRecordDeserializationSchema<KeyedKafkaRecord> {

    private transient StringDeserializer keyDeserializer;
    private transient StringDeserializer valueDeserializer;

    @Override
    public void open(KafkaRecordDeserializationSchema.InitializationContext context) throws Exception {
        // 在反序列化器初始化时创建 Kafka Deserializer 实例
        keyDeserializer = new StringDeserializer();
        valueDeserializer = new StringDeserializer();
        // 如果需要配置,可以在这里进行,例如 keyDeserializer.configure(configs, true);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<KeyedKafkaRecord> out) throws IOException {
        // 使用 Kafka StringDeserializer 反序列化键和值
        String key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
        String value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
        long timestamp = record.timestamp(); // 获取记录的时间戳

        // 将反序列化后的数据封装到自定义的 DTO 中
        out.collect(new KeyedKafkaRecord(key, value, timestamp));
    }

    @Override
    public TypeInformation<KeyedKafkaRecord> getProducedTypeInfo() {
        // 返回反序列化器产生的数据类型信息
        return TypeInformation.of(KeyedKafkaRecord.class);
    }
}

注意: open 方法用于初始化反序列化器实例,确保它们在运行时可用。getProducedTypeInfo() 方法必须返回您自定义的 KeyedKafkaRecord 类的 TypeInformation。

3. 配置 Flink KafkaSource

最后,将自定义的 CustomKeyedKafkaDeserializationSchema 应用到 KafkaSource 的构建器中。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class FlinkKeyedKafkaConsumerJob {
    public static void main(String[] args) throws Exception {
        // 获取 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 集群地址
        String topic = "test3"; // 替换为您的 Kafka 主题
        String groupId = "my-flink-consumer-group"; // 消费者组 ID

        // 构建 KafkaSource
        KafkaSource<KeyedKafkaRecord> source = KafkaSource.<KeyedKafkaRecord>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest()) // 从最早的偏移量开始消费
                .setDeserializer(new CustomKeyedKafkaDeserializationSchema()) // 使用自定义的反序列化器
                .build();

        // 从 Kafka Source 创建数据流
        DataStream<KeyedKafkaRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");

        // 对接收到的数据进行处理并打印
        stream.map(record -> "Received Key: " + record.getKey() +
                             ", Value: " + record.getValue() +
                             ", Timestamp: " + record.getTimestamp())
              .print();

        // 执行 Flink 作业
        env.execute("Flink Keyed Kafka Consumer Job");
    }
}

注意事项与进阶用法

  • 键和值的类型: 示例中使用了 StringDeserializer,适用于键和值都是字符串的情况。如果您的键或值是其他类型(例如 Long, Integer, ByteArray),您应该使用相应的 Kafka 内置反序列化器(如 LongDeserializer, IntegerDeserializer)或实现自定义的 org.apache.kafka.common.serialization.Deserializer<T>。
  • 错误处理: 在 deserialize 方法中,如果反序列化过程中发生错误(例如数据格式不匹配),可以捕获异常并选择跳过该记录、记录错误日志或抛出异常以使 Flink 作业失败。
  • 访问其他元数据: ConsumerRecord 对象还提供了 partition(), offset(), headers() 等方法。您可以根据需要将这些信息也包含在 KeyedKafkaRecord 中,以丰富数据上下文。
  • 性能考量: 对于大规模数据流,自定义反序列化器的性能至关重要。确保反序列化逻辑高效且避免不必要的开销。
  • 复杂数据格式: 如果 Kafka 消息使用 Avro、Protobuf、JSON Schema 等复杂数据格式,您需要引入相应的序列化/反序列化库(例如 Confluent Schema Registry 提供的 KafkaAvroDeserializer)在 deserialize 方法中处理原始的 byte[]。

总结

通过实现 KafkaRecordDeserializationSchema 接口,Apache Flink 能够灵活地处理带键的 Kafka 记录,并提取出包括键、值、时间戳在内的所有重要元数据。这种方法为构建更复杂、更精细的 Flink 流处理应用提供了坚实的基础,特别是在需要基于键进行状态管理、数据去重或关联的场景中,它使得 Flink 能够充分利用 Kafka 消息的完整语义信息。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

456

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

547

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

335

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

82

2025.09.10

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

175

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

159

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

207

2024.02.23

Java 流式处理与 Apache Kafka 实战
Java 流式处理与 Apache Kafka 实战

本专题专注讲解 Java 在流式数据处理与消息队列系统中的应用,系统讲解 Apache Kafka 的基础概念、生产者与消费者模型、Kafka Streams 与 KSQL 流式处理框架、实时数据分析与监控,结合实际业务场景,帮助开发者构建 高吞吐量、低延迟的实时数据流管道,实现高效的数据流转与处理。

172

2026.02.04

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 4.3万人学习

C# 教程
C# 教程

共94课时 | 11.2万人学习

Java 教程
Java 教程

共578课时 | 81万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号