fan-out 通过固定数量 worker 并发处理任务,避免无节制启 goroutine 导致资源耗尽;需用带缓冲 channel 分发任务,及时关闭输入 channel,worker 用 for range 安全消费,且不在分发阶段做重试或阻塞 io。

Fan-Out:怎么启动多个 goroutine 并发处理任务
Fan-Out 的核心是把一个输入源拆成多份,分发给多个 goroutine 同时干活。关键不是“开很多 goroutine”,而是控制并发数、避免资源耗尽。
常见错误是直接对每个元素起一个 goroutine:go process(item),数据量大时可能瞬间创建几千个 goroutine,触发调度压力甚至 OOM。
- 用固定数量的 worker(比如
runtime.NumCPU()或业务评估出的安全值),通过 channel 接收任务 - 输入 channel 要关闭,否则 worker 无法退出;worker 内部用
for item := range inChan安全消费 - 别在 Fan-Out 阶段做重试或阻塞 IO —— 这会让某个 worker 卡住,拖慢整体吞吐
workers := 4
in := make(chan int, 100)
for i := 0; i < workers; i++ {
go func() {
for item := range in {
result := heavyWork(item)
out <- result // 发到输出 channel
}
}()
}Fan-In:如何安全聚合多个 goroutine 的输出结果
Fan-In 就是把多个 goroutine 的输出统一收进一个 channel。难点不在“合并”,而在“何时结束”和“是否丢数据”。
典型错误是用 select 配合超时或默认分支,导致部分结果被跳过;或者用 close(out) 过早,漏掉还在路上的结果。
立即学习“go语言免费学习笔记(深入)”;
- 每个 worker 完成后单独 close 自己的输出 channel(如果用了多路 channel),但更推荐统一由主 goroutine 管理生命周期
- 用
sync.WaitGroup等待所有 worker 退出,再 close 输出 channel —— 这是最稳的方式 - 不要用
for range直接读多个 channel,要用reflect.Select或更简单的:每个 worker 单独 go 写入同一 output channel,主 goroutine 只管读
var wg sync.WaitGroup
out := make(chan Result, 100)
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range in {
out <- process(item)
}
}()
}
go func() {
wg.Wait()
close(out)
}()ctx.Done() 怎么嵌入 Fan-Out/Fan-In 链防止 goroutine 泄露
没有上下文取消的 Fan-Out/Fan-In,在调用方提前退出时会继续跑完所有任务,goroutine 和 channel 缓冲区都悬着不释放。
错误做法是只在入口 check ctx.Err(),但 worker 内部仍在读 channel、执行逻辑。
- 每个 worker 启动时接收
ctx,并在循环中用select同时监听ctx.Done()和输入 channel - worker 内部的阻塞操作(如 http 调用、数据库查询)必须传入
ctx,否则 cancel 不生效 - 输出 channel 写入前也要 select 判断 ctx 是否已取消,避免往已关闭的 channel 发送 panic
go func() {
for {
select {
case item, ok := <-in:
if !ok {
return
}
result := doWork(ctx, item) // doWork 内部也用 ctx
select {
case out <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()为什么不能直接用 for-range + time.After 做超时聚合
很多人想用 time.After 包裹整个 Fan-In 阶段,比如 “等 5 秒,不管有没有收完都关 channel”。这会导致结果截断,而且掩盖了真正的瓶颈。
更糟的是,time.After 在 for-range 外层使用,会阻止 goroutine 正常退出 —— 因为 input channel 没关,worker 一直卡在 range 里。
- 超时应该作用于单次任务(比如每个
http.Get加ctx.WithTimeout),而不是整条流水线 - 如果业务真需要“最多等 N 秒拿到尽可能多结果”,就用带缓冲的 output channel + 单独的 timer goroutine close 它,但主读取逻辑仍要处理
!ok - 注意:用
time.After代替ctx.Done()会丢失 cancel 原因(如 deadline exceeded vs canceled),调试困难
真正难的不是写对 Fan-Out/Fan-In,而是判断哪些环节该响应 cancel、哪些该完成、哪些该丢弃 —— 这取决于业务语义,不是模式本身能决定的。










