0

0

c# 如何用 Channel 实现一个批处理(Batching)的后台服务

月夜之吻

月夜之吻

发布时间:2026-01-04 09:45:59

|

948人浏览过

|

来源于php中文网

原创

直接用 channel 做批处理易丢数据,因其无“凑够n条”或“超时提交”语义;需封装定时器+批量读取逻辑,并确保 flushbatchasync 支持取消、不吞异常、清空列表,且生命周期与 ihostedservice 对齐。

c# 如何用 channel 实现一个批处理(batching)的后台服务

为什么直接用 Channel<t></t> 做批处理容易丢数据

因为 Channel<t></t> 本身不提供“等凑够 N 条再发”或“超时强制提交”的语义。你如果只靠 Channel.Reader.ReadAsync() 一条条读,就退化成单条处理;如果自己加循环 TryRead 拼批次,又得手动管超时、取消、边界条件——稍不留神,Writer 关闭时未读完的数据就丢了,或者批次卡住不触发。

核心矛盾在于:Channel 是流式传输原语,不是批处理原语。必须在它之上封装一层协调逻辑。

Channel<t></t> + Timer 实现可靠批处理的关键点

推荐用一个后台 Task 持续从 Channel.Reader 尝试批量读取,同时用 System.Threading.Timer 触发“兜底提交”。注意三点:

降重鸟
降重鸟

要想效果好,就用降重鸟。AI改写智能降低AIGC率和重复率。

下载
  • Timer 的回调必须是线程安全的,且不能阻塞(比如别在里面 await 或调 long-running 方法)
  • 每次读取前检查 Reader.Completion.IsCompleted,避免在 Channel 关闭后还尝试读
  • 批次收集必须用 List<t></t> 而非数组,且每次提交后要清空,不能复用引用(否则并发写会乱)
private async Task BatchingLoopAsync(CancellationToken ct)
{
    var batch = new List<MyData>();
    var timer = new Timer(_ => { _ = FlushBatchAsync(batch, ct); }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

    try
    {
        while (!ct.IsCancellationRequested && await _channel.Reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var item))
            {
                batch.Add(item);
                if (batch.Count >= _batchSize)
                {
                    await FlushBatchAsync(batch, ct).ConfigureAwait(false);
                    batch.Clear();
                    timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
                }
            }

            // 每次有新数据进来,重置定时器(实现“最后一条进来后等 flushInterval 再提交”)
            timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
        }
    }
    finally
    {
        timer.Dispose();
        if (batch.Count > 0)
            await FlushBatchAsync(batch, ct).ConfigureAwait(false);
    }
}

FlushBatchAsync 必须支持取消且不能吞异常

这是最容易出问题的一环:如果 FlushBatchAsync 里调用的是外部 HTTP API 或数据库写入,它可能耗时、可能失败、可能被取消。必须显式传递 CancellationToken,并在 catch 块中区分 OperationCanceledException 和其他异常。

  • 遇到 OperationCanceledException:直接 return,不要重试,因为上层已要求停止
  • 遇到其他异常:记录日志,但不要 throw —— 否则整个 BatchingLoopAsync 会退出,后续数据全丢
  • 如果需要重试,应在 FlushBatchAsync 内部做(比如用 Polly),而不是让外层循环崩溃
private async Task FlushBatchAsync(List<MyData> batch, CancellationToken ct)
{
    try
    {
        await _httpClient.PostAsJsonAsync("/api/batch", batch, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // 正常退出路径,不记日志
        return;
    }
    catch (Exception ex) when (!(ex is OperationCanceledException))
    {
        _logger.LogError(ex, "Failed to flush batch of {Count} items", batch.Count);
        // 不 throw,继续下一轮
    }
    finally
    {
        batch.Clear(); // 确保清空,避免引用残留
    }
}

注册为 IHostedService 时要注意生命周期绑定

Channel 的 WriterReader 都需要和宿主生命周期对齐。常见错误是把 Channel.CreateBounded<t>()</t> 放在构造函数里,但没在 StopAsync 中显式调用 Writer.Complete(),导致 BatchingLoopAsync 永远等在 WaitToReadAsync 上,服务无法正常退出。

  • StartAsync 中启动 BatchingLoopAsync 并用 Task.RunBackgroundService 托管
  • StopAsync 中先调 _channel.Writer.Complete(),再 await _batchingTask 等它自然结束
  • 别在 Dispose 里做任何异步清理——IHostedService 的 Dispose 是同步的

真正难的是边界情况:比如 StopAsync 被调用时,FlushBatchAsync 正在发请求,这时 CancelToken 触发,你得确保那个 HTTP 请求真能被取消(HttpClient 默认支持),而不是留下悬挂连接。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

743

2023.08.10

Golang channel原理
Golang channel原理

本专题整合了Golang channel通信相关介绍,阅读专题下面的文章了解更多详细内容。

258

2025.11.14

golang channel相关教程
golang channel相关教程

本专题整合了golang处理channel相关教程,阅读专题下面的文章了解更多详细内容。

351

2025.11.17

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

382

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2107

2023.08.14

vb怎么连接数据库
vb怎么连接数据库

在VB中,连接数据库通常使用ADO(ActiveX 数据对象)或 DAO(Data Access Objects)这两个技术来实现:1、引入ADO库;2、创建ADO连接对象;3、配置连接字符串;4、打开连接;5、执行SQL语句;6、处理查询结果;7、关闭连接即可。

357

2023.08.31

MySQL恢复数据库
MySQL恢复数据库

MySQL恢复数据库的方法有使用物理备份恢复、使用逻辑备份恢复、使用二进制日志恢复和使用数据库复制进行恢复等。本专题为大家提供MySQL数据库相关的文章、下载、课程内容,供大家免费下载体验。

259

2023.09.05

vb中怎么连接access数据库
vb中怎么连接access数据库

vb中连接access数据库的步骤包括引用必要的命名空间、创建连接字符串、创建连接对象、打开连接、执行SQL语句和关闭连接。本专题为大家提供连接access数据库相关的文章、下载、课程内容,供大家免费下载体验。

329

2023.10.09

Rust内存安全机制与所有权模型深度实践
Rust内存安全机制与所有权模型深度实践

本专题围绕 Rust 语言核心特性展开,深入讲解所有权机制、借用规则、生命周期管理以及智能指针等关键概念。通过系统级开发案例,分析内存安全保障原理与零成本抽象优势,并结合并发场景讲解 Send 与 Sync 特性实现机制。帮助开发者真正理解 Rust 的设计哲学,掌握在高性能与安全性并重场景中的工程实践能力。

19

2026.03.05

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
WEB前端教程【HTML5+CSS3+JS】
WEB前端教程【HTML5+CSS3+JS】

共101课时 | 10万人学习

JS进阶与BootStrap学习
JS进阶与BootStrap学习

共39课时 | 3.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号