0

0

详解Java中的Flow API (Publisher/Subscriber)_Java 9提供的响应式流类库

P粉602998670

P粉602998670

发布时间:2026-02-17 08:47:23

|

185人浏览过

|

来源于php中文网

原创

flow api 仅提供接口和subscription工具类,无开箱即用publisher;唯一可用内置实现是线程安全、支持背压的submissionpublisher,需手动管理request()与close(),业务中推荐使用reactor或rxjava。

详解java中的flow api (publisher/subscriber)_java 9提供的响应式流类库

Flow.Publisher 和 Flow.Subscriber 不是让你直接 new 的

Java 9 的 Flow API 是对 Reactive Streams 规范的最小化内置实现,它只提供接口和一个工具类 Flow.Subscription,**不带任何开箱即用的 Publisher 实现**。你写 new Flow.Publisher() { ... } 是合法语法,但无法触发背压、不会被下游正确订阅——因为没人调用你的 subscribe() 方法里的逻辑。

常见错误现象:onSubscribe() 从不被调用,或者 request(1) 后没收到 onNext(),程序静默结束。

  • 真正可用的起点是 Flow.Processor(需自行实现)或第三方库(如 Reactor 的 Mono/Flux、RxJava 的 Flowable
  • 若坚持用原生 Flow,必须手写一个符合规范的 Publisher:在 subscribe() 中把传入的 Subscriber 保存并主动调用其 onSubscribe(),且后续所有回调(onNext/onError/onComplete)必须线程安全、不可重入
  • JDK 自带的唯一参考实现是 SubmissionPublisher——它是线程安全、支持背压的通用 Publisher,适合做数据源入口

SubmissionPublisher 是唯一靠谱的内置 Publisher

SubmissionPublisher 是 JDK 唯一提供的可直接实例化的 Publisher,它内部用 ForkJoinPool 管理异步提交,并通过 Estimator 动态估算下游处理能力来调节 offer() 行为。但它不是“发布即忘”:调用 submit()offer() 后,数据是否真的发出,取决于当前订阅者的 request() 余量。

使用场景:需要把批量数据(如日志、传感器采样)以可控速率推给多个响应式消费者,且不能丢数据(submit() 阻塞等待)、也不能压垮下游(offer() 可配置拒绝策略)。

立即学习Java免费学习笔记(深入)”;

360智绘
360智绘

360智脑推出的AI绘画创作与分享平台

下载
  • submit(item):阻塞直到有 request() 余量或订阅者取消;适合强一致性要求
  • offer(item, dropHandler):非阻塞;当无请求余量时,交由 dropHandler 决定是否丢弃(返回 true 才丢)
  • 构造时传入的 Executor 控制 onNext() 调用线程,默认用公共 ForkJoinPool,高并发下建议传入自定义线程池
  • 必须手动调用 close(),否则可能泄漏资源;关闭后新 submit()IllegalStateException

Subscriber 必须自己管理 request() 节奏

响应式流的核心契约是「下游驱动上游」,Subscriber 不是被动收包,而是要主动调用 Subscription.request(n) 来声明“我现在能处理 n 个”。JDK 不帮你自动调用,也不限制 n 的大小——全靠你自己判断。

常见错误现象:request(1) 后只处理一个就停住;或 request(Long.MAX_VALUE) 导致上游狂发,OOM;或在 onNext() 里递归调用 request() 引发栈溢出。

  • 典型安全模式:在 onSubscribe() 里先 request(1),等 onNext() 处理完再 request(1)(逐个拉取)
  • 批量处理可 request(32)request(1024),但需确保内存能 hold 住这批数据;处理完再补发相同数量
  • 绝不要在 onNext() 中直接调用 request(),应投递到线程池或用 Executor.execute() 异步触发,避免阻塞上游线程
  • Subscription.cancel() 后禁止再调用 request(),否则行为未定义

Flow API 和 Project Reactor/RxJava 的关系很浅

Flow 接口只是类型契约,Reactor 的 Flux 和 RxJava 的 Flowable 都实现了 Publisher,所以能被 SubmissionPublishersubscribe() 接收——但反过来不成立:Flux.just(1,2,3)Publisher,可传给任何接受该接口的方法,但它内部完全不依赖 JDK 的 Flow 类,也不使用 SubmissionPublisher

性能与兼容性影响:原生 Flow 没有操作符链、没有调度器抽象、没有错误恢复机制。想做 map/filter/retry,只能自己写 Processor,复杂度陡增。

  • 如果你项目已用 Reactor,直接用 Flux.create()Flux.from(submissionPublisher) 即可,无需碰原生 Flow
  • JDK 9+ 编译、JDK 17 运行时,Flow 接口始终存在,但 SubmissionPublisher 在 Android(非 ART)或某些裁剪版 JRE 中可能缺失
  • 跨语言互操作(如与 Kotlin Flow、Scala ZIO)时,Flow.Publisher 是通用握手协议,但实际传输仍需序列化适配

Flow API 的设计意图从来不是让你造轮子,而是让框架作者有一套标准接口可实现。真正在业务代码里手撸 Publisher/Subscriber,往往意味着你在过早优化,或者正踩进背压逻辑的深坑里——而坑底通常写着:“这里本该用 Reactor”。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Kotlin协程编程与Spring Boot集成实践
Kotlin协程编程与Spring Boot集成实践

本专题围绕 Kotlin 协程机制展开,深入讲解挂起函数、协程作用域、结构化并发与异常处理机制,并结合 Spring Boot 展示协程在后端开发中的实际应用。内容涵盖异步接口设计、数据库调用优化、线程资源管理以及性能调优策略,帮助开发者构建更加简洁高效的 Kotlin 后端服务架构。

106

2026.02.12

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1486

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

383

2025.10.17

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

2225

2025.12.29

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

37

2026.01.19

堆和栈的区别
堆和栈的区别

堆和栈的区别:1、内存分配方式不同;2、大小不同;3、数据访问方式不同;4、数据的生命周期。本专题为大家提供堆和栈的区别的相关的文章、下载、课程内容,供大家免费下载体验。

418

2023.07.18

堆和栈区别
堆和栈区别

堆(Heap)和栈(Stack)是计算机中两种常见的内存分配机制。它们在内存管理的方式、分配方式以及使用场景上有很大的区别。本文将详细介绍堆和栈的特点、区别以及各自的使用场景。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

592

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

675

2023.08.10

pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法
pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法

本专题系统整理pixiv网页版官网入口及登录访问方式,涵盖官网登录页面直达路径、在线阅读入口及快速进入方法说明,帮助用户高效找到pixiv官方网站,实现便捷、安全的网页端浏览与账号登录体验。

283

2026.02.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3.6万人学习

C# 教程
C# 教程

共94课时 | 9.6万人学习

Java 教程
Java 教程

共578课时 | 66.7万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号