
Python的`async/await`机制旨在通过协程实现并发,但其工作原理在处理CPU密集型任务时常引人困惑。本文将深入探讨为何`await`一个纯计算任务无法交出控制权,而`await asyncio.sleep(0)`却能实现任务切换。我们将剖析事件循环的协作机制,并提供针对CPU密集型任务的正确并发策略,帮助开发者避免常见陷阱。
asyncio是Python中用于编写并发代码的库,它通过事件循环(event loop)和协程(coroutines)实现单线程内的协作式多任务。async/await语法是定义和等待协程的核心。其设计初衷是为了高效处理I/O密集型任务,例如网络请求、文件读写等。当一个协程遇到I/O操作并使用await关键字时,它会将控制权交还给事件循环,允许事件循环调度其他“准备就绪”的协程运行,从而避免了线程切换的开销,提高了资源利用率。
许多开发者在初次接触asyncio时,会误以为只要在函数前加上async,并在调用时使用await,就能实现任务的并发交替执行。然而,这并非总是如此,尤其是在处理CPU密集型任务时。
考虑以下示例代码:
立即学习“Python免费学习笔记(深入)”;
import asyncio
import time
async def long_function():
"""一个纯粹的CPU密集型任务,不涉及任何I/O或异步操作。"""
print(f"Task {asyncio.current_task().get_name()}: long_function started...")
for _ in range(50_000_000): # 大量循环,模拟耗时计算
pass
print(f"Task {asyncio.current_task().get_name()}: long_function finished.")
async def count_blocking():
"""包含阻塞性CPU任务的协程。"""
for x in range(3):
print(f"Count {x} in {asyncio.current_task().get_name()}")
await long_function() # 等待一个CPU密集型任务
async def main_blocking():
"""主协程,启动两个阻塞性计数任务。"""
task1 = asyncio.create_task(count_blocking(), name="Count-A")
task2 = asyncio.create_task(count_blocking(), name="Count-B")
await asyncio.gather(task1, task2)
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main_blocking())
end_time = time.perf_counter()
print(f"\n总执行时间 (阻塞): {end_time - start_time:.2f} 秒")运行上述代码,你会发现输出结果是:一个count_blocking协程会完全执行完毕,包括其内部的long_function的所有迭代,然后另一个count_blocking协程才开始执行。输出顺序会是:
Count 0 in Count-A Task Count-A: long_function started... Task Count-A: long_function finished. Count 1 in Count-A Task Count-A: long_function started... Task Count-A: long_function finished. Count 2 in Count-A Task Count-A: long_function started... Task Count-A: long_function finished. Count 0 in Count-B Task Count-B: long_function started... Task Count-B: long_function finished. ...
这与我们期望的交替输出(如0、0、1、1...)大相径庭。原因是await关键字本身并不具备“中断”正在执行的函数的能力。它仅仅表示“我正在等待某个异步操作完成,在此期间,你可以去执行其他准备就绪的协程”。而long_function内部是一个纯粹的计算循环,它没有任何I/O操作,也没有主动向事件循环报告它正在“等待”什么。因此,一旦事件循环将控制权交给long_function,它就会一直运行直到计算完成,期间不会释放控制权,从而阻塞了整个事件循环。
为了实现CPU密集型任务的协作式并发,我们需要在耗时计算中显式地将控制权交还给事件循环。asyncio.sleep(0)就是实现这一目的的常用技巧。
import asyncio
import time
async def long_function_cooperative():
"""一个协作式的CPU密集型任务,周期性地交出控制权。"""
task_name = asyncio.current_task().get_name()
# print(f"Task {task_name}: long_function_cooperative started...")
for i in range(50_000_000):
# 每隔一定次数的循环,显式地交出控制权
if i % 10_000_000 == 0 and i != 0:
# print(f"Task {task_name}: Yielding at iteration {i}")
await asyncio.sleep(0) # 关键:交出控制权
print(f"Task {task_name}: long_function_cooperative finished.")
async def count_cooperative():
"""包含协作性CPU任务的协程。"""
for x in range(3):
print(f"Count {x} in {asyncio.current_task().get_name()}")
await long_function_cooperative() # 等待一个协作性CPU任务
async def main_cooperative():
"""主协程,启动两个协作性计数任务。"""
task1 = asyncio.create_task(count_cooperative(), name="Count-X")
task2 = asyncio.create_task(count_cooperative(), name="Count-Y")
await asyncio.gather(task1, task2)
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main_cooperative())
end_time = time.perf_counter()
print(f"\n总执行时间 (协作): {end_time - start_time:.2f} 秒")现在,运行这段代码,你会看到期望的交替输出:
Count 0 in Count-X Count 0 in Count-Y Count 1 in Count-X Count 1 in Count-Y Count 2 in Count-X Count 2 in Count-Y Task Count-X: long_function_cooperative finished. Task Count-Y: long_function_cooperative finished. ...
await asyncio.sleep(0)的原理是:它是一个非阻塞的异步操作,告诉事件循环“我暂时不需要CPU,你可以去检查是否有其他协程准备好了”。即使是sleep(0),它也触发了事件循环的调度机制,允许其他等待中的协程获得执行机会。这正是asyncio协作式多任务的核心体现。
asyncio的事件循环是单线程的,它维护一个任务队列。当一个协程通过await等待一个异步操作(如网络I/O、定时器或asyncio.sleep(0))时,它会暂停执行,并将控制权交还给事件循环。事件循环会检查任务队列,选择下一个“准备就绪”的协程来运行。
因此,asyncio的并发性是“协作式”的,而不是“抢占式”的。协程必须主动选择何时交出控制权。
虽然asyncio.sleep(0)可以在一定程度上缓解CPU密集型任务的阻塞问题,但它并不能真正实现并行计算,因为asyncio事件循环仍然运行在单个线程中。对于需要充分利用多核CPU的重度CPU密集型任务,真正的解决方案是使用多进程(multiprocessing)。
concurrent.futures模块提供了ProcessPoolExecutor,可以方便地将CPU密集型任务提交到独立的进程中执行,从而绕过Python的全局解释器锁(GIL)限制,实现真正的并行。
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def blocking_cpu_task(task_id, iterations):
"""一个阻塞的CPU密集型函数,适合在进程池中运行。"""
print(f"Process {task_id}: Starting CPU-bound task with {iterations} iterations...")
result = 0
for i in range(iterations):
result += i # 执行一些计算
print(f"Process {task_id}: Finished CPU-bound task. Result: {result % 1000}")
return f"Task {task_id} completed."
async def run_cpu_tasks_with_pool():
"""使用ProcessPoolExecutor异步运行CPU密集型任务。"""
# 使用ProcessPoolExecutor创建进程池,max_workers=None表示使用CPU核心数
with ProcessPoolExecutor(max_workers=2) as executor:
loop = asyncio.get_running_loop()
# 将CPU密集型任务提交到进程池,并等待其完成
tasks = [
loop.run_in_executor(executor, blocking_cpu_task, "Alpha", 50_000_000),
loop.run_in_executor(executor, blocking_cpu_task, "Beta", 50_000_000)
]
results = await asyncio.gather(*tasks)
print("\n所有CPU密集型任务通过ProcessPoolExecutor完成:")
for res in results:
print(res)
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(run_cpu_tasks_with_pool())
end_time = time.perf_counter()
print(f"\n总执行时间 (ProcessPoolExecutor): {end_time - start_time:.2f} 秒")运行此代码,你会看到两个blocking_cpu_task几乎同时开始执行,并且总执行时间会接近单个任务的执行时间,因为它们在不同的CPU核心上并行运行。
注意事项:
asyncio和async/await是Python实现高效并发的强大工具,但它们主要适用于I/O密集型任务。理解其协作式多任务的本质至关重要:
正确区分任务类型并选择合适的并发策略,是编写高效、响应迅速的Python异步应用程序的关键。
以上就是Python async/await 协程:CPU密集型任务的陷阱与解决方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号