0

0

Python Asyncio:优雅地管理与终止长时间运行的任务

霞舞

霞舞

发布时间:2025-07-22 14:40:14

|

246人浏览过

|

来源于php中文网

原创

python asyncio:优雅地管理与终止长时间运行的任务

本文旨在探讨在Python asyncio异步编程中,如何有效管理和终止可能长时间阻塞的任务,以避免程序无限期等待。我们将重点介绍 asyncio.wait 和 asyncio.wait_for 这两个关键工具,它们提供了设置任务超时机制的能力。通过详细的代码示例和最佳实践,您将学会如何确保异步应用程序在预设时间内响应并关闭,即使某些I/O操作不活跃。

1. 问题背景:asyncio.gather 的局限性

在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。

考虑以下场景:

import asyncio

stop = False

async def watch_task1(client):
    while not stop:
        # 假设 client.ws.get_data() 可能长时间没有数据返回
        await client.ws.get_data()
        print("Task 1 received data")

async def watch_task2(client):
    while not stop:
        # 假设 client.ws.get_news() 也可能长时间没有数据返回
        await client.ws.get_news()
        print("Task 2 received news")

async def stop_after(delay_seconds):
    global stop
    await asyncio.sleep(delay_seconds)
    print(f"Stopping after {delay_seconds} seconds...")
    stop = True

class MockClient:
    async def get_data(self):
        await asyncio.sleep(100) # 模拟长时间阻塞
    async def get_news(self):
        await asyncio.sleep(100) # 模拟长时间阻塞
    async def sleep(self, delay):
        await asyncio.sleep(delay)

async def main_gather():
    client = MockClient()
    tasks = [
        watch_task1(client),
        watch_task2(client),
        stop_after(5), # 尝试在5秒后停止
    ]

    try:
        # 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"An exception occurred: {e}")
    print("Main gather finished.")

# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束

在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。

2. 解决方案:使用 asyncio.wait 进行超时控制

为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。

立即学习Python免费学习笔记(深入)”;

2.1 asyncio.wait 概述

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。

SteveAI
SteveAI

Animaker旗下AI在线视频制作工具,能够在几分钟内创建专业视频。

下载
  • aws: 一个由协程、任务或 Future 组成的集合。
  • timeout: 可选参数,指定等待的最大秒数。如果超时,函数会立即返回。
  • return_when: 可选参数,定义何时返回。常用值包括:
    • asyncio.ALL_COMPLETED (默认): 等待所有任务完成。
    • asyncio.FIRST_COMPLETED: 只要有一个任务完成就返回。
    • asyncio.FIRST_EXCEPTION: 只要有一个任务抛出异常就返回。

asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。

2.2 实现超时停止逻辑

import asyncio

stop = False # 这是一个共享状态,用于控制协程的内部循环

async def watch_task1(client):
    try:
        while not stop:
            print("Task 1: Waiting for data...")
            await client.ws.get_data() # 可能会阻塞
            print("Task 1: Data received.")
    except asyncio.CancelledError:
        print("Task 1: Cancelled.")
    finally:
        print("Task 1: Exiting.")

async def watch_task2(client):
    try:
        while not stop:
            print("Task 2: Waiting for news...")
            await client.ws.get_news() # 可能会阻塞
            print("Task 2: News received.")
    except asyncio.CancelledError:
        print("Task 2: Cancelled.")
    finally:
        print("Task 2: Exiting.")

# MockClient 保持不变
class MockClient:
    async def get_data(self):
        # 模拟长时间阻塞,但为了演示,将其缩短
        await asyncio.sleep(5)
    async def get_news(self):
        # 模拟长时间阻塞
        await asyncio.sleep(5)
    async def sleep(self, delay):
        await asyncio.sleep(delay)

async def main_wait_timeout():
    client = MockClient()
    tasks_to_run = [
        asyncio.create_task(watch_task1(client)), # 显式创建任务
        asyncio.create_task(watch_task2(client)),
    ]

    print("Starting tasks with a 3-second timeout...")
    # 设置一个全局超时,例如3秒
    done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED)

    print("\n--- After asyncio.wait ---")
    print(f"Completed tasks ({len(done)}):")
    for task in done:
        try:
            # 获取任务结果,如果任务抛出异常,这里会重新抛出
            result = task.result()
            print(f"  Task finished successfully: {task.get_name()}, Result: {result}")
        except asyncio.CancelledError:
            print(f"  Task {task.get_name()} was cancelled (expected for pending tasks).")
        except Exception as e:
            print(f"  Task {task.get_name()} raised an exception: {e}")

    print(f"\nPending tasks ({len(pending)}):")
    for task in pending:
        print(f"  Task {task.get_name()} is still pending. Cancelling...")
        task.cancel() # 取消未完成的任务
        try:
            await task # 等待任务真正结束,以便其处理 CancelledError
        except asyncio.CancelledError:
            print(f"  Task {task.get_name()} successfully cancelled and cleaned up.")
        except Exception as e:
            print(f"  Task {task.get_name()} raised an exception during cancellation cleanup: {e}")

    print("Main wait_timeout finished.")

