0

0

Go语言并发批量处理数据_Golang数据处理实战

P粉602998670

P粉602998670

发布时间:2026-01-22 08:32:54

|

590人浏览过

|

来源于php中文网

原创

应使用信号量限流而非无限制启goroutine,避免OOM;批量写入需拼接多行SQL而非单条执行;优先切片分片而非channel传数据;错误需计数、重试、记录,不可忽略或fatal。

go语言并发批量处理数据_golang数据处理实战

并发控制用 semaphore 而不是无限制启 goroutine

直接在循环里写 go process(item) 看似简单,但数据量一大就会触发 runtime: out of memory 或系统级资源耗尽。Go 的 goroutine 虽轻量,但每个仍占 2KB+ 空间,上万并发瞬间吃光内存。

正确做法是用信号量(如 golang.org/x/sync/semaphore)限流:

sem := semaphore.NewWeighted(int64(maxConcurrency))
var wg sync.WaitGroup
for _, item := range items {
    wg.Add(1)
    go func(i Item) {
        defer wg.Done()
        if err := sem.Acquire(context.Background(), 1); err != nil {
            log.Printf("acquire failed: %v", err)
            return
        }
        defer sem.Release(1)
        process(i)
    }(item)
}
wg.Wait()
  • maxConcurrency 建议设为 CPU 核心数 × 2~5,具体看 process 是 CPU 密集还是 IO 密集
  • 别漏掉 defer sem.Release(1),否则后续 goroutine 永远拿不到许可
  • sem.Acquire 可能阻塞或超时,生产环境建议传带超时的 context

批量写入数据库要避免逐条 INSERT

go 并发处理数据后直接调 db.Exec("INSERT ...", item.ID, item.Name),本质是把单条 SQL 放到多个连接里跑——不仅没提升吞吐,还放大了连接池压力和事务开销。

真正高效的批量写法是:先分组,再拼 INSERT INTO ... VALUES (...), (...), (...),一次提交多行:

立即学习go语言免费学习笔记(深入)”;

PathFinder
PathFinder

AI驱动的销售漏斗分析工具

下载
const batchSize = 100
for i := 0; i < len(items); i += batchSize {
    batch := items[i:min(i+batchSize, len(items))]
    values := make([]interface{}, 0, len(batch)*2)
    placeholders := make([]string, 0, len(batch))
    for _, b := range batch {
        placeholders = append(placeholders, "(?, ?)")
        values = append(values, b.ID, b.Name)
    }
    query := "INSERT INTO users (id, name) VALUES " + strings.Join(placeholders, ", ")
    _, err := db.Exec(query, values...)
    if err != nil {
        log.Printf("batch insert failed: %v", err)
    }
}
  • MySQL / SQLite 支持单语句多值插入;PostgreSQL 需用 UNNESTpgx.Batch
  • 注意 values 参数顺序必须和 placeholders 严格对应
  • 过大的 batchSize(如 > 1000)可能触发 MySQL 的 max_allowed_packet 限制

channel 传数据不如切片传得快,别为了“看起来并发”硬套

常见反模式:itemsCh := make(chan Item, 1000),然后一个 goroutine 往里塞,多个 goroutine 从 channel 读。这引入了额外的同步开销和内存拷贝,实测比直接切片分段慢 15%~30%。

除非需要动态负载均衡(比如任务耗时差异极大),否则优先用预分片 + 启固定 goroutine:

func splitSlice[T any](s []T, n int) [][]T {
    var chunks [][]T
    for i := 0; i < len(s); i += n {
        end := i + n
        if end > len(s) {
            end = len(s)
        }
        chunks = append(chunks, s[i:end])
    }
    return chunks
}

chunks := splitSlice(items, len(items)/runtime.NumCPU())
var wg sync.WaitGroup
for _, chunk := range chunks {
    wg.Add(1)
    go func(c []Item) {
        defer wg.Done()
        for _, item := range c {
            process(item)
        }
    }(chunk)
}
wg.Wait()
  • channel 适合做事件通知、结果聚合、或跨 goroutine 协作,不是通用数据搬运工
  • 切片分片后传参,零拷贝(只传指针和长度),cache 局部性更好
  • 如果下游要合并结果,用 sync.Map 或预先分配好结果切片更可控

