0

0

如何用Golang实现发布订阅模式 使用channel构建事件驱动架构

P粉602998670

P粉602998670

发布时间:2025-08-14 15:58:02

|

446人浏览过

|

来源于php中文网

原创

使用channel实现发布订阅模式的核心在于维护订阅者列表并解耦发布者与订阅者。1. 通过map存储主题与订阅者channel的对应关系,实现订阅和取消订阅操作;2. 发布消息时遍历订阅者列表,并用goroutine发送以防止阻塞;3. 防止channel阻塞可采用带缓冲的channel、加锁控制或丢弃策略;4. 缓冲大小应根据发布与订阅速度差异选择,通常从保守值开始调整;5. 处理订阅者掉线可通过超时机制检测并移除无效channel,结合心跳检测提升可靠性;6. 若需保证顺序性,可通过单channel串行分发或为消息添加序列号排序实现。

如何用Golang实现发布订阅模式 使用channel构建事件驱动架构

发布订阅模式在Go语言中,利用channel可以优雅地实现。核心在于维护一个订阅者列表(通常是channel的slice或map),当事件发生时,将消息发送到所有订阅者的channel中。这种方式能有效解耦发布者和订阅者,构建灵活的事件驱动架构。

如何用Golang实现发布订阅模式 使用channel构建事件驱动架构

解决方案

package main

import (
    "fmt"
    "sync"
    "time"
)

// EventData 事件数据结构
type EventData struct {
    Topic string
    Data  interface{}
}

// Broker 发布者/代理
type Broker struct {
    mu          sync.RWMutex
    subscribers map[string][]chan EventData
    quit        chan bool
}

// NewBroker 创建一个新的 Broker 实例
func NewBroker() *Broker {
    return &Broker{
        subscribers: make(map[string][]chan EventData),
        quit:        make(chan bool),
    }
}

// Subscribe 订阅特定主题
func (b *Broker) Subscribe(topic string, ch chan EventData) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.subscribers[topic] = append(b.subscribers[topic], ch)
}

// Unsubscribe 取消订阅
func (b *Broker) Unsubscribe(topic string, ch chan EventData) {
    b.mu.Lock()
    defer b.mu.Unlock()

    if _, ok := b.subscribers[topic]; !ok {
        return
    }

    var newSubs []chan EventData
    for _, sub := range b.subscribers[topic] {
        if sub != ch {
            newSubs = append(newSubs, sub)
        }
    }
    b.subscribers[topic] = newSubs

    // 如果该主题没有订阅者,则删除该主题
    if len(b.subscribers[topic]) == 0 {
        delete(b.subscribers, topic)
    }
}

// Publish 发布事件
func (b *Broker) Publish(event EventData) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    if subs, ok := b.subscribers[event.Topic]; ok {
        for _, ch := range subs {
            // 使用 goroutine 防止阻塞发布者
            go func(channel chan EventData) {
                select {
                case channel <- event: // 发送事件到channel
                default:
                    fmt.Println("Channel full, dropping message") // 处理channel阻塞的情况
                }
            }(ch)
        }
    }
}

// Start 启动 Broker
func (b *Broker) Start() {
    <-b.quit // 等待退出信号
}

// Stop 停止 Broker
func (b *Broker) Stop() {
    close(b.quit)
}

func main() {
    broker := NewBroker()
    go broker.Start()
    defer broker.Stop()

    // 创建订阅者
    subscriber1 := make(chan EventData, 10) // 带缓冲的channel
    subscriber2 := make(chan EventData, 10)

    // 订阅主题
    broker.Subscribe("topicA", subscriber1)
    broker.Subscribe("topicB", subscriber2)

    // 发布事件
    broker.Publish(EventData{Topic: "topicA", Data: "Message for topicA"})
    broker.Publish(EventData{Topic: "topicB", Data: "Message for topicB"})
    broker.Publish(EventData{Topic: "topicA", Data: "Another message for topicA"})

    time.Sleep(time.Second) // 等待消息处理

    // 从订阅者接收消息
    select {
    case msg := <-subscriber1:
        fmt.Printf("Subscriber 1 received: %+v\n", msg)
    default:
        fmt.Println("Subscriber 1 received nothing")
    }

    select {
    case msg := <-subscriber1:
        fmt.Printf("Subscriber 1 received: %+v\n", msg)
    default:
        fmt.Println("Subscriber 1 received nothing")
    }

    select {
    case msg := <-subscriber2:
        fmt.Printf("Subscriber 2 received: %+v\n", msg)
    default:
        fmt.Println("Subscriber 2 received nothing")
    }


    // 取消订阅
    broker.Unsubscribe("topicA", subscriber1)

    // 再次发布事件,subscriber1 不应该再收到
    broker.Publish(EventData{Topic: "topicA", Data: "This should not be received by subscriber1"})
    time.Sleep(time.Second)

    select {
    case msg := <-subscriber1:
        fmt.Printf("Subscriber 1 received: %+v\n", msg)
    default:
        fmt.Println("Subscriber 1 received nothing") // 预期输出
    }
}

