goroutine 泄漏的根本原因是未关闭 channel 或未消费完数据,导致发送方在 ch

用 channel 实现 pipeline 时,为什么 goroutine 会泄漏?
根本原因是没关闭 channel 或没消费完数据,导致发送方卡在 ch ,接收方永远等不到 <code>close()。pipeline 里每个阶段都可能成为阻塞点。
- 上游 stage 发完数据后必须
close(ch),下游用range ch才能安全退出 - 如果某个 stage 提前 panic 或 return,但没 close 输出 channel,后续 stage 就会永久阻塞
- 别用无缓冲 channel 做 pipeline 主干——容易一卡全卡;优先用带缓冲的
make(chan int, 16) - 示例错误写法:
out := make(chan int); go func() { for _, v := range in { out → 忘了 <code>close(out)
多个 stage 串联时,如何避免“扇入/扇出”混乱?
扇入(merge)和扇出(fan-out)不是可选技巧,而是 pipeline 的刚性需求:一个 stage 往往要从多个输入 channel 读,或向多个输出 channel 写。不规范处理会导致漏数据、死锁、重复发送。
- 扇入推荐用
func merge(cs ...:起一个 goroutine,对每个 <code>c启动go func(c ,最后 <code>close(out) - 扇出慎用
select随机转发——它不保序、不均衡;真要分发,显式轮询或加负载标记更可控 - 所有输出 channel 必须由创建它的 stage 负责 close;被 merge 的 channel 可以提前 close,
range会自动退出 - 别让一个 goroutine 同时读多个 channel 并写入同一个输出 channel —— 没缓冲就极易死锁
怎么让 pipeline 支持取消和超时?
原生 channel 不带上下文,一旦启动就停不下来。必须把 context.Context 显式传进每个 stage,并在关键阻塞点检查 ctx.Done()。
- 每个 stage 函数签名建议为
func(ctx context.Context, in - 在
for v := range in循环内,每次迭代前加select { case - 向输出 channel 发送前也应 select:
select { case out - 注意:不要在 stage 内部 new context(比如
context.WithTimeout),超时控制应由调用方统一管理 - 错误示例:
for v := range in { out → <code>expensiveCalc耗时长,ctx 无法中断它;得拆成可中断的子步骤
实际业务中,哪些地方最容易被忽略?
不是语法不会写,而是边界情况没人盯——比如空输入、panic 恢复、资源清理、错误传递路径。
立即学习“go语言免费学习笔记(深入)”;
- 第一个 stage 如果输入是空 slice 或数据库查无结果,要主动
close(out),否则下游永远等 - 每个 stage 的 goroutine 应包一层
defer func(){ if r := recover(); r != nil { /* log */ } }(),否则 panic 会让整个 pipeline 静默挂掉 - 涉及文件、HTTP、DB 的 stage,不能只关 channel,还要显式
Close()底层资源(如rows.Close()) - 错误不能只打日志:要用额外的
errCh chan error向外暴露,或者把 error 塞进数据结构(如struct{ Val int; Err error })
channel pipeline 看似简单,真正难的是让每个 stage 在任意时刻都能被安全打断、重试、观测——这要求每个环节都明确自己的生命周期和失败契约。










