0

0

在Spring Boot中获取Flink聚合结果:无界数据源的挑战与策略

心靈之曲

心靈之曲

发布时间:2025-08-23 16:26:11

|

719人浏览过

|

来源于php中文网

原创

在Spring Boot中获取Flink聚合结果:无界数据源的挑战与策略

本文探讨了在Spring Boot应用中通过API获取Flink聚合结果的挑战,尤其是在使用无界数据源时。由于无界流的持续性,直接在API响应中返回最终聚合结果不可行。教程将阐述将数据源转换为有界流的策略,例如通过指定Kafka的起止偏移量,以实现实时或准实时的聚合结果查询。同时,文章还将提供替代方案,如使用外部存储或异步通知,以应对无界流场景下的数据查询需求。

理解挑战:无界流与API响应模型

在spring boot应用程序中,当一个api端点被调用时,通常期望在请求-响应周期内获得一个确定的结果。然而,当这个api端点触发或查询一个基于apache flink的流处理程序,并且该程序使用了“无界数据源”(unbounded data source)时,会遇到一个根本性的矛盾。无界数据源,顾名思义,是持续不断产生数据的,没有明确的结束点。这意味着flink作业会持续运行、持续处理数据并更新其内部聚合状态,但永远不会有一个“最终”的聚合结果。因此,在api请求的当下,无法从一个仍在运行的无界流作业中获取一个固定的、代表最终状态的聚合结果并将其作为http响应返回。

问题的核心在于:

  • 无界流的持续性: Flink处理的是永不停止的数据流,聚合结果是动态变化的。
  • API的即时性要求: HTTP请求通常期望一个在短时间内完成并返回的确定性响应。

为了解决这一矛盾,我们需要重新思考如何将Flink的流处理能力与Spring Boot的请求-响应模型结合起来。

策略一:将无界数据源转换为有界查询

最直接的解决方案是将原本的无界数据源在特定查询场景下转换为有界数据源。这意味着在API被调用时,我们指示Flink处理一个明确定义的数据范围,从而产生一个确定的、可返回的聚合结果。

以Kafka为例实现有界查询

对于像Apache Kafka这样的消息队列,我们可以通过指定起始和结束偏移量(offsets)来将其无界特性“截断”为一个有界的数据集。当Spring Boot API被调用时,它可以在内部构建一个Flink作业,该作业仅消费Kafka主题中特定范围的数据。

寻鲸AI
寻鲸AI

寻鲸AI是一款功能强大的人工智能写作工具,支持对话提问、内置多场景写作模板如写作辅助类、营销推广类等,更能一键写作各类策划方案。

下载
// 假设在Spring Boot中动态构建并提交Flink作业
public List> getAggregatedDataFromKafka(
    String topic, long startOffset, long endOffset, int partition) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 生产环境建议使用RemoteEnvironment或Standalone模式
    env.setRuntimeMode(RuntimeMode.BATCH); // 对于有界查询,建议设置为BATCH模式

    // 构建Kafka源,指定起始和结束偏移量
    KafkaSource source = KafkaSource.builder()
        .setBootstrapServers("localhost:9092")
        .setTopics(topic)
        .setStartingOffsets(OffsetsInitializer.forSpecificOffsets(
            new HashMap() {{
                put(new TopicPartition(topic, partition), startOffset);
            }}
        ))
        // 设置结束偏移量,将其变为有界源
        .setBoundedStopOffsets(OffsetsInitializer.forSpecificOffsets(
            new HashMap() {{
                put(new TopicPartition(topic, partition), endOffset);
            }}
        ))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    DataStream kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Bounded Source");

    // 示例:简单词频统计
    DataStream> aggregatedStream = kafkaStream
        .flatMap((String value, Collector>) out -> {
            for (String word : value.split(" ")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1L));
                }
            }
        })
        .keyBy(0)
        .sum(1);

    // 将结果收集到List中 (适用于小规模数据,且会阻塞API调用)
    List> results = new ArrayList<>();
    try (CloseableIterator> it = aggregatedStream.executeAndCollect()) {
        it.forEachRemaining(results::add);
    }

    return results;
}

注意事项:

  • 动态偏移量: 如何确定 startOffset 和 endOffset 是关键。这可能需要一个外部机制来跟踪Kafka的最新偏移量,或者API调用者指定一个时间范围,然后转换为偏移量。
  • 性能开销: 每次API调用都启动一个新的Flink作业(即使是短暂的批处理模式)可能会有较高的启动开销。对于高并发场景,这可能不是最佳选择。
  • 结果捕获: executeAndCollect()方法会阻塞调用线程,并且在处理大规模数据时可能导致内存溢出。它更适用于测试或小规模数据查询。在生产环境中,更推荐将结果写入外部存储(参见策略二)。

