
在CentOS系统上整合Hadoop分布式文件系统(HDFS)与Apache Kafka,通常会把Kafka当作数据的生成器或接收器,并且将数据存储到HDFS或者从HDFS获取数据。下面是一个简化版的应用场景,演示了怎样利用Kafka把数据存储到HDFS。
场景:利用Kafka向HDFS存入数据
- 初始化设置:
- 确认CentOS里已经装好了Hadoop和Kafka。
- 对Kafka的生成器和接收器做好配置。
- Kafka生成器配置:
- 建立一个Kafka主题,用来生成数据。
kafka-topics.sh --create --topic order-created-topic --partitions 12 --replication-factor 3 --bootstrap-server localhost:9092
- 构建Kafka生成器代码:
- 利用Kafka Producer API把数据传送到Kafka主题。
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer(props); producer.send(new ProducerRecord("order-created-topic", orderId, orderJson)); producer.close();
- 数据导入HDFS:
- 在Kafka接收器中读取数据,并且把数据写进HDFS。可以采用Spark Streaming之类的工具来达成实时数据处理和存储。
SparkConf conf = new SparkConf().setAppName("Kafka to HDFS");
conf.setMaster("local[*]");
JavaPairRDD lines = KafkaUtils.createDirectStream(
conf,
"order-created-topic",
new StringDeserializer(),
new StringDeserializer()
).mapToPair(record -> new Tuple2(record.value(), record.key()));
lines.saveAsHadoopFile("/path/to/hdfs/directory",
new TextOutputFormat(),
"org.apache.hadoop.mapred.lib.MultipleTextOutputFormat",
new Configuration(false)
);
- 启动与监控:
- 启动Kafka生成器和接收器程序。
- 检查HDFS确保数据已成功存入。
请记住,上述代码样本和配置或许得依据实际环境做出改动。在真实应用里,还需要顾及到数据的序列化方法、错误处理、资源配置等细节。另外,对于生产环境,还需考量安全配置,例如SSL/TLS加密以及认证。











