0

0

Python asyncio并发任务的超时管理与优雅关闭策略

聖光之護

聖光之護

发布时间:2025-07-22 15:20:01

|

869人浏览过

|

来源于php中文网

原创

python asyncio并发任务的超时管理与优雅关闭策略

本文旨在解决 asyncio.gather 在处理长时间阻塞任务时无法按时终止的问题。通过深入探讨 asyncio.wait 方法,我们将学习如何为并发任务设置全局超时,并有效地管理已完成和未完成的任务。文章将提供详细的代码示例,指导读者如何优雅地取消超时任务,确保异步应用的健壮性和可控性。

异步任务阻塞问题解析

在Python的 asyncio 编程中,我们经常使用 asyncio.gather 来并发执行多个异步任务。然而,当某些任务内部包含长时间阻塞的 await 调用(例如等待网络数据或消息队列事件),并且这些调用可能长时间不返回时,即使外部设置了停止标志,整个 gather 操作也可能无法按预期在指定时间内终止。

考虑以下场景:

import asyncio

stop = False

async def watch_task1(client):
    while not stop:
        print("watch_task1: Waiting for data...")
        await client.ws.get_data() # 可能会长时间阻塞
        print("watch_task1: Data received.")

async def watch_task2(client):
    while not stop:
        print("watch_task2: Waiting for news...")
        await client.ws.get_news() # 可能会长时间阻塞
        print("watch_task2: News received.")

async def stop_after(delay):
    global stop
    print(f"stop_after: Will stop after {delay} seconds.")
    await asyncio.sleep(delay)
    stop = True
    print("stop_after: Stop flag set to True.")

async def main_gather(client):
    tasks = [
        watch_task1(client),
        watch_task2(client),
        stop_after(60),
    ]

    try:
        # 使用 gather,如果 watch_task1/2 内部的 await 阻塞,则无法按时停止
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"main_gather: An exception occurred: {e}")
    finally:
        print("main_gather: All tasks finished or gathered.")

# 模拟一个简化的客户端
class MockClient:
    def __init__(self):
        self.ws = self.MockWebSocket()

    class MockWebSocket:
        async def get_data(self):
            # 模拟长时间阻塞,除非外部取消
            await asyncio.sleep(3600) # 模拟24小时,或直到被取消
            return "some_data"

        async def get_news(self):
            await asyncio.sleep(3600) # 模拟24小时,或直到被取消
            return "some_news"

# 运行示例 (不会真正运行,因为需要一个 client 实例)
# asyncio.run(main_gather(MockClient()))

在这个例子中,stop_after 函数会在60秒后将 stop 标志设置为 True。然而,watch_task1 和 watch_task2 内部的 await client.ws.get_data() 和 await client.ws.get_news() 调用可能会无限期地阻塞,直到有数据到来。这意味着即使 stop 标志为 True,这些任务也无法退出其 while 循环,导致 asyncio.gather 无法在60秒后完成。

解决方案:使用 asyncio.wait 进行超时控制

为了解决这个问题,我们可以使用 asyncio.wait 函数,它提供了强大的超时管理能力。asyncio.wait 允许我们指定一个 timeout 参数,在达到指定时间后,它会返回已完成的任务和未完成的任务。

立即学习Python免费学习笔记(深入)”;

asyncio.wait 的基本签名如下:

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
  • aws: 一个可迭代对象,包含要等待的 awaitable 对象(通常是 Task 或 Future)。
  • timeout: 可选参数,指定等待的最大秒数。如果在此时间内所有任务都未完成,wait 会提前返回。
  • return_when: 一个常量,指定何时返回。常见的有:
    • asyncio.ALL_COMPLETED (默认): 所有任务都完成。
    • asyncio.FIRST_COMPLETED: 任意一个任务完成。
    • asyncio.FIRST_EXCEPTION: 任意一个任务抛出异常。

当 timeout 参数被设置时,asyncio.wait 会返回两个集合:done 和 pending。

  • done: 包含在超时时间内完成(或抛出异常)的任务。
  • pending: 包含在超时时间内未完成的任务。

以下是使用 asyncio.wait 改进后的 main 函数示例:

剪映
剪映

一款全能易用的桌面端剪辑软件

下载
import asyncio

# 假设 watch_task1, watch_task2, stop_after 和 MockClient 的定义与上文相同
# ...

async def main_wait(client):
    tasks = [
        asyncio.create_task(watch_task1(client)), # 显式创建任务
        asyncio.create_task(watch_task2(client)),
        asyncio.create_task(stop_after(60)),
    ]

    print("main_wait: Starting tasks with a 60-second timeout...")
    done, pending = await asyncio.wait(tasks, timeout=60)

    print(f"main_wait: Wait completed. Done tasks: {len(done)}, Pending tasks: {len(pending)}")

    # 处理已完成的任务
    for task in done:
        try:
            # 获取任务结果,如果任务抛出异常,这里会重新抛出
            result = task.result()
            print(f"main_wait: Task {task.get_name()} completed with result: {result if result is not None else 'None'}")
        except asyncio.CancelledError:
            print(f"main_wait: Task {task.get_name()} was cancelled.")
        except Exception as e:
            print(f"main_wait: Task {task.get_name()} raised an exception: {e}")

    # 处理未完成的任务:通常需要取消它们以释放资源
    if pending:
        print("main_wait: Cancelling pending tasks...")
        for task in pending:
            task.cancel()

        # 等待所有取消操作完成,或者设置一个短的超时
        # 注意:task.cancel() 只是请求取消,任务需要自行处理 CancelledError
        await asyncio.gather(*pending, return_exceptions=True) # 等待取消请求生效

    print("main_wait: All tasks processed.")

