Python async/await 协程:CPU密集型任务的陷阱与解决方案

心靈之曲
发布: 2025-12-01 11:55:02
原创
161人浏览过

python async/await 协程:cpu密集型任务的陷阱与解决方案

Python的`async/await`机制旨在通过协程实现并发,但其工作原理在处理CPU密集型任务时常引人困惑。本文将深入探讨为何`await`一个纯计算任务无法交出控制权,而`await asyncio.sleep(0)`却能实现任务切换。我们将剖析事件循环的协作机制,并提供针对CPU密集型任务的正确并发策略,帮助开发者避免常见陷阱。

1. asyncio 与协程概述

asyncio是Python中用于编写并发代码的库,它通过事件循环(event loop)和协程(coroutines)实现单线程内的协作式多任务。async/await语法是定义和等待协程的核心。其设计初衷是为了高效处理I/O密集型任务,例如网络请求、文件读写等。当一个协程遇到I/O操作并使用await关键字时,它会将控制权交还给事件循环,允许事件循环调度其他“准备就绪”的协程运行,从而避免了线程切换的开销,提高了资源利用率。

2. CPU密集型任务与 await 的误区

许多开发者在初次接触asyncio时,会误以为只要在函数前加上async,并在调用时使用await,就能实现任务的并发交替执行。然而,这并非总是如此,尤其是在处理CPU密集型任务时。

考虑以下示例代码:

立即学习Python免费学习笔记(深入)”;

import asyncio
import time

async def long_function():
    """一个纯粹的CPU密集型任务,不涉及任何I/O或异步操作。"""
    print(f"Task {asyncio.current_task().get_name()}: long_function started...")
    for _ in range(50_000_000): # 大量循环,模拟耗时计算
        pass
    print(f"Task {asyncio.current_task().get_name()}: long_function finished.")

async def count_blocking():
    """包含阻塞性CPU任务的协程。"""
    for x in range(3):
        print(f"Count {x} in {asyncio.current_task().get_name()}")
        await long_function() # 等待一个CPU密集型任务

async def main_blocking():
    """主协程,启动两个阻塞性计数任务。"""
    task1 = asyncio.create_task(count_blocking(), name="Count-A")
    task2 = asyncio.create_task(count_blocking(), name="Count-B")
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main_blocking())
    end_time = time.perf_counter()
    print(f"\n总执行时间 (阻塞): {end_time - start_time:.2f} 秒")
登录后复制

运行上述代码,你会发现输出结果是:一个count_blocking协程会完全执行完毕,包括其内部的long_function的所有迭代,然后另一个count_blocking协程才开始执行。输出顺序会是:

Count 0 in Count-A
Task Count-A: long_function started...
Task Count-A: long_function finished.
Count 1 in Count-A
Task Count-A: long_function started...
Task Count-A: long_function finished.
Count 2 in Count-A
Task Count-A: long_function started...
Task Count-A: long_function finished.
Count 0 in Count-B
Task Count-B: long_function started...
Task Count-B: long_function finished.
...
登录后复制

这与我们期望的交替输出(如0、0、1、1...)大相径庭。原因是await关键字本身并不具备“中断”正在执行的函数的能力。它仅仅表示“我正在等待某个异步操作完成,在此期间,你可以去执行其他准备就绪的协程”。而long_function内部是一个纯粹的计算循环,它没有任何I/O操作,也没有主动向事件循环报告它正在“等待”什么。因此,一旦事件循环将控制权交给long_function,它就会一直运行直到计算完成,期间不会释放控制权,从而阻塞了整个事件循环。

3. asyncio.sleep(0) 的作用:显式交出控制权

为了实现CPU密集型任务的协作式并发,我们需要在耗时计算中显式地将控制权交还给事件循环。asyncio.sleep(0)就是实现这一目的的常用技巧。

import asyncio
import time

async def long_function_cooperative():
    """一个协作式的CPU密集型任务,周期性地交出控制权。"""
    task_name = asyncio.current_task().get_name()
    # print(f"Task {task_name}: long_function_cooperative started...")
    for i in range(50_000_000):
        # 每隔一定次数的循环,显式地交出控制权
        if i % 10_000_000 == 0 and i != 0:
            # print(f"Task {task_name}: Yielding at iteration {i}")
            await asyncio.sleep(0) # 关键:交出控制权
    print(f"Task {task_name}: long_function_cooperative finished.")

async def count_cooperative():
    """包含协作性CPU任务的协程。"""
    for x in range(3):
        print(f"Count {x} in {asyncio.current_task().get_name()}")
        await long_function_cooperative() # 等待一个协作性CPU任务

async def main_cooperative():
    """主协程,启动两个协作性计数任务。"""
    task1 = asyncio.create_task(count_cooperative(), name="Count-X")
    task2 = asyncio.create_task(count_cooperative(), name="Count-Y")
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main_cooperative())
    end_time = time.perf_counter()
    print(f"\n总执行时间 (协作): {end_time - start_time:.2f} 秒")
登录后复制

现在,运行这段代码,你会看到期望的交替输出:

Count 0 in Count-X
Count 0 in Count-Y
Count 1 in Count-X
Count 1 in Count-Y
Count 2 in Count-X
Count 2 in Count-Y
Task Count-X: long_function_cooperative finished.
Task Count-Y: long_function_cooperative finished.
...
登录后复制