# 运行主函数
if __name__ == "__main__":
    asyncio.run(main_wait_timeout())

代码解释:

  1. 显式创建任务: 在 main_wait_timeout 中,我们使用 asyncio.create_task() 将协程包装成任务。这是推荐的做法,因为 asyncio.wait 接受任务或 Future 对象。
  2. 设置超时: await asyncio.wait(tasks_to_run, timeout=3) 会在3秒后返回,无论 watch_task 是否完成其内部的 await client.ws.get_data()。
  3. 处理 done 集合: 遍历 done 集合中的任务,通过 task.result() 获取其结果或捕获可能抛出的异常。
  4. 处理 pending 集合: 遍历 pending 集合中的任务。这些任务在超时时仍未完成。为了确保资源被释放,必须对它们调用 task.cancel()。
  5. 协程中的 CancelledError: 当一个任务被 cancel() 时,它会向任务内部抛出一个 asyncio.CancelledError。任务内部的协程应该捕获这个异常,并执行必要的清理工作(例如关闭文件句柄、网络连接等)。在上述 watch_task 示例中,我们添加了 try...except asyncio.CancelledError...finally 块来演示这一点。
  6. 等待任务真正结束: 在 task.cancel() 之后,最好 await task 一下,确保任务有时间处理 CancelledError 并完成其清理逻辑。

3. 替代方案:asyncio.wait_for

如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。

async def example_wait_for():
    client = MockClient()
    try:
        print("Attempting to get data with a 2-second timeout...")
        # watch_task1 内部的循环会因为超时而中断
        await asyncio.wait_for(watch_task1(client), timeout=2)
        print("watch_task1 finished within timeout.")
    except asyncio.TimeoutError:
        print("watch_task1 timed out!")
    except Exception as e:
        print(f"watch_task1 raised an unexpected exception: {e}")
    print("Example wait_for finished.")

# 如果在 main_wait_timeout 之后运行
# asyncio.run(example_wait_for())

asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。

4. 注意事项与最佳实践

  • 任务取消的响应: 这是 asyncio 中非常重要的一点。当一个任务被 cancel() 时,它不会立即停止,而是在其下一个 await 点抛出 asyncio.CancelledError。任务的编写者必须在协程内部捕获 CancelledError,并执行必要的清理工作。如果协程不处理 CancelledError,则在取消后可能会继续运行,直到自然结束或遇到下一个 await 点。
  • 资源清理: 在 finally 块中进行资源清理是良好的实践,无论任务是正常完成还是被取消。
  • 选择 wait 还是 wait_for:
    • asyncio.wait_for 适用于为单个协程设置超时,并在超时时抛出 TimeoutError。
    • asyncio.wait 适用于管理一组协程/任务,并提供更细粒度的控制,如 return_when 参数,以及返回已完成和未完成任务的集合,方便后续处理(如取消未完成的任务)。
  • return_exceptions=True 与 task.result():
    • asyncio.gather 的 return_exceptions=True 会将协程中抛出的异常作为结果返回,而不是直接抛出。
    • 对于 asyncio.wait 返回的 done 任务,调用 task.result() 会重新抛出任务内部发生的任何异常,或者返回其结果。这是检查任务是否成功完成的推荐方式。

5. 总结

在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

8

2026.01.30

c++ 字符串格式化
c++ 字符串格式化

本专题整合了c++字符串格式化用法、输出技巧、实践等等内容,阅读专题下面的文章了解更多详细内容。

8

2026.01.30

java 字符串格式化
java 字符串格式化

本专题整合了java如何进行字符串格式化相关教程、使用解析、方法详解等等内容。阅读专题下面的文章了解更多详细教程。

6

2026.01.30

python 字符串格式化
python 字符串格式化

本专题整合了python字符串格式化教程、实践、方法、进阶等等相关内容,阅读专题下面的文章了解更多详细操作。

1

2026.01.30

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

20

2026.01.29

java配置环境变量教程合集
java配置环境变量教程合集

本专题整合了java配置环境变量设置、步骤、安装jdk、避免冲突等等相关内容,阅读专题下面的文章了解更多详细操作。

17

2026.01.29

java成品学习网站推荐大全
java成品学习网站推荐大全

本专题整合了java成品网站、在线成品网站源码、源码入口等等相关内容,阅读专题下面的文章了解更多详细推荐内容。

18

2026.01.29

Java字符串处理使用教程合集
Java字符串处理使用教程合集

本专题整合了Java字符串截取、处理、使用、实战等等教程内容,阅读专题下面的文章了解详细操作教程。

3

2026.01.29

Java空对象相关教程合集
Java空对象相关教程合集

本专题整合了Java空对象相关教程,阅读专题下面的文章了解更多详细内容。

6

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号