首页 > Java > java教程 > 正文

在 Apache Flink 中高效读取带键(Keyed)的 Kafka 记录

聖光之護
发布: 2025-11-05 14:35:02
原创
170人浏览过

在 Apache Flink 中高效读取带键(Keyed)的 Kafka 记录

本教程详细阐述了如何在 apache flink 中使用 `kafkasource` 读取带键(keyed)的 kafka 记录。通过实现自定义的 `kafkarecorddeserializationschema`,用户可以灵活地访问 kafka `consumerrecord` 中的键、值、时间戳及其他元数据,从而构建更丰富的数据处理逻辑,克服了默认 `valueonly` 模式的局限性。

当从 Apache Kafka 消费数据时,生产者通常会为记录同时指定键(Key)和值(Value),尤其是在需要进行日志压缩、状态管理或基于键的路由等场景中。Apache Flink 的 KafkaSource 是一个强大的连接器,用于与 Kafka 进行集成。然而,默认的反序列化策略,例如 KafkaRecordDeserializationSchema.valueOnly(),仅提取记录的值,使得键和其他重要的元数据无法直接访问。为了在 Flink 中充分利用带键的 Kafka 记录,需要采用一种自定义的反序列化方法。

理解 KafkaRecordDeserializationSchema 的作用

在 Flink 中读取带键的 Kafka 记录的核心在于实现一个自定义的 KafkaRecordDeserializationSchema。这个接口提供了一个 deserialize 方法,它接收一个 ConsumerRecord<byte[], byte[]> 对象。这个 ConsumerRecord 对象封装了 Kafka 记录的所有组件,包括原始的键字节数组、值字节数组、时间戳、主题、分区和偏移量等。通过实现此接口,您可以定义如何将这些原始字节数据转换为 Flink 应用程序所需的特定数据类型。

实现自定义的 Kafka 记录反序列化

以下步骤将指导您如何创建一个自定义的反序列化器,以从带键的 Kafka 记录中提取键、值和时间戳,并在 Flink DataStream 中进行处理。

1. 定义数据传输对象 (DTO)

首先,我们需要一个 Java 类来封装从 Kafka 记录中提取的键、值和时间戳。这个类通常被称为 POJO (Plain Old Java Object),并应遵循 Flink 的 POJO 规则(例如,所有字段都必须是 public 或有 getter/setter 方法,并且必须有一个无参构造函数)。

import java.io.Serializable;

public class KeyedKafkaRecord implements Serializable {
    private String key;
    private String value;
    private long timestamp;
    // 可根据需要添加其他元数据,例如 topic, partition, offset

    public KeyedKafkaRecord() {} // Flink POJO 要求无参构造函数

    public KeyedKafkaRecord(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "KeyedKafkaRecord{" +
               "key='" + key + '\'' +
               ", value='" + value + '\'' +
               ", timestamp=" + timestamp +
               '}';
    }
}
登录后复制

2. 实现自定义 KafkaRecordDeserializationSchema

接下来,创建一个实现了 KafkaRecordDeserializationSchema<KeyedKafkaRecord> 接口的类。在这个类的 deserialize 方法中,我们将使用 Kafka 内置的 StringDeserializer 来解析 ConsumerRecord 中的键和值字节数组。

wifi优化大师app v1.0.1 安卓版
wifi优化大师app v1.0.1 安卓版

Wifi优化大师最新版是一款免费的手机应用程序,专为优化 Wi-Fi 体验而设计。它提供以下功能: 增强信号:提高 Wi-Fi 信号强度,防止网络中断。 加速 Wi-Fi:提升上网速度,带来更流畅的体验。 Wi-Fi 安检:检测同时在线设备,防止蹭网。 硬件加速:优化硬件传输性能,提升连接效率。 网速测试:实时监控网络速度,轻松获取网络状态。 Wifi优化大师还支持一键连接、密码记录和上网安全测试,为用户提供全面的 Wi-Fi 管理体验。

wifi优化大师app v1.0.1 安卓版 0
查看详情 wifi优化大师app v1.0.1 安卓版
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;

public class CustomKeyedKafkaDeserializationSchema implements KafkaRecordDeserializationSchema<KeyedKafkaRecord> {

    private transient StringDeserializer keyDeserializer;
    private transient StringDeserializer valueDeserializer;

