应使用 completablefuture 链式编排任务:支持顺序、并行、组合与降级;需显式指定线程池避免 commonpool 拥塞;用 exceptionally/handle 捕获并解包异常;allof 后须单独 join 取结果,不可直接 get 超时。

用 CompletableFuture 链式编排任务,别用 ExecutorService 直接 submit 堆砌
直接用 ExecutorService.submit() 丢一堆 Runnable 进去,看似并发,实则丢失依赖、异常不可达、结果难聚合。真正做 Pipeline,得靠 CompletableFuture 的链式能力——它天然支持顺序、并行、组合、降级。
- 每个阶段返回
CompletableFuture<t></t>,下游用thenApply/thenCompose接入,不是靠共享变量或 CountDownLatch 同步 - 关键区别:
thenApply是同步转换(同线程执行),thenApplyAsync才真正切线程;不显式指定 executor 时,默认走ForkJoinPool.commonPool(),高负载下容易挤占 CPU 导致延迟毛刺 - 必须用
exceptionally或handle拦住异常,否则上游失败会静默终止整个链,下游 get() 时才爆ExecutionException,排查困难
处理中间阶段失败:别只 catch Exception,要区分 CompletionException 和业务异常
CompletableFuture 包装的异常默认是 CompletionException,它把原始异常藏在 getCause() 里。如果下游只写 catch (Exception e),会漏掉真正的业务错误类型,导致 fallback 逻辑错配。
- 在
exceptionally回调里,先用e.getCause()解包,再判断是否为预期的TimeoutException或自定义RetryableException - 不要在
thenApply里手动 throw 新异常——这会让堆栈丢失原始上下文;改用completeExceptionally(new MyError(...))主动完成 - 若某阶段必须重试,别用 while 循环套
get(),而是用thenCompose+ 递归构造新 future,避免阻塞线程
控制并发度:用专用线程池,禁用 commonPool 处理 IO 密集型阶段
ForkJoinPool.commonPool() 默认并行度 = CPU 核数 -1,适合 CPU 密集型计算。但 Pipeline 里常混着 HTTP 调用、DB 查询等 IO 操作,用 commonPool 会导致线程饥饿、响应延迟飙升。
- 为 IO 阶段单独建池:
Executors.newFixedThreadPool(20)或更优的newCachedThreadPool(注意回收策略) - 每个
thenApplyAsync/supplyAsync显式传入该池,例如supplyAsync(() -> callApi(), ioPool) - 池大小不是越大越好:连接池、DB 连接数、远程服务限流都会成为瓶颈,建议从 4–8 开始压测,观察
pool.getActiveCount()和排队队列长度
结果聚合与超时:用 allOf + join 而非 get,避免单点拖垮整条流水线
CompletableFuture.allOf(f1, f2, f3).get(10, TimeUnit.SECONDS) 看似方便,但只要任意一个 future 超时或失败,整个 get 就抛异常,你拿不到其余两个的成功结果——Pipeline 失去了“尽力而为”的弹性。
- 改用
allOf后,对每个 future 单独join()(无超时)或封装成带 fallback 的orTimeout().exceptionally(...) - 若需整体超时,用
CompletableFuture.anyOf(timeoutFuture, mainFuture)实现“主流程 or 超时兜底”,而不是给 allOf 加 timeout - 警惕
allOf返回的是CompletableFuture<void></void>,要取结果得手动 collect 到数组或 map,别指望它自动打包
真正难的不是串起几个 future,而是当某个 HTTP 阶段慢了 3 秒、DB 批量写入部分失败、下游服务返回 503 时,Pipeline 是否还能给出可解释的结果——这些边界 case 的处理逻辑,比主线代码多三倍。