策略二:外部状态存储与API查询

对于需要持续处理无界流并提供最新聚合结果的场景,最佳实践是让Flink作业将其聚合结果持续写入一个外部存储系统。Spring Boot应用程序的API则负责查询这个外部存储,而不是直接与运行中的Flink作业交互。

工作流程:

  1. Flink作业: 持续从无界数据源读取数据,执行聚合逻辑,并将最新的聚合结果实时更新到外部存储。
  2. 外部存储: 可以是关系型数据库(如PostgreSQL, MySQL)、NoSQL数据库(如MongoDB, Cassandra)、键值存储(如Redis)、或搜索索引(如Elasticsearch)。
  3. Spring Boot API: 当API端点被调用时,它向外部存储发出查询请求,获取当前的聚合结果,并将其作为响应返回。

示例:Flink写入Redis,Spring Boot查询Redis

Flink作业(概念性代码):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

public class FlinkRedisSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // 启用检查点,每60秒一次

        // 假设从Kafka读取数据
        KafkaSource source = KafkaSource.builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("my-topic")
            .setStartingOffsets(OffsetsInitializer.earliest()) // 无界源,从最早的可用偏移量开始
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        DataStream stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 示例:统计每个单词的出现次数
        DataStream> wordCounts = stream
            .flatMap((String value, Collector>) out -> {
                for (String word : value.split(" ")) {
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<>(word, 1L));
                    }
                }
            })
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口聚合
            .sum(1);

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
mysql修改数据表名
mysql修改数据表名

MySQL修改数据表:1、首先查看数据库中所有的表,代码为:‘SHOW TABLES;’;2、修改表名,代码为:‘ALTER TABLE 旧表名 RENAME [TO] 新表名;’。php中文网还提供MySQL的相关下载、相关课程等内容,供大家免费下载使用。

666

2023.06.20

MySQL创建存储过程
MySQL创建存储过程

存储程序可以分为存储过程和函数,MySQL中创建存储过程和函数使用的语句分别为CREATE PROCEDURE和CREATE FUNCTION。使用CALL语句调用存储过程智能用输出变量返回值。函数可以从语句外调用(通过引用函数名),也能返回标量值。存储过程也可以调用其他存储过程。php中文网还提供MySQL创建存储过程的相关下载、相关课程等内容,供大家免费下载使用。

247

2023.06.21

mongodb和mysql的区别
mongodb和mysql的区别

mongodb和mysql的区别:1、数据模型;2、查询语言;3、扩展性和性能;4、可靠性。本专题为大家提供mongodb和mysql的区别的相关的文章、下载、课程内容,供大家免费下载体验。

281

2023.07.18

mysql密码忘了怎么查看
mysql密码忘了怎么查看

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql密码忘了怎么办呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

515

2023.07.19

mysql创建数据库
mysql创建数据库

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql怎么创建数据库呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

256

2023.07.25

mysql默认事务隔离级别
mysql默认事务隔离级别

MySQL是一种广泛使用的关系型数据库管理系统,它支持事务处理。事务是一组数据库操作,它们作为一个逻辑单元被一起执行。为了保证事务的一致性和隔离性,MySQL提供了不同的事务隔离级别。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

386

2023.08.08

sqlserver和mysql区别
sqlserver和mysql区别

SQL Server和MySQL是两种广泛使用的关系型数据库管理系统。它们具有相似的功能和用途,但在某些方面存在一些显著的区别。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

531

2023.08.11

mysql忘记密码
mysql忘记密码

MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。那么忘记mysql密码我们该怎么解决呢?php中文网给大家带来了相关的教程以及其他关于mysql的文章,欢迎大家前来学习阅读。

600

2023.08.14

拼多多赚钱的5种方法 拼多多赚钱的5种方法
拼多多赚钱的5种方法 拼多多赚钱的5种方法

在拼多多上赚钱主要可以通过无货源模式一件代发、精细化运营特色店铺、参与官方高流量活动、利用拼团机制社交裂变,以及成为多多进宝推广员这5种方法实现。核心策略在于通过低成本、高效率的供应链管理与营销,利用平台社交电商红利实现盈利。

1

2026.01.26

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
MySQL 教程
MySQL 教程

共48课时 | 1.9万人学习

MySQL 初学入门(mosh老师)
MySQL 初学入门(mosh老师)

共3课时 | 0.3万人学习

简单聊聊mysql8与网络通信
简单聊聊mysql8与网络通信

共1课时 | 811人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号