blockingqueue 可构建线程安全的“生产者→缓冲区→消费者”流水线,推荐 linkedblockingqueue(吞吐优先)或 arrayblockingqueue(背压控制),配合 phaser 实现多阶段批次同步,需防异常导致线程泄漏与性能瓶颈。

用 BlockingQueue 搭建最简流水线
流水线本质是“生产者→缓冲区→消费者”的三级结构,Java 里最直接的实现就是用 BlockingQueue 做中间缓冲。它天然支持线程安全、阻塞等待、容量控制,不用自己加锁或 wait/notify。
常见错误是用 ArrayList + synchronized 模拟队列——容易漏掉唤醒逻辑,或在空队列时忙等耗 CPU。
- 选
LinkedBlockingQueue(无界)适合吞吐优先、内存可控的场景;ArrayBlockingQueue(有界)能防止上游过快压垮下游,触发背压 - 每个处理阶段开一个
Thread或交给ExecutorService管理,但注意:线程数 ≠ 阶段数,一个阶段可配多个工作线程提升并行度 -
queue.take()会阻塞直到有数据,queue.poll(timeout, unit)更适合需要超时控制或定期检查中断的场景
用 Phaser 协调多阶段同步点
当流水线不是纯异步推送,而是需要“所有阶段完成第 N 批后,再统一进入第 N+1 批”,比如图像处理中每帧必须完整走完预处理→识别→后处理三步才输出结果,这时 BlockingQueue 不够用,得靠 Phaser。
别用 CyclicBarrier ——它只支持固定线程数且无法动态注册新参与者;CountDownLatch 只能用一次,没法循环复用。
立即学习“Java免费学习笔记(深入)”;
- 每个阶段启动时调用
phaser.register(),处理完一批数据后调用phaser.arriveAndAwaitAdvance() - 主线程或监控线程可用
phaser.getPhase()判断当前批次号,避免轮询 - 如果某阶段失败需终止整个流水线,调用
phaser.forceTermination(),后续arriveAndAwaitAdvance()会抛IllegalStateException
避免 ExecutorService 线程泄漏导致流水线卡死
用 Executors.newFixedThreadPool(n) 启动各阶段时,若下游阶段因异常退出而没消费队列,上游线程会持续往 BlockingQueue 写入,最终填满队列、阻塞上游——表面看是“卡住”,实则是线程池未正确关闭 + 异常未捕获。
- 每个阶段的
Runnable必须包一层try-catch(Throwable),记录日志并主动调用executor.shutdownNow()或通知协调器 - 不要依赖 JVM 退出自动清理:
shutdown()后应配合awaitTermination()等待任务结束,超时则强制中断 - 如果阶段间存在强依赖(如阶段二必须等阶段一输出才能启动),考虑用
CompletableFuture链式编排,比手动管理线程更清晰
流水线性能瓶颈往往不在并发数,而在数据拷贝和锁竞争
很多人一上来就调高线程数,结果 QPS 不升反降。真实瓶颈常是:阶段间传递大对象引发频繁 GC;多个线程争抢同一把锁(比如共享状态计数器);或 BlockingQueue 容量设得太小导致频繁阻塞。
- 阶段间尽量传 ID 或轻量引用,原始数据存缓存(如
ConcurrentHashMap),用空间换线程安全 - 避免在流水线核心路径里做日志打印、JSON 序列化等重操作,改用异步日志框架或批处理
- 用 JFR 或
jstack抓现场,重点看线程是否长期停留在parking to wait for(说明被队列阻塞)或in Object.wait()(说明用了低效同步)
流水线不是越深越好,阶段拆分要匹配实际 IO/计算边界;跨阶段共享状态越少,越容易定位问题。










