协程流实现需自定义generator/task类型,核心是封装带引用计数和队列的stream结构,正确实现promise_type(含yield_value背压控制、get_return_object返回stream),并确保跨线程安全与await_ready短路优化。

协程返回类型必须自定义 generator 或 task,不能用 std::coroutine_handle 直接裸用
标准库没提供开箱即用的流式协程类型,std::generator(C++23)只支持单向只读迭代,不支持 push 数据、暂停消费、错误注入等管道必需能力。自己封装时,核心是实现符合协程要求的 promise_type,并让 await_transform 能处理数据项或控制信号。
常见错误是直接把 std::coroutine_handle 当作流容器传给下游——它只是执行句柄,不带状态管理逻辑,也没生命周期绑定,极易悬垂或提前销毁。
- 用
struct stream { /* 有引用计数的 coroutine_handle + queue */ };封装,避免裸 handle -
promise_type::get_return_object()返回stream,不是void或auto - 所有
co_yield值必须经由promise_type::yield_value()入队并通知等待方,不能只存不唤醒
管道连接时必须显式处理背压,否则 co_yield 会无限堆积内存
协程默认“生产即入队”,但下游消费慢时,上游仍可能持续 co_yield,导致缓冲区暴涨。这不是调度问题,是设计上漏掉了阻塞/丢弃/重试策略。
典型场景:日志解析协程每秒 yield 10k 条结构体,下游写磁盘协程每秒只能处理 2k 条——5 秒后内存占用翻 5 倍。
立即学习“C++免费学习笔记(深入)”;
- 在
promise_type::yield_value()中检查下游缓冲水位,超限时调用co_await std::suspend_always{} - 用
std::atomic<size_t></size_t>记录待消费数,避免锁;但注意co_await暂停后需重新检查条件 - 不要依赖
std::this_thread::sleep_for模拟背压——协程挂起是协作式的,sleep 是抢占式,会破坏调度语义
跨线程传递 stream 对象必须保证 coroutine_handle 的线程安全访问
一个 stream 实例内部持有 std::coroutine_handle<promise_type></promise_type>,而该 handle 的 resume() / destroy() 不是线程安全的。多线程管道中,若 A 线程正在 resume 协程,B 线程同时 destroy 它,会触发未定义行为。
错误做法:把 stream 原样拷贝进 std::thread 参数,或塞进 std::async 的 lambda 捕获列表里——没有同步机制保障 handle 生命周期。
- 用
std::shared_ptr<promise_type></promise_type>替代裸 handle 存储,让 resume/destroy 都作用于 shared_ptr 所指对象 - 在
stream析构函数里调用handle.destroy(),但前提是 handle 有效且未被其他线程并发操作 - 更稳妥的是禁止跨线程转移
stream,改用线程本地队列 +std::condition_variable中转数据
co_await 输入源时,别忽略 await_ready() 的短路逻辑
管道常需要从文件、网络或另一个 stream 拉数据,这时容易写成 co_await input_stream.next()。但如果 next() 返回的 awaiter 的 await_ready() 总返回 false,哪怕数据已就绪也会强制挂起再唤醒,白白增加调度开销。
性能敏感场景(如高频传感器数据流),这种无谓挂起会让吞吐下降 15%+。
- 在
awaiter::await_ready()中检查内部缓冲是否非空,空才返回false - 如果输入源支持 peek(如
std::queue的front()),优先用它判断就绪性 - 对基于事件的输入(如 epoll/kqueue),
await_ready()应查 eventfd 或完成端口状态,而非总返回false
真正难的不是写出能跑的协程流,而是让每个环节都清楚自己在什么条件下挂起、谁负责唤醒、数据在哪儿落地、失败时怎么清理。这些细节藏在 promise_type 的十几行代码里,但错一处,整条管道就卡死或泄漏。











