异步 Session 必须用 AsyncSession 和 async_sessionmaker,不能复用同步 sessionmaker;事务需显式 await session.begin() 或用 async with session.begin(),expire_on_commit=False 防止提交后字段为 None。

异步 Session 必须用 AsyncSession,不能复用同步 sessionmaker
SQLAlchemy 2.x 的异步支持是彻底分离的:同步用 sessionmaker,异步必须用 async_sessionmaker。直接把同步工厂传给 async with 会报 RuntimeError: This event loop is already running 或静默失败。
正确写法是:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from sqlalchemy import textengine = create_async_engine("sqlite+aiosqlite:///db.sqlite", echo=True) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
✅ 正确:await 获取 session 实例
async def get_user(user_id: int): async with AsyncSessionLocal() as session: result = await session.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id}) return result.fetchone()
-
expire_on_commit=False很关键——否则commit()后对象字段变None,尤其在返回 Pydantic 模型时直接崩 -
create_async_engine必须用带aiosqlite、asyncpg或aiomysql的 URL,sqlite:///这种同步驱动不支持异步 - 不要在
async with外保存session引用,它不是线程/协程安全的
事务必须显式 await session.begin() 或用 async with session.begin()
异步 Session 不会自动开启事务。执行 session.execute() 或 session.add() 时,若没处于事务中,SQLAlchemy 会临时开一个只读事务(对写操作无效),导致 commit 报错 InvalidRequestError: No transaction is in progress。
两种可靠方式:
# 方式一:显式 begin + commit/rollback(适合复杂控制流)
async def transfer_money(from_id: int, to_id: int, amount: float):
async with AsyncSessionLocal() as session:
await session.begin() # ⚠️ 必须 await
try:
await session.execute(
text("UPDATE accounts SET balance = balance - :amt WHERE id = :id"),
{"amt": amount, "id": from_id}
)
await session.execute(
text("UPDATE accounts SET balance = balance + :amt WHERE id = :id"),
{"amt": amount, "id": to_id}
)
await session.commit()
except Exception:
await session.rollback()
raise
方式二:async with session.begin()(更简洁,自动 rollback on exception)
async def create_post(title: str, content: str):
async with AsyncSessionLocal() as session:
async with session.begin(): # ✅ 自动 commit,异常时自动 rollback
session.add(Post(title=title, content=content))
-
session.begin()是协程函数,必须await;漏掉await会导致后续操作在无事务上下文中执行 -
async with session.begin()内部已包含await session.begin(),无需再手动调 -
session.rollback()和session.commit()也必须await,否则事务挂起不提交
嵌套 async with 事务会报 InvalidRequestError: Transaction is already begun
SQLAlchemy 2.x 异步不支持真正的嵌套事务(savepoint 是另一回事)。如果外层用了 async with session.begin(),内层再 async with session.begin() 就会触发该错误。
需要“子事务”语义时,改用 session.begin_nested():
async def outer_logic():
async with AsyncSessionLocal() as session:
async with session.begin():
await session.execute(text("INSERT INTO logs (msg) VALUES ('start')"))
# 子事务:失败不影响外层
nested = await session.begin_nested() # ✅ 返回 SavepointTransaction
try:
await session.execute(text("INSERT INTO users (name) VALUES ('test')"))
await nested.commit() # 提交 savepoint
except Exception:
await nested.rollback() # 回滚到 savepoint,外层仍可 commit
-
begin_nested()返回的是SavepointTransaction,不是新AsyncSession,所有操作仍在同一 session 中 -
savepoint在 PostgreSQL / MySQL 上有效,SQLite 的aiosqlite不支持(会静默退化为普通事务) - 别混淆
session.begin_nested()和async_sessionmaker(begin_nested=True)—— 后者无效,参数只存在于同步版
依赖注入(如 FastAPI)中传 AsyncSession 要注意生命周期和 scope
FastAPI 的 Depends 默认每次请求新建依赖,但如果你手动 yield 一个未关闭的 AsyncSession,或在中间件里缓存 session 实例,容易引发 ResourceClosedError 或连接泄漏。
安全做法是让依赖函数本身负责 async with 并 yield:
from fastapi import Dependsasync def get_db(): async with AsyncSessionLocal() as session: yield session # ✅ yield session,而非 yield AsyncSessionLocal()
@app.get("/users/{id}") async def read_user(id: int, session: AsyncSession = Depends(get_db)): result = await session.execute(select(User).where(User.id == id)) return result.scalar_one_or_none()
- yield 前的
async with确保 session 在请求结束时自动 close - 不要在依赖中
return session—— 那样 session 会在依赖返回后立即被 GC 关闭,后续使用报ResourceClosedError - 若需跨多个
Depends共享 session(比如 repo 层 + service 层),必须用同一个get_db实例,不能定义两个独立依赖
实际用的时候,最常踩的坑是忘了 await 所有 session 方法,以及误以为 async_sessionmaker 创建的是“自动事务 session”。事务边界永远要自己画清楚,async 版本不会替你猜。










