必须使用 aio-pika 而非 pika:pika 的 blockingconnection 和 asyncconnection 均不兼容 asyncio,前者阻塞事件循环,后者依赖 trio/curio;aio-pika 基于 aiormq,原生支持 asyncio,提供 robustconnection、自动重连、async context manager 等特性,且需注意 vhost 格式、ssl 配置及 publish/consume 分离处理。

asyncio 下直接用 pika.Connection 失败是必然的
pika 默认所有连接和通道都是同步阻塞的,底层用的是普通 socket 和 select,跟 asyncio 的事件循环根本不兼容。你如果在 async def 里调用 BlockingConnection 或 Connection,整个协程就卡死,event loop 被拖住,后续所有异步任务都停摆。
常见错误现象:RuntimeWarning: coroutine 'xxx' was never awaited、CPU 占用飙高但消息没发出去、asyncio.TimeoutError 频发——其实根本不是超时,是线程/IO 被锁死了。
- 别试图给
BlockingConnection加loop.run_in_executor包一层:能跑但吞吐差、资源泄漏风险高,尤其在高并发 publish 场景下 channel 复用混乱 - 真正适配 asyncio 的只有
RobustConnection(来自aio-pika),不是 pika 官方包自带的 - pika 1.0+ 虽然加了
AsyncConnection,但它依赖trio或curio,不原生支持 asyncio;强行用会报NotImplementedError: asyncio not supported
必须换用 aio-pika 而不是 pika
aio-pika 是专为 asyncio 设计的 RabbitMQ client,API 基本兼容 pika,但所有方法都返回 await-able 对象。它底层用的是 aiormq,完全基于 asyncio transport 和 protocol 实现,没有线程池、没有阻塞调用。
使用场景:需要从 FastAPI/Starlette 启动时建立连接、用 async context manager 管理生命周期、或在 async for 中持续消费消息。
立即学习“Python免费学习笔记(深入)”;
- 安装命令是
pip install aio-pika,不是pip install pika;两者不能混用 -
aio-pika的connect_robust()会自动重连,比手动写 retry 逻辑干净得多 - 注意版本:aio-pika >= 9.0 才默认用 asyncio event loop;旧版可能 fallback 到 thread-based 模式,需显式传
loop=asyncio.get_event_loop()
publish 和 consume 必须分开处理异常与生命周期
publish 是无状态、可批量、失败可重试的操作;consume 是长连接、有状态、中断后需重新声明队列和绑定。混在一起写容易导致 channel 错误复用或 connection 意外关闭后无法恢复。
典型错误:在 consumer callback 里直接 await publish,结果 publish 报 ChannelClosed,但 consumer 还在跑,消息不断重复投递。
- publish 推荐用独立的
RobustChannel,每次操作完不 close,靠连接池复用;出错时捕获aio_pika.exceptions.AMQPConnectionError或ChannelClosedError,重连后重试 - consume 必须用
RobustConnection+set_qos控制预取数,否则大量 unack 消息堆积会拖垮 broker - 不要在 consumer 回调里做耗时 IO(比如 HTTP 请求);必须做的话,用
asyncio.to_thread()或拆到后台 task,避免阻塞 channel 的 ack 流程
SSL/TLS 和 vhost 配置容易漏掉斜杠
RabbitMQ 的 vhost 如果不是 /,URL 格式必须写成 amqps://user:pass@host:5671/vhost_name,注意开头是斜杠;少写或写成 vhost_name/ 都会导致认证失败,报错信息却是模糊的 ConnectionClosedByBroker 或 403 ACCESS_REFUSED。
SSL 配置更麻烦:aio-pika 不接受 ssl_options 字典,得传 ssl.SSLContext 实例,且必须显式设 verify_mode=ssl.CERT_REQUIRED,否则自签名证书直接拒绝连接。
- vhost 名含下划线或短横?没问题,但 URL 里不能 url-encode,aio-pika 内部会处理
- 用 Docker 跑 RabbitMQ 时,默认 vhost 是
/,但很多团队改成了myapp,这时 URL 必须是amqp://.../myapp,不是amqp://.../myapp/ - 本地开发用自签证书,记得把
cafile路径传进ssl_context.load_verify_locations(),路径错一个字符就是SSLCertVerificationError










