用 sync.map 实现线程安全订阅者管理需外层用 sync.map 存 topic→*sublist 映射,内层 sublist 用 rwmutex 保护 callback 切片,避免并发读写 panic;回调须自行处理超时或取消,防止 goroutine 泄漏。

用 sync.Map 实现线程安全的订阅者管理
Go 标准库没有内置的发布订阅(Pub/Sub)结构,但核心难点在于:多个 goroutine 同时 Subscribe、Unsubscribe 和 Publish 时,必须保证订阅者列表的读写安全。直接用 map[string][]func(interface{}) 会引发 panic:”concurrent map read and map write“。
推荐用 sync.Map 存储 topic → []callback 映射,但注意:sync.Map 不支持原子性地对 value 做切片操作(比如并发 append),所以需额外加锁保护 callback 列表本身:
type PubSub struct {
subs sync.Map // topic string → *subList
}
<p>type subList struct {
mu sync.RWMutex
fns []func(interface{})
}</p><p>func (s *subList) add(f func(interface{})) {
s.mu.Lock()
defer s.mu.Unlock()
s.fns = append(s.fns, f)
}</p><p>func (s *subList) call(data interface{}) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, f := range s.fns {
f(data)
}
}常见错误是只锁外层 sync.Map,却忽略对每个 topic 下 callback 切片的并发保护——这会导致漏调用或 panic。
避免 goroutine 泄漏:订阅者回调必须可取消或带超时
如果某个 func(interface{}) 订阅者内部阻塞(如等待 HTTP 请求、数据库查询),而发布方持续调用 Publish,就会堆积大量等待中的 goroutine,最终 OOM。
立即学习“go语言免费学习笔记(深入)”;
实操建议:
- 在回调函数内自行控制上下文(
ctx.Done())和超时,不要依赖发布方 - 不推荐在
Publish中统一加 timeout —— 这会中断正常回调,且无法区分哪些已执行、哪些被丢弃 - 若需强制解耦,可用
select { case ch 非阻塞投递,配合缓冲 channel 控制积压量
典型场景:微服务中监听配置变更事件,回调里调用 http.Get 却没设 timeout,一次网络抖动就卡住整个 topic 的后续分发。
用 chan interface{} 替代闭包回调,更适合跨服务边界
闭包方式(Subscribe("log", func(v interface{}) { ... }))适合进程内轻量通信;但若要对接消息队列(如 NATS、Redis Pub/Sub)、或需要序列化/反序列化(如 JSON over HTTP),闭包不可传递,必须换通道模型。
改造要点:
- 每个订阅者获取专属
chan interface{},由 PubSub 内部 goroutine 转发事件 -
Subscribe返回chan interface{}和unsubscribe func(),调用后者关闭通道并清理 - 发布时用
select+default避免阻塞发送(接收方消费慢时自动丢弃旧消息)
示例片段:
func (p *PubSub) Subscribe(topic string) (<code>chan interface{}</code>, func()) {
ch := make(chan interface{}, 16)
p.subs.LoadOrStore(topic, &subChan{ch: ch})
return ch, func() {
close(ch)
p.subs.Delete(topic) // 简化处理,实际需更细粒度
}
}这种模式天然适配 range ch 循环,也方便做背压(例如用 time.AfterFunc 清理空闲 channel)。
小心 interface{} 类型擦除带来的运行时 panic
Pub/Sub 通常用 interface{} 作消息载体,但下游回调若强制断言为具体类型(如 v.(MyEvent)),上游传入 string 就直接 panic。
两种务实做法:
- 约定消息结构体统一实现某个接口(如
Event接口含Type() string和Timestamp() time.Time),回调用类型开关而非断言 - 发布前用
json.Marshal序列化,订阅端再json.Unmarshal到目标结构——虽有开销,但类型安全、跨语言友好
最容易被忽略的是日志和监控场景:你写了 log.Printf("event: %+v", v),结果某次传入 nil 或含 sync.Mutex 字段的结构体,直接 crash。










