
本文详解如何通过自定义 __await__ 方法安全包装异步上下文管理器(如 asyncpg.pool.PoolAcquireContext),确保日志计数器仅在连接真正获取/释放后更新,避免因提前调用 acquire() 导致状态不一致。
本文详解如何通过自定义 `__await__` 方法安全包装异步上下文管理器(如 `asyncpg.pool.poolacquirecontext`),确保日志计数器仅在连接真正获取/释放后更新,避免因提前调用 `acquire()` 导致状态不一致。
在异步数据库连接池监控场景中,仅靠重写 __aenter__/__aexit__ 并不足以精确追踪底层连接的实际生命周期——因为 PoolAcquireContext 本身是可等待对象(即支持 await pool.acquire() 语法),其内部 __await__ 返回一个生成器,真正的连接获取发生在 await 执行过程中,而非 __aenter__ 调用时。若在 __await__ 中直接调用 self.cnt_logger.acquire() 后立即返回原协程的 __await__ 结果,会导致计数器在等待开始前就被递增,而一旦后续 await 失败(如超时、连接池满),计数器却已错误增加,破坏状态一致性。
正确的做法是:将 __await__ 实现为一个委托生成器(delegating generator),利用 yield from 将控制权交还给原始协程的 __await__ 迭代器,并在其执行完成(无论成功或异常)后,通过 finally 块确保计数器被准确更新。但注意:此处不能简单地在 finally 中调用 release() —— 因为 __await__ 仅对应 acquire 阶段;release 的时机应由 __aexit__ 或显式 await 后的清理逻辑负责。
以下是修正后的 LoggingPoolAcquireContext 关键实现:
class LoggingPoolAcquireContext:
def __init__(self, pool_acquire_context: asyncpg.pool.PoolAcquireContext, cnt_logger: CntPoolLogger):
self.pool_acquire_context = pool_acquire_context
self.cnt_logger = cnt_logger
async def __aenter__(self):
# __aenter__ 是标准协议入口,此处仍需 await 真正获取连接
conn = await self.pool_acquire_context.__aenter__()
self.cnt_logger.acquire() # ✅ 此时连接已确定获取成功
return conn
async def __aexit__(self, *exc_info):
# 先释放连接,再更新计数器
await self.pool_acquire_context.__aexit__(*exc_info)
self.cnt_logger.release() # ✅ 此时连接已确定释放完成
def __await__(self):
# ⚠️ 重要:__await__ 必须返回生成器对象(generator)
# 且需在 yield from 完成后,通过 finally 保证 acquire 调用不被遗漏
# 但注意:此处的 acquire 应与实际 await 结果绑定,因此推荐如下写法:
self.cnt_logger.acquire()
try:
# 委托原始上下文的 __await__,它会实际触发连接获取
return (yield from self.pool_acquire_context.__await__())
except BaseException:
# 若 await 过程中抛出异常(如 TimeoutError),需回滚计数器
self.cnt_logger.release()
raise✅ 关键原理说明:
- __await__ 方法必须返回一个实现了迭代器协议的对象(通常是生成器)。
- yield from expr.__await__() 不仅转发迭代,还自动处理 StopIteration 和异常传播,是 Python 异步机制底层的标准委托方式。
- try/except 捕获所有异常并回滚 acquire(),确保计数器始终反映真实连接状态;若使用 finally 则无法区分成功与失败路径,易导致误减。
此外,还需注意以下实践要点:
- 不要依赖 __del__ 清理资源:__del__ 触发时机不确定,可能引发 RuntimeError: I/O operation on closed file。应改用异步上下文管理器或显式 close() + await logger.close()。
- 日志文件需异步安全:当前 self.file.write() 是同步阻塞 I/O,在高并发下会严重拖慢协程调度。建议改用 aiofiles.open(..., 'a') 配合 await file.write()。
- 线程安全非必需,但协程安全必须保障:CntPoolLogger 的 conn_cnt 和 event_cnt 在单事件循环中无需锁,但若未来扩展至多 loop 或多进程,需升级为 asyncio.Lock 或原子计数器。
综上,包装 __await__ 的本质是将协程的“可等待性”作为可控的执行边界,通过生成器委托+异常防护,精准锚定业务逻辑(如日志计数)到异步操作的真实完成点。这是构建可靠异步监控中间件的核心技巧之一。










