
本文将深入探讨在FastAPI应用中如何正确且高效地利用`ProcessPoolExecutor`与`asyncio.run_in_executor`实现CPU密集型任务的异步并发处理。核心在于通过FastAPI的`lifespan`事件管理`ProcessPoolExecutor`的生命周期,确保其作为单例在应用启动时创建并优雅关闭,从而避免重复创建进程带来的巨大性能开销和API阻塞问题。
在构建高性能的异步Web服务(如基于FastAPI)时,经常会遇到需要执行CPU密集型任务的场景,例如大规模数据处理、复杂计算或正则表达式匹配等。如果这些任务直接在主事件循环中执行,会导致事件循环阻塞,进而使整个API响应变慢甚至无响应。Python的asyncio库提供了run_in_executor方法,允许我们将阻塞型或CPU密集型任务 offload 到一个独立的线程池(ThreadPoolExecutor)或进程池(ProcessPoolExecutor)中执行,从而不阻塞主事件循环。
然而,在使用ProcessPoolExecutor时,如果不正确地管理其生命周期,反而可能引入新的性能问题。一个常见的错误是在每个API请求中实例化ProcessPoolExecutor。进程的创建和销毁是一个相对“昂贵”的操作,如果每个请求都创建新的进程,将导致:
解决上述问题的关键在于确保ProcessPoolExecutor在整个应用生命周期中只被创建一次,并作为共享资源供所有请求使用。FastAPI提供了lifespan事件管理机制,允许我们在应用启动时执行初始化操作,并在应用关闭时执行清理操作,这正是管理ProcessPoolExecutor的理想场所。
首先,我们需要一个全局变量来持有ProcessPoolExecutor实例。
import asyncio
import concurrent.futures
import functools
import re
from fastapi import FastAPI
from contextlib import asynccontextmanager
# 定义一个全局变量来持有进程池实例
process_pool: concurrent.futures.ProcessPoolExecutor = None
# 辅助函数:将同步任务提交到执行器
async def executor_task(fn, executor: concurrent.futures.Executor = None):
"""
将一个同步函数提交到指定的执行器中运行。
如果未指定执行器,将使用默认的线程池。
"""
event_loop = asyncio.get_event_loop()
return await event_loop.run_in_executor(executor, fn)
# 示例:内容分块函数 (与问题原文保持一致,假设已存在)
def split_on_whitespace(content: str, count: int = 6): # 假设count为默认值
if not content: return ['' for _ in range(count)]
# 简化实现,实际可能需要更复杂的逻辑来确保分块有效
chunk_size = len(content) // count
return [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
# 示例:在内容块上运行正则表达式匹配 (与问题原文保持一致)
def run_regex_on_content_chunk(content: str):
domains = []
domain_patt = re.compile(r'([a-zA-Z0-9\-_]+\.){1,}[a-zA-Z0-9\-_]+')
for match in domain_patt.finditer(content): # 使用finditer更高效
domains.append(match.group(0))
return domainsasynccontextmanager装饰器允许我们创建一个异步上下文管理器,它可以在应用启动时执行初始化代码(进入上下文),并在应用关闭时执行清理代码(退出上下文)。
@asynccontextmanager
async def executor_pool_lifespan(app: FastAPI):
"""
FastAPI应用的生命周期管理器,用于初始化和关闭ProcessPoolExecutor。
"""
global process_pool
# 根据CPU核心数和任务类型设置合适的worker数量
# 对于CPU密集型任务,通常不应超过CPU核心数
# 对于混合型或有I/O等待的任务,可以适当增加
nworkers = 6 # 示例值,实际应根据服务器CPU核心数和负载进行调整
process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=nworkers)
print(f"ProcessPoolExecutor initialized with {nworkers} workers.")
try:
yield # FastAPI在此点启动服务器并运行应用
finally:
# 应用关闭时,优雅地关闭进程池
print("Shutting down ProcessPoolExecutor...")
process_pool.shutdown(wait=True) # 等待所有提交的任务完成
print("ProcessPoolExecutor shut down.")
# 在创建FastAPI应用实例时,指定lifespan
app = FastAPI(lifespan=executor_pool_lifespan)max_workers的注意事项:
正好p2p网贷系统是一个以Java进行开发的免费网贷系统,软件包含了运行环境和相关源码。高速缓存+异步处理。高效,稳定。杜绝操作中断引起的各种问题,无需漫长的操作等待,能承受超大并发。测底解决网贷抢标卡现状。
653
现在,我们的API路由可以安全地使用全局的process_pool来 offload CPU密集型任务。
@app.post("/addContent")
async def add_content(content_data: dict):
"""
接收内容并使用进程池异步处理正则表达式匹配。
"""
all_content = content_data.get('data', '')
if not all_content:
return {"message": "No content provided", "domains": []}
# 将内容分割成多个块
# 这里的nworkers应该与ProcessPoolExecutor的max_workers保持一致或根据需求调整
# 确保分块数量与worker数量匹配或适当倍数
content_chunks = split_on_whitespace(all_content, count=process_pool._max_workers) # 假设分块数量与worker数相同
async_tasks = []
for chunk in content_chunks:
# 使用functools.partial封装带参数的函数,使其成为无参数函数
regex_fn = functools.partial(run_regex_on_content_chunk, chunk)
# 将任务提交到全局的ProcessPoolExecutor
async_tasks.append(executor_task(regex_fn, process_pool))
# 等待所有进程任务完成
all_domains_lists = await asyncio.gather(*async_tasks)
# 合并所有结果
final_domains = [domain for sublist in all_domains_lists for domain in sublist]
return {"message": "Content processed successfully", "domains": final_domains}当使用multiprocessing模块(ProcessPoolExecutor底层使用)时,必须确保主应用代码(包括FastAPI实例的创建)仅在主进程中执行。这通常通过if __name__ == "__main__":保护块来实现。否则,子进程可能会尝试重新导入并执行主模块的代码,导致不可预测的行为,包括创建多个FastAPI服务器实例。
if __name__ == "__main__":
import uvicorn
# 启动Uvicorn服务器
uvicorn.run(app, host="0.0.0.0", port=8000)将以上所有部分整合,形成一个完整的FastAPI应用:
import asyncio
import concurrent.futures
import functools
import re
from fastapi import FastAPI
from contextlib import asynccontextmanager
# 定义一个全局变量来持有进程池实例
process_pool: concurrent.futures.ProcessPoolExecutor = None
# 辅助函数:将同步任务提交到执行器
async def executor_task(fn, executor: concurrent.futures.Executor = None):
"""
将一个同步函数提交到指定的执行器中运行。
如果未指定执行器,将使用默认的线程池。
"""
event_loop = asyncio.get_event_loop()
return await event_loop.run_in_executor(executor, fn)
# 示例:内容分块函数
def split_on_whitespace(content: str, count: int = 6):
if not content: return ['' for _ in range(count)]
length = len(content)
part_size = length // count
chunks = []
for i in range(count):
start = i * part_size
end = (i + 1) * part_size if i < count - 1 else length
chunks.append(content[start:end])
return chunks
# 示例:在内容块上运行正则表达式匹配
def run_regex_on_content_chunk(content: str):
domains = []
domain_patt = re.compile(r'([a-zA-Z0-9\-_]+\.){1,}[a-zA-Z0-9\-_]+')
for match in domain_patt.finditer(content):
domains.append(match.group(0))
return domains
@asynccontextmanager
async def executor_pool_lifespan(app: FastAPI):
"""
FastAPI应用的生命周期管理器,用于初始化和关闭ProcessPoolExecutor。
"""
global process_pool
nworkers = 6 # 建议根据CPU核心数调整
process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=nworkers)
print(f"ProcessPoolExecutor initialized with {nworkers} workers.")
try:
yield # FastAPI在此点启动服务器并运行应用
finally:
print("Shutting down ProcessPoolExecutor...")
process_pool.shutdown(wait=True)
print("ProcessPoolExecutor shut down.")
# 在创建FastAPI应用实例时,指定lifespan
app = FastAPI(lifespan=executor_pool_lifespan)
@app.post("/addContent")
async def add_content(content_data: dict):
"""
接收内容并使用进程池异步处理正则表达式匹配。
"""
all_content = content_data.get('data', '')
if not all_content:
return {"message": "No content provided", "domains": []}
# 确保进程池已初始化
if process_pool is None:
return {"message": "Process pool not initialized", "domains": []}, 500
# 根据进程池的worker数量来分块
num_chunks = process_pool._max_workers
content_chunks = split_on_whitespace(all_content, count=num_chunks)
async_tasks = []
for chunk in content_chunks:
regex_fn = functools.partial(run_regex_on_content_chunk, chunk)
async_tasks.append(executor_task(regex_fn, process_pool))
all_domains_lists = await asyncio.gather(*async_tasks)
final_domains = [domain for sublist in all_domains_lists for domain in sublist]
return {"message": "Content processed successfully", "domains": final_domains}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)通过上述方法,我们实现了在FastAPI中高效管理ProcessPoolExecutor,从而将CPU密集型任务 offload 到独立的进程中执行,同时保持主事件循环的响应性。
关键点回顾:
进一步的思考:
以上就是FastAPI中高效管理ProcessPoolExecutor的异步并发实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号