    @Override
    public void open(KafkaRecordDeserializationSchema.InitializationContext context) throws Exception {
        // 在反序列化器初始化时创建 Kafka Deserializer 实例
        keyDeserializer = new StringDeserializer();
        valueDeserializer = new StringDeserializer();
        // 如果需要配置,可以在这里进行,例如 keyDeserializer.configure(configs, true);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<KeyedKafkaRecord> out) throws IOException {
        // 使用 Kafka StringDeserializer 反序列化键和值
        String key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
        String value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
        long timestamp = record.timestamp(); // 获取记录的时间戳

        // 将反序列化后的数据封装到自定义的 DTO 中
        out.collect(new KeyedKafkaRecord(key, value, timestamp));
    }

    @Override
    public TypeInformation<KeyedKafkaRecord> getProducedTypeInfo() {
        // 返回反序列化器产生的数据类型信息
        return TypeInformation.of(KeyedKafkaRecord.class);
    }
}
登录后复制

注意: open 方法用于初始化反序列化器实例,确保它们在运行时可用。getProducedTypeInfo() 方法必须返回您自定义的 KeyedKafkaRecord 类的 TypeInformation。

3. 配置 Flink KafkaSource

最后,将自定义的 CustomKeyedKafkaDeserializationSchema 应用到 KafkaSource 的构建器中。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class FlinkKeyedKafkaConsumerJob {
    public static void main(String[] args) throws Exception {
        // 获取 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 集群地址
        String topic = "test3"; // 替换为您的 Kafka 主题
        String groupId = "my-flink-consumer-group"; // 消费者组 ID

        // 构建 KafkaSource
        KafkaSource<KeyedKafkaRecord> source = KafkaSource.<KeyedKafkaRecord>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest()) // 从最早的偏移量开始消费
                .setDeserializer(new CustomKeyedKafkaDeserializationSchema()) // 使用自定义的反序列化器
                .build();

        // 从 Kafka Source 创建数据流
        DataStream<KeyedKafkaRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");

        // 对接收到的数据进行处理并打印
        stream.map(record -> "Received Key: " + record.getKey() +
                             ", Value: " + record.getValue() +
                             ", Timestamp: " + record.getTimestamp())
              .print();

        // 执行 Flink 作业
        env.execute("Flink Keyed Kafka Consumer Job");
    }
}
登录后复制

注意事项与进阶用法

  • 键和值的类型: 示例中使用了 StringDeserializer,适用于键和值都是字符串的情况。如果您的键或值是其他类型(例如 Long, Integer, ByteArray),您应该使用相应的 Kafka 内置反序列化器(如 LongDeserializer, IntegerDeserializer)或实现自定义的 org.apache.kafka.common.serialization.Deserializer<T>。
  • 错误处理: 在 deserialize 方法中,如果反序列化过程中发生错误(例如数据格式不匹配),可以捕获异常并选择跳过该记录、记录错误日志或抛出异常以使 Flink 作业失败。
  • 访问其他元数据: ConsumerRecord 对象还提供了 partition(), offset(), headers() 等方法。您可以根据需要将这些信息也包含在 KeyedKafkaRecord 中,以丰富数据上下文。
  • 性能考量: 对于大规模数据流,自定义反序列化器的性能至关重要。确保反序列化逻辑高效且避免不必要的开销。
  • 复杂数据格式: 如果 Kafka 消息使用 Avro、Protobuf、JSON Schema 等复杂数据格式,您需要引入相应的序列化/反序列化库(例如 Confluent Schema Registry 提供的 KafkaAvroDeserializer)在 deserialize 方法中处理原始的 byte[]。

总结

通过实现 KafkaRecordDeserializationSchema 接口,Apache Flink 能够灵活地处理带键的 Kafka 记录,并提取出包括键、值、时间戳在内的所有重要元数据。这种方法为构建更复杂、更精细的 Flink 流处理应用提供了坚实的基础,特别是在需要基于键进行状态管理、数据去重或关联的场景中,它使得 Flink 能够充分利用 Kafka 消息的完整语义信息。

以上就是在 Apache Flink 中高效读取带键(Keyed)的 Kafka 记录的详细内容,更多请关注php中文网其它相关文章!

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号