错误处理不能只靠 log.Fatal 或忽略 err

并发场景下,一个 goroutine panic 会终止整个程序;而静默忽略 err 会导致数据丢失却毫无感知。

推荐组合策略:失败计数 + 可恢复错误重试 + 不可恢复错误记录后跳过:

var failed atomic.Int64
var mu sync.RWMutex
var errors []error

for _, item := range items {
    go func(i Item) {
        defer func() {
            if r := recover(); r != nil {
                mu.Lock()
                errors = append(errors, fmt.Errorf("panic on %v: %v", i.ID, r))
                mu.Unlock()
                failed.Add(1)
            }
        }()
        if err := processWithRetry(i, 3); err != nil {
            mu.Lock()
            errors = append(errors, fmt.Errorf("fail on %v: %w", i.ID, err))
            mu.Unlock()
            failed.Add(1)
            return
        }
    }(item)
}
// 等待结束后检查 failed.Load() > 0 再决定是否告警
  • 别在 goroutine 里用 log.Fatal,它会杀掉 main goroutine
  • 网络类错误(如 DB timeout)适合指数退避重试;校验失败类错误(如字段为空)应直接跳过
  • 所有错误必须进日志或监控,至少包含 item 关键标识(如 ID、批次号)
实际跑通的关键不在“并发多”,而在“控得住、写得稳、错得明”。很多线上事故,都是漏了信号量释放、批大小越界、或 channel 缓冲区填满后死锁——这些点比语法细节更容易卡住上线节奏。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
golang如何定义变量
golang如何定义变量

golang定义变量的方法:1、声明变量并赋予初始值“var age int =值”;2、声明变量但不赋初始值“var age int”;3、使用短变量声明“age :=值”等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

211

2024.02.23

golang有哪些数据转换方法
golang有哪些数据转换方法

golang数据转换方法:1、类型转换操作符;2、类型断言;3、字符串和数字之间的转换;4、JSON序列化和反序列化;5、使用标准库进行数据转换;6、使用第三方库进行数据转换;7、自定义数据转换函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

247

2024.02.23

golang常用库有哪些
golang常用库有哪些

golang常用库有:1、标准库;2、字符串处理库;3、网络库;4、加密库;5、压缩库;6、xml和json解析库;7、日期和时间库;8、数据库操作库;9、文件操作库;10、图像处理库。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

356

2024.02.23

golang和python的区别是什么
golang和python的区别是什么

golang和python的区别是:1、golang是一种编译型语言,而python是一种解释型语言;2、golang天生支持并发编程,而python对并发与并行的支持相对较弱等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

214

2024.03.05

golang是免费的吗
golang是免费的吗

golang是免费的。golang是google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的开源编程语言,采用bsd开源协议。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

409

2024.05.21

golang结构体相关大全
golang结构体相关大全

本专题整合了golang结构体相关大全,想了解更多内容,请阅读专题下面的文章。

490

2025.06.09

golang相关判断方法
golang相关判断方法

本专题整合了golang相关判断方法,想了解更详细的相关内容,请阅读下面的文章。

201

2025.06.10

golang数组使用方法
golang数组使用方法

本专题整合了golang数组用法,想了解更多的相关内容,请阅读专题下面的文章。

1478

2025.06.17

Python异步编程与Asyncio高并发应用实践
Python异步编程与Asyncio高并发应用实践

本专题围绕 Python 异步编程模型展开,深入讲解 Asyncio 框架的核心原理与应用实践。内容包括事件循环机制、协程任务调度、异步 IO 处理以及并发任务管理策略。通过构建高并发网络请求与异步数据处理案例,帮助开发者掌握 Python 在高并发场景中的高效开发方法,并提升系统资源利用率与整体运行性能。

37

2026.03.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
MySQL 教程
MySQL 教程

共48课时 | 2.5万人学习

MySQL 初学入门(mosh老师)
MySQL 初学入门(mosh老师)

共3课时 | 0.3万人学习

简单聊聊mysql8与网络通信
简单聊聊mysql8与网络通信

共1课时 | 848人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号