现代编程语言中,并发已经成为必不可少的特性。现在绝大多数编程语言都有一些方法实现并发。 其中一些实现方式非常强大,能将负载转移到不同的系统线程,比如 Java 等;一些则在同一线程上模拟这种行为,比如 Ruby 等。 Golang 的并发模型非常强大,称为 CSP(通信顺序进程),它将一个问题分解成更小的顺序进程,然后调度这些进程的实例(称为 Goroutine)。这些进程通过 channel 传递信息实现通信。 本文,我们将探讨如何利用 golang 的并发性,以及如何在 workerPool 使用。系列文章的第二篇,我们将探讨如何构建一个强大的并发解决方案。 假设我们需要调用一个外部 API 接口,整个过程需要花费 100ms。如果我们需要同步地调用该接口 1000 次,则需要花费 100s。 上面的代码创建了 model 包,包里包含一个结构体,这个结构体只有一个 int 类型的成员。我们同步地处理 data,这显然不是最佳方案,因为可以并发处理这些任务。我们换一种方案,使用 goroutine 和 channel 来处理。 上面的代码,我们创建了容量 100 的缓存 channel,并通过 NoPooledWork() 将数据 push 到 channel 里。channel 长度满 100 之后,我们是无法再向其中添加元素直到有元素被读取走。使用 for range 读取 channel,并生成 goroutine 处理。这里我们没有限制生成 goroutine 的数量,这可以尽可能多地处理任务。从理论上来讲,在给定所需资源的情况下,可以处理尽可能多的数据。执行代码,完成 1000 个任务只花费了 100ms。很疯狂吧!不全是,接着往下看。 除非我们拥有地球上所有的资源,否则在特定时间内能够分配的资源是有限的。一个 goroutine 占用的最小内存是 2k,但也能达到 1G。上述并发执行所有任务的解决方案中,假设有一百万个任务,就会很快耗尽机器的内存和 CPU。我们要么升级机器的配置,要么就寻找其他更好的解决方案。 计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。 我们通过实现 worker pool 来修复之前遇到的问题。 上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。 Go 语言的 channel 可以当作队列使用。 这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。 我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。 我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。 比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好, 我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。 Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!
一个简单的例子
//// model/data.go
package model
type SimpleData struct {
ID int
}
//// basic/basic.go
package basic
import (
"fmt"
"github.com/Joker666/goworkerpool/model"
"time"
)
func Work(allData []model.SimpleData) {
start := time.Now()
for i, _ := range allData {
Process(allData[i])
}
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
func Process(data model.SimpleData) {
fmt.Printf("Start processing %d\n", data.ID)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Finish processing %d\n", data.ID)
}
//// main.go
package main
import (
"fmt"
"github.com/Joker666/goworkerpool/basic"
"github.com/Joker666/goworkerpool/model"
"github.com/Joker666/goworkerpool/worker"
)
func main() {
// Prepare the data
var allData []model.SimpleData
for i := 0; i < 1000; i++ {
data := model.SimpleData{ ID: i }
allData = append(allData, data)
}
fmt.Printf("Start processing all work \n")
// Process
basic.Work(allData)
}Start processing all work
Took ===============> 1m40.226679665s
异步
//// worker/notPooled.go
func NotPooledWork(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
dataCh := make(chan model.SimpleData, 100)
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
wg.Add(1)
go func(data model.SimpleData) {
defer wg.Done()
basic.Process(data)
}(data)
}
}()
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.NotPooledWork(allData)Start processing all work
Took ===============> 101.191534ms
问题
解决方案:Worker Pool
//// worker/pooled.go
func PooledWork(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
workerPoolSize := 100
dataCh := make(chan model.SimpleData, workerPoolSize)
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
basic.Process(data)
}
}()
}
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.PooledWork(allData)Start processing all work
Took ===============> 1.002972449s
处理错误
//// worker/pooledError.go
func PooledWorkError(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
workerPoolSize := 100
dataCh := make(chan model.SimpleData, workerPoolSize)
errors := make(chan error, 1000)
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
process(data, errors)
}
}()
}
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case err := <-errors:
fmt.Println("finished with error:", err.Error())
case <-time.After(time.Second * 1):
fmt.Println("Timeout: errors finished")
return
}
}
}()
defer close(errors)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
func process(data model.SimpleData, errors chan<- error) {
fmt.Printf("Start processing %d\n", data.ID)
time.Sleep(100 * time.Millisecond)
if data.ID % 29 == 0 {
errors <- fmt.Errorf("error on job %v", data.ID)
} else {
fmt.Printf("Finish processing %d\n", data.ID)
}
}
//// main.go
// Process
worker.PooledWorkError(allData)
总结
0
0
相关文章
如何在Golang中优化HTTP Client连接池 Go语言高并发请求客户端调优
解析Golang中的go mod graph命令 Go语言依赖树可视化分析
如何在Golang中操作MinIO对象存储服务 Go语言S3兼容API上传下载
如何在Golang中解析MIME Multipart邮件格式 Go语言mime/multipart处理
如何在Golang中实现HTTP/2 Server Push Go语言服务端资源推送优化
本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门AI工具
相关专题
golang定义变量的方法:1、声明变量并赋予初始值“var age int =值”;2、声明变量但不赋初始值“var age int”;3、使用短变量声明“age :=值”等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。
210
2024.02.23
golang数据转换方法:1、类型转换操作符;2、类型断言;3、字符串和数字之间的转换;4、JSON序列化和反序列化;5、使用标准库进行数据转换;6、使用第三方库进行数据转换;7、自定义数据转换函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。
247
2024.02.23
golang常用库有:1、标准库;2、字符串处理库;3、网络库;4、加密库;5、压缩库;6、xml和json解析库;7、日期和时间库;8、数据库操作库;9、文件操作库;10、图像处理库。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。
356
2024.02.23
golang和python的区别是:1、golang是一种编译型语言,而python是一种解释型语言;2、golang天生支持并发编程,而python对并发与并行的支持相对较弱等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。
214
2024.03.05
golang是免费的。golang是google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的开源编程语言,采用bsd开源协议。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。
409
2024.05.21
本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。
3
2026.03.11
热门下载
最新文章