# 实际运行示例
async def run_example():
    client = MockClient()
    await main_wait(client)

if __name__ == "__main__":
    asyncio.run(run_example())

代码解释:

  1. asyncio.create_task(): 在将 awaitable 对象传递给 asyncio.wait 之前,最好使用 asyncio.create_task() 将它们包装成 Task 对象。这使得我们可以更好地管理和取消这些任务。
  2. await asyncio.wait(tasks, timeout=60): 这是核心部分。它会等待 tasks 列表中的所有任务,但最长等待60秒。一旦60秒过去,或者所有任务都已完成,它就会返回 done 和 pending 两个集合。
  3. 处理 done 任务:
    • 遍历 done 集合中的每个任务。
    • 使用 task.result() 来获取任务的返回值或重新抛出任务内部发生的任何异常。这是处理 asyncio.wait 返回的已完成任务的关键。
    • 通过 task.get_name() 可以获取任务的名称(如果设置了)。
  4. 处理 pending 任务:
    • 遍历 pending 集合中的每个任务。这些任务在超时时间内未能完成。
    • 调用 task.cancel() 方法来请求取消这些任务。cancel() 方法会向任务内部注入一个 asyncio.CancelledError 异常。任务需要自行捕获并处理这个异常,以执行必要的清理工作。
    • await asyncio.gather(*pending, return_exceptions=True): 在取消所有 pending 任务后,我们通常需要等待这些取消操作真正生效,即等待这些任务处理完 CancelledError 并最终退出。使用 gather 并设置 return_exceptions=True 可以确保即使有任务在取消过程中抛出其他异常,也不会中断整个流程。

注意事项与最佳实践

  1. 任务取消的响应: task.cancel() 仅仅是发送一个取消请求。任务本身必须在适当的位置检查 CancelledError 并进行响应。如果任务内部有 await 调用,当 CancelledError 抛出时,await 表达式会立即抛出该异常。如果任务内部没有 await 调用或不处理异常,它将继续运行直到完成。

    async def my_cancellable_task():
        try:
            while True:
                # 模拟工作
                await asyncio.sleep(1)
                print("Task working...")
        except asyncio.CancelledError:
            print("Task was cancelled, performing cleanup...")
            # 执行清理操作
            await asyncio.sleep(0.1)
            print("Cleanup complete.")
        finally:
            print("Task finished.")
  2. 资源清理: 确保你的异步任务在被取消或正常完成时,能够正确地关闭文件句柄、网络连接或其他系统资源。try...finally 块是实现这一点的常用模式。

  3. 异常处理: 当从 task.result() 中获取结果时,如果任务内部发生了未捕获的异常,task.result() 会重新抛出该异常。务必在处理 done 任务时捕获这些潜在的异常,以防止主程序崩溃。

  4. asyncio.wait_for 的替代: 另一种方法是使用 asyncio.wait_for 为每个单独的长时间运行任务设置超时。

    async def main_wait_for(client):
        try:
            await asyncio.wait_for(watch_task1(client), timeout=60)
        except asyncio.TimeoutError:
            print("watch_task1 timed out!")
    
        try:
            await asyncio.wait_for(watch_task2(client), timeout=60)
        except asyncio.TimeoutError:
            print("watch_task2 timed out!")
    
        # 对于多个任务,这种方式可能不如 asyncio.wait 灵活,
        # 因为它会串行处理超时,而不是全局并行。
        # 但如果只需要对单个任务施加超时,则非常适用。

    asyncio.wait_for 更适合对单个 awaitable 设置超时,如果超时,它会取消该 awaitable 并抛出 asyncio.TimeoutError。对于需要全局管理一组任务并在超时时统一处理的场景,asyncio.wait 提供了更强大的控制。

总结

通过 asyncio.wait 及其 timeout 参数,我们可以精确地控制一组并发异步任务的最大执行时间。这种方法不仅能够确保应用程序在预设时间内响应,还提供了清晰的机制来识别已完成和未完成的任务,并允许我们优雅地取消那些未能按时完成的任务,从而实现健壮且可控的异步编程实践。理解并正确应用 asyncio.wait 是构建高性能、可靠的 asyncio 应用程序的关键。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1503

2023.10.24

while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

97

2023.09.25

go语言 注释编码
go语言 注释编码

本专题整合了go语言注释、注释规范等等内容,阅读专题下面的文章了解更多详细内容。

0

2026.01.31

go语言 math包
go语言 math包

本专题整合了go语言math包相关内容,阅读专题下面的文章了解更多详细内容。

1

2026.01.31

go语言输入函数
go语言输入函数

本专题整合了go语言输入相关教程内容,阅读专题下面的文章了解更多详细内容。

1

2026.01.31

golang 循环遍历
golang 循环遍历

本专题整合了golang循环遍历相关教程,阅读专题下面的文章了解更多详细内容。

0

2026.01.31

Golang人工智能合集
Golang人工智能合集

本专题整合了Golang人工智能相关内容,阅读专题下面的文章了解更多详细内容。

1

2026.01.31

2026赚钱平台入口大全
2026赚钱平台入口大全

2026年最新赚钱平台入口汇总,涵盖任务众包、内容创作、电商运营、技能变现等多类正规渠道,助你轻松开启副业增收之路。阅读专题下面的文章了解更多详细内容。

67

2026.01.31

高干文在线阅读网站大全
高干文在线阅读网站大全

汇集热门1v1高干文免费阅读资源,涵盖都市言情、京味大院、军旅高干等经典题材,情节紧凑、人物鲜明。阅读专题下面的文章了解更多详细内容。

71

2026.01.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号