使用goroutine和channel实现生产者消费者模式,通过缓冲channel解耦数据生成与处理,生产者发送任务到channel,消费者从中接收并处理。

在Go语言中实现生产者消费者模式,最常用的方式是结合 goroutine 和 channel。这种模式能有效解耦数据生成和处理逻辑,适用于任务队列、数据流处理等场景。
使用Channel作为任务缓冲区
Go的channel天然适合实现生产者消费者模型。生产者将任务发送到channel,消费者从channel接收并处理。
定义一个缓冲channel作为任务队列,可以避免生产者被阻塞:
tasks := make(chan int, 100) // 缓冲大小为100的任务通道
生产者向channel发送数据:
立即学习“go语言免费学习笔记(深入)”;
go func() {
for i := 0; i < 1000; i++ {
tasks <- i
}
close(tasks) // 所有任务发送完成后关闭channel
}()
消费者从channel读取并处理数据:
go func() {
for task := range tasks {
fmt.Printf("处理任务: %d\n", task)
// 模拟处理耗时
time.Sleep(time.Millisecond * 10)
}
}()
启动多个消费者提升处理能力
为了提高并发处理能力,可以启动多个消费者goroutine同时消费任务。
在现实生活中的购物过程,购物者需要先到商场,找到指定的产品柜台下,查看产品实体以及标价信息,如果产品合适,就将该产品放到购物车中,到收款处付款结算。电子商务网站通过虚拟网页的形式在计算机上摸拟了整个过程,首先电子商务设计人员将产品信息分类显示在网页上,用户查看网页上的产品信息,当用户看到了中意的产品后,可以将该产品添加到购物车,最后使用网上支付工具进行结算,而货物将由公司通过快递等方式发送给购物者
例如启动5个消费者:
for i := 0; i < 5; i++ {
go func(workerID int) {
for task := range tasks {
fmt.Printf("Worker %d 处理任务: %d\n", workerID, task)
time.Sleep(time.Millisecond * 10)
}
}(i)
}
所有消费者通过同一个channel读取任务,Go runtime会自动保证线程安全。
等待所有消费者完成
使用sync.WaitGroup确保主程序在所有消费者处理完毕后再退出。
var wg sync.WaitGroup wg.Add(5) // 启动5个消费者for i := 0; i < 5; i++ { go func(workerID int) { defer wg.Done() for task := range tasks { fmt.Printf("Worker %d 处理任务: %d\n", workerID, task) time.Sleep(time.Millisecond * 10) } }(i) }
// 等待所有消费者完成 wg.Wait()
完整示例代码
以下是一个完整的生产者消费者实现:
package mainimport ( "fmt" "sync" "time" )
func main() { tasks := make(chan int, 100) var wg sync.WaitGroup
// 生产者 go func() { for i := 0; i < 100; i++ { tasks <- i } close(tasks) }() // 消费者 for i := 0; i < 3; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for task := range tasks { fmt.Printf("Worker %d 处理任务 %d\n", workerID, task) time.Sleep(time.Millisecond * 50) } }(i) } wg.Wait() fmt.Println("所有任务已完成")}
基本上就这些。利用channel的并发安全特性和goroutine的轻量性,Go实现生产者消费者模式非常简洁高效。









