asyncio.run()中不能直接await ProcessPoolExecutor.submit(),因其返回concurrent.futures.Future;应使用loop.run_in_executor()包装,配合with管理生命周期,并用asyncio.wait_for()控制超时。

asyncio.run() 里直接 await ProcessPoolExecutor.submit() 会卡死
因为 ProcessPoolExecutor 的 submit() 返回的是 concurrent.futures.Future,不是 asyncio.Future,不能直接 await。强行 await 会导致事件循环挂起,程序看似运行但无响应。
正确做法是用 loop.run_in_executor() 包一层:
import asyncio
from concurrent.futures import ProcessPoolExecutor
<p>def cpu_heavy(x):
return x ** 42</p><p>async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:</p><h1>✅ 正确:交给 loop 调度,返回真正的 awaitable</h1><pre class='brush:python;toolbar:false;'> result = await loop.run_in_executor(pool, cpu_heavy, 123)
print(result)</pre>- 必须传入正在运行的 event loop 实例(
asyncio.get_running_loop()),不能新建new_event_loop() -
run_in_executor()第二个参数是函数名,后续参数是它的实参,不要加括号调用 - 如果函数需要传入多个参数,用
lambda或functools.partial封装,避免在主线程提前执行
为什么不用 asyncio.to_thread() 替代?
asyncio.to_thread() 是 Python 3.9+ 提供的轻量封装,但它只支持线程池(ThreadPoolExecutor),**不支持进程池**。对 CPU 密集任务,用线程池没意义——GIL 会锁死,无法真正并行。
- IO 密集、调用阻塞库(如旧版
requests)、或需简单并发时,to_thread()更简洁 - CPU 密集任务必须用
ProcessPoolExecutor+run_in_executor() - 混用场景常见于:异步收请求 → 进程池算模型 → 异步写数据库 → 异步发通知
进程池实例该在哪里创建和关闭?
不能在每个协程里反复 ProcessPoolExecutor(),开销大且易触发“too many open files”;也不能全局单例长期持有,容易在 reload 或测试中残留子进程。
立即学习“Python免费学习笔记(深入)”;
- 推荐在顶层 async context(如
async with)或生命周期明确的 scope 中管理,例如 FastAPI 的lifespan或主async def main()里用with - 若需复用,把
ProcessPoolExecutor实例作为参数传入协程,或通过依赖注入(如asyncpg.Pool那样) - 注意:
ProcessPoolExecutor不是 awaitable,不能用async with,只能用普通with配合run_in_executor
子进程抛异常,主协程怎么捕获?
子进程崩溃、超时或函数抛出异常,会以 concurrent.futures.ProcessPoolExecutor 的方式包装后,原样抛到 await 处——但类型是 concurrent.futures.process.BrokenProcessPool 或原始异常(如 ValueError),不是 asyncio.CancelledError。
- 直接用
try/except捕获,不需要特殊转换 - 超时控制靠
asyncio.wait_for(),不是executor.submit(..., timeout=)(后者在子进程内无效) - 示例:
try: result = await asyncio.wait_for( loop.run_in_executor(pool, cpu_heavy, 123), timeout=5.0 ) except asyncio.TimeoutError: print("计算超时") except ValueError as e: print("子进程报了 ValueError:", e)
子进程 stderr 不会自动透出到主进程日志,调试时建议在 CPU 函数里加 print() 或用 logging 输出到文件。










