asyncio没有现成协程池因其只负责调度而不限制并发数,需用asyncio.semaphore手动控制并发上限,避免压垮下游服务;错误使用会导致timeouterror、503增多或连接池耗尽。

为什么 asyncio 没有现成的协程池
因为 asyncio 本身不提供“池化”抽象——它只管调度,不负责限制并发数。你直接用 asyncio.gather() 或 asyncio.create_task() 丢一堆协程进去,很容易压垮下游服务或触发限流。真正的“协程池”得靠你自己控制同时运行的协程数量。
常见错误现象:asyncio.TimeoutError 频发、HTTP 503 响应突增、数据库连接池耗尽,但日志里看不到明显异常——其实是并发量失控了。
关键点:协程不是线程,不能靠 threading.Semaphore 或 multiprocessing.Pool 套用;必须用 asyncio.Semaphore 配合任务提交/等待逻辑来模拟池行为。
用 asyncio.Semaphore 实现最小可行池
核心就是用一个信号量(asyncio.Semaphore)卡住并发上限,每个协程执行前先 acquire(),结束后 release()。不需要复杂类封装,几行就能跑起来。
立即学习“Python免费学习笔记(深入)”;
- 初始化时传入最大并发数,比如
sem = asyncio.Semaphore(10) - 每个待执行协程包装成
async def worker(): await sem.acquire(); try: return await your_coro() finally: sem.release() - 别用
await sem.acquire()后忘写finally——一旦协程抛异常没释放,池就“漏气”,后续任务全卡死 - 注意:信号量是 per-event-loop 的,跨线程或跨 loop 会失效;多 loop 场景下得按 loop 实例分别建池
如何安全地提交任务并获取结果
直接 await 每个带信号量的协程会串行化,失去并发意义;必须用 asyncio.create_task() 并发启动,再统一 await 结果列表。
典型误用:results = [await run_with_semaphore(coro) for coro in coros] —— 这本质是顺序执行,只是加了锁,毫无意义。
正确做法:
tasks = []
for coro in coros:
task = asyncio.create_task(run_with_semaphore(coro))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return_exceptions=True 很关键:否则一个协程出错,整个 gather 就中断,其余还在跑的任务结果全丢掉。
要不要封装成类?什么情况下真需要
如果你只需要控制并发数,函数级封装(比如一个 run_limited() 工具函数)完全够用。类封装只有在以下情况才值得投入:
- 需要动态调整最大并发数(
pool.set_limit(20)),且已有大量调用点 - 要统计实时活跃任务数、排队长度,用于监控或熔断
- 需支持取消正在排队但未开始执行的任务(这要求自己维护等待队列 +
asyncio.CancelledError处理) - 和
concurrent.futures.Executor接口对齐,方便同步/异步代码混用
多数业务场景里,过度封装反而让调用方更难理解控制流——信号量在哪、谁负责 release、异常怎么透出,都藏在类里,出问题更难定位。










