
在python异步编程的实践中,尤其是在构建需要与异步i/o资源(如数据库、网络服务)交互的类时,开发者常常会遇到一个棘手的问题:如何在类的构造函数`__init__`中执行异步操作,例如初始化数据库连接或确保资源存在?由于`__init__`方法本质上是一个同步函数,它不允许直接使用`await`关键字,这使得许多期望在对象创建时完成异步初始化的尝试都以失败告终。
Python的async/await语法糖是为协程(coroutine)设计的,协程需要在事件循环中调度执行。而__init__方法是普通函数,它在对象实例化时同步执行,不参与事件循环。因此,尝试在__init__中直接使用await会导致SyntaxError。
考虑以下一个尝试在__init__中初始化Cosmos DB客户端的示例,它旨在确保数据库和容器在类实例创建时就已经准备就绪:
import asyncio
from azure.cosmos.aio import CosmosClient # 假设这是一个异步客户端
class CosmosCRUD:
def __init__(self, client: CosmosClient):
self.client = client
# 以下代码会导致SyntaxError,因为__init__不是async函数
# self.database = await self.client.create_database_if_not_exists("MY_DATABASE_NAME")
# self.container = await self.database.create_container_if_not_exists("MY_CONTAINER_NAME", partition_key={"kind": "Hash", "paths": ["/id"]})上述代码由于await的使用,将无法运行。
面对__init__的同步限制,开发者可能会考虑以下几种替代方案,但它们通常伴随着严重的性能或设计问题:
立即学习“Python免费学习笔记(深入)”;
这种方法试图在__init__内部通过asyncio.run()或loop.run_until_complete()来同步地执行异步初始化逻辑。
import asyncio
from azure.cosmos.aio import CosmosClient
class CosmosCRUDBad:
def __init__(self, client: CosmosClient):
self.client = client
loop = asyncio.get_event_loop() # 或者创建一个新的loop
# 这种方式会阻塞当前线程,直到异步操作完成
self.database = loop.run_until_complete(
self.client.create_database_if_not_exists("MY_DATABASE_NAME")
)
self.container = loop.run_until_complete(
self.database.create_container_if_not_exists("MY_CONTAINER_NAME", partition_key={"kind": "Hash", "paths": ["/id"]})
)弊端:
虽然依赖注入是一种优秀的设计模式,用于管理组件间的依赖关系,但它本身并不能直接解决在__init__中执行异步初始化逻辑的问题。如果被注入的依赖本身需要异步初始化,那么问题只是转移了,而不是解决了。例如,如果你注入的是一个已经完全初始化的Cosmos DB容器对象,那么这个容器对象的初始化过程仍然需要在某个异步上下文中完成。
最佳实践是保持__init__方法的轻量和同步,仅用于设置实例的基本属性,而将所有异步初始化逻辑封装在一个单独的异步工厂方法中。
以下是使用异步工厂方法模式重构CosmosCRUD类的示例:
import asyncio
from azure.cosmos.aio import CosmosClient, DatabaseProxy, ContainerProxy
class CosmosCRUD:
def __init__(self, client: CosmosClient, database: DatabaseProxy, container: ContainerProxy):
"""
同步构造函数,仅用于接收已初始化的依赖。
"""
self.client = client
self.database = database
self.container = container
print("CosmosCRUD 实例已同步创建。")
@classmethod
async def create(cls, client: CosmosClient, db_name: str, container_name: str, partition_key_path: str = "/id"):
"""
异步工厂方法,负责所有异步初始化逻辑。
"""
print(f"正在异步初始化 CosmosCRUD 实例...")
database = await client.create_database_if_not_exists(db_name)
container = await database.create_container_if_not_exists(
container_name, partition_key={"kind": "Hash", "paths": [partition_key_path]}
)
# 初始化完成后,调用同步构造函数创建并返回实例
return cls(client, database, container)
async def create_item(self, item: dict):
"""
示例:异步创建文档
"""
response = await self.container.create_item(item, enable_automatic_id_generation=True)
print(f"创建项成功: {response['id']}")
return response
async def read_item(self, item_id: str, partition_key: str):
"""
示例:异步读取文档
"""
response = await self.container.read_item(item_id, partition_key=partition_key)
print(f"读取项成功: {response['id']}")
return response
# 如何使用这个类
async def main():
# 假设 client 已经是一个异步 CosmosClient 实例
# 通常在应用启动时创建一次
# client = CosmosClient(url="YOUR_COSMOS_URL", credential="YOUR_COSMOS_KEY")
# 为了演示,我们使用一个模拟的客户端
class MockCosmosClient:
async def create_database_if_not_exists(self, name):
print(f"模拟:创建或获取数据库 '{name}'")
return MockDatabaseProxy(name)
class MockDatabaseProxy:
def __init__(self, name): self.name = name
async def create_container_if_not_exists(self, name, partition_key):
print(f"模拟:创建或获取容器 '{name}' (数据库: {self.name})")
return MockContainerProxy(name)
class MockContainerProxy:
def __init__(self, name): self.name = name
async def create_item(self, item, enable_automatic_id_generation):
print(f"模拟:创建项 {item}")
return {"id": "mock_id_123", **item}
async def read_item(self, item_id, partition_key):
print(f"模拟:读取项 {item_id} (分区键: {partition_key})")
return {"id": item_id, "data": "some_data", "pk": partition_key}
mock_client = MockCosmosClient()
# 使用异步工厂方法创建实例
crud_manager = await CosmosCRUD.create(
client=mock_client,
db_name="MyAwesomeDB",
container_name="MyContainer",
partition_key_path="/pk"
)
# 现在可以使用 crud_manager 实例进行异步操作
new_item = await crud_manager.create_item({"pk": "category1", "name": "Product A"})
await crud_manager.read_item(new_item["id"], "category1")
if __name__ == "__main__":
asyncio.run(main())在 FastAPI 项目中,这种模式与依赖注入(Dependency Injection)机制结合使用效果更佳。你可以在应用启动时(例如使用lifespan事件)创建并初始化一次CosmosCRUD实例,然后将其作为全局依赖提供给路由函数。
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
# 假设 CosmosClient 已经初始化
# cosmos_client = CosmosClient(...)
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用程序启动时运行
print("应用启动中...")
global crud_manager_instance
# 异步创建 CosmosCRUD 实例
crud_manager_instance = await CosmosCRUD.create(
client=cosmos_client,
db_name="MyAwesomeDB",
container_name="MyContainer",
partition_key_path="/pk"
)
yield
# 应用程序关闭时运行
print("应用关闭中...")
# 这里可以添加资源清理逻辑,例如关闭 CosmosClient
# await cosmos_client.close()
app = FastAPI(lifespan=lifespan)
# 依赖函数,提供 CosmosCRUD 实例
async def get_cosmos_crud() -> CosmosCRUD:
return crud_manager_instance
@app.get("/items/{item_id}")
async def read_item_route(item_id: str, pk: str, crud: CosmosCRUD = Depends(get_cosmos_crud)):
item = await crud.read_item(item_id, pk)
return {"message": "Item retrieved", "item": item}
@app.post("/items/")
async def create_item_route(item_data: dict, crud: CosmosCRUD = Depends(get_cosmos_crud)):
new_item = await crud.create_item(item_data)
return {"message": "Item created", "item": new_item}
注意事项:
在Python异步编程中,避免在类的__init__方法中执行异步操作是至关重要的设计原则。通过采用异步工厂方法模式,我们可以清晰地分离同步实例化和异步初始化逻辑,从而构建出高性能、可维护且符合异步范式的应用程序。这种模式不仅解决了await在__init__中的限制,也提升了代码的整体质量和可读性。
以上就是Python异步类构造器设计模式:避免在__init__中执行异步操作的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号