asyncio不自动处理背压,需开发者显式设计:用有界asyncio.Queue(maxsize>0)、Semaphore限流、避免put_nowait等陷阱,全程匹配生产与消费速率。

asyncio 本身不自动处理背压,背压需由开发者显式设计和控制。核心在于:任务生产数据的速度超过消费能力时,必须有机制让生产者减速或暂停,否则内存暴涨、协程堆积、响应延迟甚至崩溃。
背压的常见来源
典型场景包括:
- 异步生成器持续 yield 数据,但下游 await 不及时(如未用 async for 或未 await 每次迭代)
- 多个协程向同一个 asyncio.Queue 快速 put,但 get 速度慢,队列无限增长
- HTTP 客户端并发请求过多(如 aiohttp.ClientSession 未限制并发数),压垮服务端或本地连接池
- 流式读取(如 StreamReader.read())后未及时处理,接收缓冲区堆积
用 asyncio.Queue 实现可控缓冲
asyncio.Queue 是最常用的背压载体,其 size 参数可设上限:
```python
queue = asyncio.Queue(maxsize=100) # 满时 put() 自动挂起生产者
async def producer():
for i in range(1000):
await queue.put(f"data-{i}") # 若队列满,此处暂停,直到有空位
async def consumer():
while True:
item = await queue.get()
# 处理 item
queue.task_done()
```
立即学习“Python免费学习笔记(深入)”;
关键点:maxsize > 0 才启用背压;若设为 0(默认),队列无界,失去背压能力。
限流与信号协同控制
仅靠队列不够时,可叠加更精细策略:
- 使用 asyncio.Semaphore 控制并发数(如限制同时发起的 HTTP 请求不超过 5 个)
- 在流处理中主动检查:例如读取 TCP 流后,用 await asyncio.sleep(0) 让出控制权,避免独占事件循环
- 结合 cancel/timeout:对慢消费者设置超时,主动中断或降级处理,防止阻塞整个流水线
警惕“假背压”陷阱
以下写法看似有节制,实则无效:
```python
# ❌ 错误:没有 await,put 立即返回,不触发背压
queue.put_nowait(item)
# ❌ 错误:try/except 吞掉 Full 异常,跳过等待逻辑
try:
queue.put_nowait(item)
except asyncio.QueueFull:
pass # 丢弃或重试?未定义行为
```
正确做法是统一用 await queue.put(...),依赖其内置的挂起逻辑;若需非阻塞,应配合 asyncio.wait_for 显式设超时。
背压不是加个队列就万事大吉,而是要从数据流源头到终点全程考虑速率匹配。设计时多问一句:“这里卡住时,上游会不会失控?”









