
本教程详细阐述了如何在 apache flink 中使用 `kafkasource` 读取带键(keyed)的 kafka 记录。通过实现自定义的 `kafkarecorddeserializationschema`,用户可以灵活地访问 kafka `consumerrecord` 中的键、值、时间戳及其他元数据,从而构建更丰富的数据处理逻辑,克服了默认 `valueonly` 模式的局限性。
当从 Apache Kafka 消费数据时,生产者通常会为记录同时指定键(Key)和值(Value),尤其是在需要进行日志压缩、状态管理或基于键的路由等场景中。Apache Flink 的 KafkaSource 是一个强大的连接器,用于与 Kafka 进行集成。然而,默认的反序列化策略,例如 KafkaRecordDeserializationSchema.valueOnly(),仅提取记录的值,使得键和其他重要的元数据无法直接访问。为了在 Flink 中充分利用带键的 Kafka 记录,需要采用一种自定义的反序列化方法。
在 Flink 中读取带键的 Kafka 记录的核心在于实现一个自定义的 KafkaRecordDeserializationSchema。这个接口提供了一个 deserialize 方法,它接收一个 ConsumerRecord<byte[], byte[]> 对象。这个 ConsumerRecord 对象封装了 Kafka 记录的所有组件,包括原始的键字节数组、值字节数组、时间戳、主题、分区和偏移量等。通过实现此接口,您可以定义如何将这些原始字节数据转换为 Flink 应用程序所需的特定数据类型。
以下步骤将指导您如何创建一个自定义的反序列化器,以从带键的 Kafka 记录中提取键、值和时间戳,并在 Flink DataStream 中进行处理。
首先,我们需要一个 Java 类来封装从 Kafka 记录中提取的键、值和时间戳。这个类通常被称为 POJO (Plain Old Java Object),并应遵循 Flink 的 POJO 规则(例如,所有字段都必须是 public 或有 getter/setter 方法,并且必须有一个无参构造函数)。
import java.io.Serializable;
public class KeyedKafkaRecord implements Serializable {
private String key;
private String value;
private long timestamp;
// 可根据需要添加其他元数据,例如 topic, partition, offset
public KeyedKafkaRecord() {} // Flink POJO 要求无参构造函数
public KeyedKafkaRecord(String key, String value, long timestamp) {
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
public String getKey() { return key; }
public void setKey(String key) { this.key = key; }
public String getValue() { return value; }
public void setValue(String value) { this.value = value; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@Override
public String toString() {
return "KeyedKafkaRecord{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
", timestamp=" + timestamp +
'}';
}
}接下来,创建一个实现了 KafkaRecordDeserializationSchema<KeyedKafkaRecord> 接口的类。在这个类的 deserialize 方法中,我们将使用 Kafka 内置的 StringDeserializer 来解析 ConsumerRecord 中的键和值字节数组。
Wifi优化大师最新版是一款免费的手机应用程序,专为优化 Wi-Fi 体验而设计。它提供以下功能: 增强信号:提高 Wi-Fi 信号强度,防止网络中断。 加速 Wi-Fi:提升上网速度,带来更流畅的体验。 Wi-Fi 安检:检测同时在线设备,防止蹭网。 硬件加速:优化硬件传输性能,提升连接效率。 网速测试:实时监控网络速度,轻松获取网络状态。 Wifi优化大师还支持一键连接、密码记录和上网安全测试,为用户提供全面的 Wi-Fi 管理体验。
0
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
public class CustomKeyedKafkaDeserializationSchema implements KafkaRecordDeserializationSchema<KeyedKafkaRecord> {
private transient StringDeserializer keyDeserializer;
private transient StringDeserializer valueDeserializer;
@Override
public void open(KafkaRecordDeserializationSchema.InitializationContext context) throws Exception {
// 在反序列化器初始化时创建 Kafka Deserializer 实例
keyDeserializer = new StringDeserializer();
valueDeserializer = new StringDeserializer();
// 如果需要配置,可以在这里进行,例如 keyDeserializer.configure(configs, true);
}
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<KeyedKafkaRecord> out) throws IOException {
// 使用 Kafka StringDeserializer 反序列化键和值
String key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
String value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
long timestamp = record.timestamp(); // 获取记录的时间戳
// 将反序列化后的数据封装到自定义的 DTO 中
out.collect(new KeyedKafkaRecord(key, value, timestamp));
}
@Override
public TypeInformation<KeyedKafkaRecord> getProducedTypeInfo() {
// 返回反序列化器产生的数据类型信息
return TypeInformation.of(KeyedKafkaRecord.class);
}
}注意: open 方法用于初始化反序列化器实例,确保它们在运行时可用。getProducedTypeInfo() 方法必须返回您自定义的 KeyedKafkaRecord 类的 TypeInformation。
最后,将自定义的 CustomKeyedKafkaDeserializationSchema 应用到 KafkaSource 的构建器中。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
public class FlinkKeyedKafkaConsumerJob {
public static void main(String[] args) throws Exception {
// 获取 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 集群地址
String topic = "test3"; // 替换为您的 Kafka 主题
String groupId = "my-flink-consumer-group"; // 消费者组 ID
// 构建 KafkaSource
KafkaSource<KeyedKafkaRecord> source = KafkaSource.<KeyedKafkaRecord>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest()) // 从最早的偏移量开始消费
.setDeserializer(new CustomKeyedKafkaDeserializationSchema()) // 使用自定义的反序列化器
.build();
// 从 Kafka Source 创建数据流
DataStream<KeyedKafkaRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");
// 对接收到的数据进行处理并打印
stream.map(record -> "Received Key: " + record.getKey() +
", Value: " + record.getValue() +
", Timestamp: " + record.getTimestamp())
.print();
// 执行 Flink 作业
env.execute("Flink Keyed Kafka Consumer Job");
}
}通过实现 KafkaRecordDeserializationSchema 接口,Apache Flink 能够灵活地处理带键的 Kafka 记录,并提取出包括键、值、时间戳在内的所有重要元数据。这种方法为构建更复杂、更精细的 Flink 流处理应用提供了坚实的基础,特别是在需要基于键进行状态管理、数据去重或关联的场景中,它使得 Flink 能够充分利用 Kafka 消息的完整语义信息。
以上就是在 Apache Flink 中高效读取带键(Keyed)的 Kafka 记录的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号