需借助第三方组件实现Kafka Connect读取XML:一、用FilePulse Connector配合Confluent XML Converter,配置XPath解析与JSON转换;二、开发自定义Source Connector,基于DOM/SAX解析并构造SourceRecord;三、以Logstash为中间层,通过XML filter提取字段后推送至Kafka。

如果您希望使用Kafka Connect从本地或网络文件系统读取XML格式数据并将其写入Kafka Topic,则需借助支持XML解析的Connector。Kafka Connect原生不支持XML解析,必须通过第三方转换器或自定义Source Connector实现。以下是可行的操作路径:
一、使用Confluent XML Converter配合FilePulse Connector
FilePulse Connector是开源的通用文件源连接器,支持多种格式解析,配合Confluent提供的XML Converter可将XML内容结构化为JSON Schema格式记录。
1、下载FilePulse Connector JAR包并放入Kafka Connect插件目录(如connect-plugins/)。
2、下载confluentinc/kafka-connect-xml并确保其JAR文件位于同一插件路径下。
3、启动Connect分布式集群后,提交以下REST配置:
4、在POST请求体中指定"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"。
5、设置"tasks.max": "1"与"fs.scan.directory.path": "/path/to/xml/files"指向XML所在目录。
6、配置"file.filter.regex.pattern": ".*\\.xml$"仅匹配XML文件。
7、启用XML解析:"format.type": "xml",并设置"xml.xpath.expression": "/root/record"提取每条记录节点。
8、指定"transforms": "unwrap"与"transforms.unwrap.type": "org.apache.kafka.connect.transforms.Flatten$Value"展平嵌套字段。
9、将"key.converter": "org.apache.kafka.connect.storage.StringConverter"和"value.converter": "io.confluent.connect.xml.XmlConverter"设为对应转换器类。
10、设置"value.converter.schema.registry.url": "http://schema-registry:8081"(若启用Schema Registry)。
二、编写自定义XML Source Connector
当标准Connector无法满足复杂XML结构(如混合文本/属性/命名空间)时,需开发继承SourceConnector与SourceTask的Java实现,直接解析DOM/SAX/StAX流并构造SourceRecord对象。
1、创建Maven模块,依赖kafka-connect-api与woodstox-core(用于稳健XML解析)。
2、在XmlSourceConnector中重写configDef()方法,暴露xml.input.path、xml.record.xpath等配置项。
3、于XmlSourceTask中初始化XMLInputFactory,逐文件打开输入流。
4、使用XPath定位目标记录节点,对每个匹配节点调用node.toString()或序列化为Map结构。
5、将每条解析结果封装为Struct,依据预定义Schema生成SourceRecord实例。
6、在poll()方法中返回记录列表,并设置offset以支持断点续传(如记录文件名+行号或最后修改时间戳)。
7、打包为fat JAR,放入Connect插件目录并重启worker进程。
8、通过REST API注册该Connector,填入"connector.class": "com.example.kafka.connect.xml.XmlSourceConnector"。
9、配置"xml.input.path": "/data/inbound"与"topic": "xml-ingest-topic"。
10、确认status返回"RUNNING"且日志中出现"Processed N records from file X.xml"。
三、使用Logstash + Kafka Output作为替代管道
若Kafka Connect部署受限或需快速验证流程,可用Logstash作为中间解析层:它内置XML filter插件,支持XPath提取、字段映射与嵌套展开,再通过kafka output插件推送至Topic。
1、安装Logstash并确保logstash-filter-xml与logstash-output-kafka已加载。
2、创建配置文件xml-to-kafka.conf,在input段设置file插件,指定path => "/var/log/xml/*.xml"。
3、添加start_position => "beginning"与sincedb_path => "/dev/null"确保首次读取全部内容。
4、在filter段加入xml插件,设置source => "message"与target => "parsed"。
5、配置xpath => { "/root/item/text()" => "item_text" }提取关键字段。
6、使用mutate插件重命名或删除冗余字段,如remove_field => ["message", "@version"]。
7、在output段配置kafka插件,指定bootstrap_servers => "kafka:9092"与topic_id => "xml-topic"。
8、设置codec => json确保消息体为JSON格式。
9、执行logstash -f xml-to-kafka.conf启动管道。
10、监控Kafka Topic是否接收到结构化JSON消息,字段与XPath提取一致。