使用channel实现发布订阅,最怕的就是channel阻塞。上面的代码里,我加了

select
语句,就是为了防止这种情况。如果channel满了,就直接丢弃消息,当然,更好的做法是加一些重试机制或者把消息持久化到队列里,但这会增加复杂度,看具体场景需求吧。

立即学习go语言免费学习笔记(深入)”;

如何用Golang实现发布订阅模式 使用channel构建事件驱动架构

Golang channel的缓冲大小如何选择?

Kite
Kite

代码检测和自动完成工具

下载

Channel的缓冲大小直接影响到发布订阅模式的性能和可靠性。太小容易阻塞,太大又浪费内存。一般来说,需要根据发布者和订阅者的速度差异来调整。如果发布速度远大于订阅速度,可以适当增加缓冲大小。另外,还可以考虑使用动态调整缓冲大小的策略,但这会引入额外的复杂性。我的经验是,先用一个相对保守的值,比如10或者100,然后根据实际运行情况调整。

如何用Golang实现发布订阅模式 使用channel构建事件驱动架构

如何处理订阅者掉线或崩溃的情况?

这是个很现实的问题。如果订阅者掉线了,broker继续往它的channel里发消息,会导致goroutine泄漏。解决这个问题,可以在

Publish
方法里加一个超时机制,如果一段时间内无法向channel发送消息,就认为订阅者已经掉线,然后从订阅者列表中移除。另外,订阅者在重新连接后,需要重新订阅。更健壮的方案,可以考虑使用心跳检测,定期检查订阅者是否存活。

如何保证消息的顺序性?

在发布订阅模式中,保证消息的顺序性是一个挑战。因为消息是并发发送到多个订阅者的,很难保证所有订阅者都按照相同的顺序接收到消息。如果对顺序性有严格要求,可以考虑使用单一的channel来发送消息,然后由一个goroutine负责将消息分发到各个订阅者。但这样做会降低并发度。另一种方法是为每个消息添加一个序列号,订阅者收到消息后,按照序列号进行排序。

相关文章

驱动精灵
驱动精灵

驱动精灵基于驱动之家十余年的专业数据积累,驱动支持度高,已经为数亿用户解决了各种电脑驱动问题、系统故障,是目前有效的驱动软件,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
golang如何定义变量
golang如何定义变量

golang定义变量的方法:1、声明变量并赋予初始值“var age int =值”;2、声明变量但不赋初始值“var age int”;3、使用短变量声明“age :=值”等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

180

2024.02.23

golang有哪些数据转换方法
golang有哪些数据转换方法

golang数据转换方法:1、类型转换操作符;2、类型断言;3、字符串和数字之间的转换;4、JSON序列化和反序列化;5、使用标准库进行数据转换;6、使用第三方库进行数据转换;7、自定义数据转换函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

228

2024.02.23

golang常用库有哪些
golang常用库有哪些

golang常用库有:1、标准库;2、字符串处理库;3、网络库;4、加密库;5、压缩库;6、xml和json解析库;7、日期和时间库;8、数据库操作库;9、文件操作库;10、图像处理库。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

340

2024.02.23

golang和python的区别是什么
golang和python的区别是什么

golang和python的区别是:1、golang是一种编译型语言,而python是一种解释型语言;2、golang天生支持并发编程,而python对并发与并行的支持相对较弱等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

209

2024.03.05

golang是免费的吗
golang是免费的吗

golang是免费的。golang是google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的开源编程语言,采用bsd开源协议。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

393

2024.05.21

golang结构体相关大全
golang结构体相关大全

本专题整合了golang结构体相关大全,想了解更多内容,请阅读专题下面的文章。

198

2025.06.09

golang相关判断方法
golang相关判断方法

本专题整合了golang相关判断方法,想了解更详细的相关内容,请阅读下面的文章。

191

2025.06.10

golang数组使用方法
golang数组使用方法

本专题整合了golang数组用法,想了解更多的相关内容,请阅读专题下面的文章。

273

2025.06.17

c++空格相关教程合集
c++空格相关教程合集

本专题整合了c++空格相关教程,阅读专题下面的文章了解更多详细内容。

0

2026.01.23

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 4.1万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号