Go管道模式是用channel连接多阶段goroutine,实现数据流式处理:输入→处理→合并→输出;各阶段独立解耦,需正确关闭channel、用errgroup管理生命周期、fan-in安全汇聚结果。

理解管道模式的核心结构
Go 的并发管道模式本质是用 channel 连接多个 goroutine 阶段,每个阶段专注一类操作:输入 → 处理1 → 处理2 → 合并 → 输出。关键不是“多开 goroutine”,而是让数据像水流一样自然流过各阶段,各阶段独立运行、解耦协作。
构建分阶段处理链(带错误传递与关闭控制)
每个阶段应接收输入 channel,返回输出 channel,并在输入关闭后主动退出。推荐统一使用 errgroup.Group 管理 goroutine 生命周期,避免 goroutine 泄漏:
- 第一阶段(读取):从切片/文件/网络读数据,发到
inCh chan Item;读完后close(inCh) - 中间阶段(如过滤、转换):接收前一阶段的 channel,启动固定数量 goroutine 并行处理,结果发到新 channel;监听输入关闭,处理完已接收项后退出
- 合并阶段:用
sync.WaitGroup或 errgroup 等待所有上游完成,再用for range从多个输出 channel 收集结果(可用select+default非阻塞轮询,或fan-in函数统一汇聚)
安全合并多路结果(避免死锁与数据丢失)
合并不是简单 for-range 多个 channel —— 若某路未关闭,range 会永久阻塞。正确做法是:
- 确保每个上游阶段在退出前 关闭其输出 channel(这是契约)
- 用 fan-in 模式:为每个输入 channel 启一个 goroutine,将值转发到一个共享的
outCh chan Result;所有转发 goroutine 结束后,close(outCh) - 主流程
for res := range outCh安全消费,无需担心某路卡住
完整可运行示例(含错误处理与资源清理)
以下是一个处理字符串切片、转大写、过滤空串、统计长度的三阶段管道:
立即学习“go语言免费学习笔记(深入)”;
func main() {
items := []string{"hello", "", "world", "go", ""}
// 阶段1:输入
in := make(chan string, len(items))
go func() {
defer close(in)
for _, s := range items {
in <- s
}
}()
// 阶段2:转大写(并行3个goroutine)
upper := upperCase(in, 3)
// 阶段3:过滤空串 + 统计长度(单goroutine,也可并行)
resultCh := countLen(upper)
// 合并并打印
for res := range resultCh {
fmt.Printf("'%s' -> %d\n", res.str, res.len)
}}
func upperCase(in aitGroup
wg.Add(workers)
for i := 0; i
func countLen(in










