flow api 仅提供接口和subscription工具类,无开箱即用publisher;唯一可用内置实现是线程安全、支持背压的submissionpublisher,需手动管理request()与close(),业务中推荐使用reactor或rxjava。

Flow.Publisher 和 Flow.Subscriber 不是让你直接 new 的
Java 9 的 Flow API 是对 Reactive Streams 规范的最小化内置实现,它只提供接口和一个工具类 Flow.Subscription,**不带任何开箱即用的 Publisher 实现**。你写 new Flow.Publisher() { ... } 是合法语法,但无法触发背压、不会被下游正确订阅——因为没人调用你的 subscribe() 方法里的逻辑。
常见错误现象:onSubscribe() 从不被调用,或者 request(1) 后没收到 onNext(),程序静默结束。
- 真正可用的起点是
Flow.Processor(需自行实现)或第三方库(如 Reactor 的Mono/Flux、RxJava 的Flowable) - 若坚持用原生
Flow,必须手写一个符合规范的Publisher:在subscribe()中把传入的Subscriber保存并主动调用其onSubscribe(),且后续所有回调(onNext/onError/onComplete)必须线程安全、不可重入 - JDK 自带的唯一参考实现是
SubmissionPublisher——它是线程安全、支持背压的通用Publisher,适合做数据源入口
SubmissionPublisher 是唯一靠谱的内置 Publisher
SubmissionPublisher 是 JDK 唯一提供的可直接实例化的 Publisher,它内部用 ForkJoinPool 管理异步提交,并通过 Estimator 动态估算下游处理能力来调节 offer() 行为。但它不是“发布即忘”:调用 submit() 或 offer() 后,数据是否真的发出,取决于当前订阅者的 request() 余量。
使用场景:需要把批量数据(如日志、传感器采样)以可控速率推给多个响应式消费者,且不能丢数据(submit() 阻塞等待)、也不能压垮下游(offer() 可配置拒绝策略)。
立即学习“Java免费学习笔记(深入)”;
-
submit(item):阻塞直到有request()余量或订阅者取消;适合强一致性要求 -
offer(item, dropHandler):非阻塞;当无请求余量时,交由dropHandler决定是否丢弃(返回true才丢) - 构造时传入的
Executor控制onNext()调用线程,默认用公共ForkJoinPool,高并发下建议传入自定义线程池 - 必须手动调用
close(),否则可能泄漏资源;关闭后新submit()抛IllegalStateException
Subscriber 必须自己管理 request() 节奏
响应式流的核心契约是「下游驱动上游」,Subscriber 不是被动收包,而是要主动调用 Subscription.request(n) 来声明“我现在能处理 n 个”。JDK 不帮你自动调用,也不限制 n 的大小——全靠你自己判断。
常见错误现象:request(1) 后只处理一个就停住;或 request(Long.MAX_VALUE) 导致上游狂发,OOM;或在 onNext() 里递归调用 request() 引发栈溢出。
- 典型安全模式:在
onSubscribe()里先request(1),等onNext()处理完再request(1)(逐个拉取) - 批量处理可
request(32)或request(1024),但需确保内存能 hold 住这批数据;处理完再补发相同数量 - 绝不要在
onNext()中直接调用request(),应投递到线程池或用Executor.execute()异步触发,避免阻塞上游线程 -
Subscription.cancel()后禁止再调用request(),否则行为未定义
Flow API 和 Project Reactor/RxJava 的关系很浅
Flow 接口只是类型契约,Reactor 的 Flux 和 RxJava 的 Flowable 都实现了 Publisher,所以能被 SubmissionPublisher 的 subscribe() 接收——但反过来不成立:Flux.just(1,2,3) 是 Publisher,可传给任何接受该接口的方法,但它内部完全不依赖 JDK 的 Flow 类,也不使用 SubmissionPublisher。
性能与兼容性影响:原生 Flow 没有操作符链、没有调度器抽象、没有错误恢复机制。想做 map/filter/retry,只能自己写 Processor,复杂度陡增。
- 如果你项目已用 Reactor,直接用
Flux.create()或Flux.from(submissionPublisher)即可,无需碰原生Flow - JDK 9+ 编译、JDK 17 运行时,
Flow接口始终存在,但SubmissionPublisher在 Android(非 ART)或某些裁剪版 JRE 中可能缺失 - 跨语言互操作(如与 Kotlin Flow、Scala ZIO)时,
Flow.Publisher是通用握手协议,但实际传输仍需序列化适配
Flow API 的设计意图从来不是让你造轮子,而是让框架作者有一套标准接口可实现。真正在业务代码里手撸 Publisher/Subscriber,往往意味着你在过早优化,或者正踩进背压逻辑的深坑里——而坑底通常写着:“这里本该用 Reactor”。