await asyncio.sleep(0)的原理是:它是一个非阻塞的异步操作,告诉事件循环“我暂时不需要CPU,你可以去检查是否有其他协程准备好了”。即使是sleep(0),它也触发了事件循环的调度机制,允许其他等待中的协程获得执行机会。这正是asyncio协作式多任务的核心体现。

Remove.bg
Remove.bg

AI在线抠图软件,图片去除背景

Remove.bg 174
查看详情 Remove.bg

4. 事件循环的工作机制

asyncio的事件循环是单线程的,它维护一个任务队列。当一个协程通过await等待一个异步操作(如网络I/O、定时器或asyncio.sleep(0))时,它会暂停执行,并将控制权交还给事件循环。事件循环会检查任务队列,选择下一个“准备就绪”的协程来运行。

  • I/O密集型任务: 当协程等待网络响应时,操作系统会处理网络通信,而Python线程可以去执行其他协程。当网络数据到达时,事件循环会收到通知,然后将等待该数据的协程标记为“准备就绪”,并在合适的时机重新调度它。
  • CPU密集型任务: 如果一个协程正在执行纯粹的CPU计算,它不会自动释放控制权。它会一直占用CPU,直到计算完成。除非它内部显式地调用await一个异步操作(如asyncio.sleep(0)),否则事件循环无法介入并切换到其他协程。

因此,asyncio的并发性是“协作式”的,而不是“抢占式”的。协程必须主动选择何时交出控制权。

5. CPU密集型任务的真正解决方案

虽然asyncio.sleep(0)可以在一定程度上缓解CPU密集型任务的阻塞问题,但它并不能真正实现并行计算,因为asyncio事件循环仍然运行在单个线程中。对于需要充分利用多核CPU的重度CPU密集型任务,真正的解决方案是使用多进程(multiprocessing)。

concurrent.futures模块提供了ProcessPoolExecutor,可以方便地将CPU密集型任务提交到独立的进程中执行,从而绕过Python的全局解释器锁(GIL)限制,实现真正的并行。

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_cpu_task(task_id, iterations):
    """一个阻塞的CPU密集型函数,适合在进程池中运行。"""
    print(f"Process {task_id}: Starting CPU-bound task with {iterations} iterations...")
    result = 0
    for i in range(iterations):
        result += i # 执行一些计算
    print(f"Process {task_id}: Finished CPU-bound task. Result: {result % 1000}")
    return f"Task {task_id} completed."

async def run_cpu_tasks_with_pool():
    """使用ProcessPoolExecutor异步运行CPU密集型任务。"""
    # 使用ProcessPoolExecutor创建进程池,max_workers=None表示使用CPU核心数
    with ProcessPoolExecutor(max_workers=2) as executor:
        loop = asyncio.get_running_loop()

        # 将CPU密集型任务提交到进程池,并等待其完成
        tasks = [
            loop.run_in_executor(executor, blocking_cpu_task, "Alpha", 50_000_000),
            loop.run_in_executor(executor, blocking_cpu_task, "Beta", 50_000_000)
        ]

        results = await asyncio.gather(*tasks)
        print("\n所有CPU密集型任务通过ProcessPoolExecutor完成:")
        for res in results:
            print(res)

if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(run_cpu_tasks_with_pool())
    end_time = time.perf_counter()
    print(f"\n总执行时间 (ProcessPoolExecutor): {end_time - start_time:.2f} 秒")
登录后复制

运行此代码,你会看到两个blocking_cpu_task几乎同时开始执行,并且总执行时间会接近单个任务的执行时间,因为它们在不同的CPU核心上并行运行。

注意事项:

  • ThreadPoolExecutor vs ProcessPoolExecutor: concurrent.futures.ThreadPoolExecutor用于线程池。虽然它可以用于将阻塞I/O操作移出主事件循环,但由于GIL的存在,对于纯Python的CPU密集型任务,线程池无法实现真正的并行计算。因此,对于CPU密集型任务,应优先考虑ProcessPoolExecutor。
  • 任务粒度: 如果CPU密集型任务可以被细分为许多小块,并且每小块的执行时间较短,那么在每小块结束后插入await asyncio.sleep(0)可能是一种权宜之计,但它增加了上下文切换的开销。对于长时间运行的、不可中断的CPU计算,使用进程池是更健壮的选择。

6. 总结

asyncio和async/await是Python实现高效并发的强大工具,但它们主要适用于I/O密集型任务。理解其协作式多任务的本质至关重要:

  • await关键字只有在等待一个异步操作(如I/O、定时器或显式地交出控制权)时,才会让出控制权。
  • 纯粹的CPU密集型计算会阻塞整个asyncio事件循环,直到其完成。
  • await asyncio.sleep(0)可以作为一种显式交出控制权的机制,使事件循环有机会调度其他协程。
  • 对于需要真正并行执行的CPU密集型任务,应使用concurrent.futures.ProcessPoolExecutor将任务提交到独立的进程中运行。

正确区分任务类型并选择合适的并发策略,是编写高效、响应迅速的Python异步应用程序的关键。

以上就是Python async/await 协程:CPU密集型任务的陷阱与解决方案的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号