0

0

如何在异步任务队列中动态维持固定并发数并优先重试失败任务

聖光之護

聖光之護

发布时间:2025-12-25 16:35:00

|

104人浏览过

|

来源于php中文网

原创

如何在异步任务队列中动态维持固定并发数并优先重试失败任务

本文介绍如何使用 `asyncio.as_completed()` 动态管理可变长度任务队列,始终保持 n 个并发任务运行;当任务失败时立即重新入队并获得最高优先级,确保快速重试,避免批量提交导致的调度延迟。

在实际异步任务调度场景中(如网络请求、文件下载或批量 API 调用),我们常需满足三个关键约束:

  • 固定并发上限(例如最多同时运行 8 个任务);
  • 失败任务需立即重试且享有最高优先级
  • 结果需按完成顺序实时产出(而非等待整批结束)

此时,若一次性创建全部 1000 个 Task(如 asyncio.create_task() 批量调用),不仅内存开销大,更会导致失败任务被“埋没”——因为 asyncio.as_completed(task_list) 仅作用于已创建的 Task 对象集合,无法动态插入新任务或调整优先级。

✅ 正确解法是:只维护一个大小可控的活跃任务池(pool),通过循环“取—执行—回收—补充”实现流式调度。核心思路如下:

有道智云AI开放平台
有道智云AI开放平台

有道智云AI开放平台

下载
  1. 使用字典 task_pool: Dict[asyncio.Task, Coroutine] 映射任务与其原始协程,便于失败后精准重试;
  2. 每次循环优先填充至目标并发数(如 8);
  3. 调用 asyncio.as_completed(task_pool.keys()) 获取首个完成任务;
  4. await 其结果,成功则 yield,失败则将对应协程重新加入待调度队列(推荐用 list.append() 实现 LIFO 高优先级);
  5. 无论成败,均从池中移除该任务,并在下一轮循环中自动补入新任务。

以下是生产就绪的参考实现(含错误处理与资源清理):

import asyncio
from typing import List, Coroutine, Any, Iterator

async def run_with_priority_retry(
    task_coros: List[Coroutine],
    max_concurrent: int = 8,
) -> Iterator[Any]:
    """
    动态维持 max_concurrent 个并发任务,失败任务立即重试(LIFO 优先)。

    Args:
        task_coros: 初始任务协程列表(可修改)
        max_concurrent: 最大并发数

    Yields:
        成功执行的结果
    """
    # 使用 list 模拟优先队列:append → 高优;pop() → 取最高优
    pending = list(task_coros)
    task_pool = {}  # asyncio.Task -> Coroutine

    while pending or task_pool:
        # ✅ 补充任务至满额
        while pending and len(task_pool) < max_concurrent:
            coro = pending.pop()  # LIFO:最后加入的最先执行(高优)
            task = asyncio.create_task(coro)
            task_pool[task] = coro

        if not task_pool:
            break

        # ✅ 等待任意一个完成(as_completed 返回迭代器,next 即首个)
        done, _ = await asyncio.wait(
            task_pool.keys(), 
            return_when=asyncio.FIRST_COMPLETED
        )
        completed_task = done.pop()

        # ✅ 提取原始协程并清理池
        coro = task_pool.pop(completed_task)

        try:
            result = await completed_task
            yield result
        except Exception as e:
            print(f"Task failed with {type(e).__name__}: {e} — re-queued with high priority")
            pending.append(coro)  # 失败任务插到末尾,下次 pop 优先执行

        # ✅ 清理已完成 task 引用(防止内存泄漏)
        completed_task.cancel()
        try:
            await completed_task
        except (asyncio.CancelledError, Exception):
            pass

? 关键注意事项

  • ❗切勿用 asyncio.as_completed(list) 直接传入动态变化的列表——它接收的是快照,不支持运行时增删;应始终基于稳定集合(如 dict.keys())调用。
  • ⚠️ 使用 pending.pop() 而非 pending.pop(0) 实现 O(1) 高优入队/出队;若需严格 FIFO 重试,可改用 collections.deque 并调用 appendleft() + pop()。
  • ? 务必显式 cancel() 并 await 已完成的 Task,避免悬空对象累积(尤其在长期运行服务中)。
  • ? 若任务本身含重试逻辑(如 tenacity),需与本层重试策略对齐,避免重复或冲突。

该模式已被广泛应用于异步爬虫、批量数据同步及微服务熔断降级等场景,兼顾性能、可控性与健壮性。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
append用法
append用法

append是一个常用的命令行工具,用于将一个文件的内容追加到另一个文件的末尾。想了解更多append用法相关内容,可以阅读本专题下面的文章。

348

2023.10.25

python中append的用法
python中append的用法

在Python中,append()是列表对象的一个方法,用于向列表末尾添加一个元素。想了解更多append的更多内容,可以阅读本专题下面的文章。

1080

2023.11.14

python中append的含义
python中append的含义

本专题整合了python中append的相关内容,阅读专题下面的文章了解更多详细内容。

185

2025.09.12

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

22

2026.03.10

Kotlin Android模块化架构与组件化开发实践
Kotlin Android模块化架构与组件化开发实践

本专题围绕 Kotlin 在 Android 应用开发中的架构实践展开,重点讲解模块化设计与组件化开发的实现思路。内容包括项目模块拆分策略、公共组件封装、依赖管理优化、路由通信机制以及大型项目的工程化管理方法。通过真实项目案例分析,帮助开发者构建结构清晰、易扩展且维护成本低的 Android 应用架构体系,提升团队协作效率与项目迭代速度。

48

2026.03.09

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

93

2026.03.06

Rust内存安全机制与所有权模型深度实践
Rust内存安全机制与所有权模型深度实践

本专题围绕 Rust 语言核心特性展开,深入讲解所有权机制、借用规则、生命周期管理以及智能指针等关键概念。通过系统级开发案例,分析内存安全保障原理与零成本抽象优势,并结合并发场景讲解 Send 与 Sync 特性实现机制。帮助开发者真正理解 Rust 的设计哲学,掌握在高性能与安全性并重场景中的工程实践能力。

216

2026.03.05

PHP高性能API设计与Laravel服务架构实践
PHP高性能API设计与Laravel服务架构实践

本专题围绕 PHP 在现代 Web 后端开发中的高性能实践展开,重点讲解基于 Laravel 框架构建可扩展 API 服务的核心方法。内容涵盖路由与中间件机制、服务容器与依赖注入、接口版本管理、缓存策略设计以及队列异步处理方案。同时结合高并发场景,深入分析性能瓶颈定位与优化思路,帮助开发者构建稳定、高效、易维护的 PHP 后端服务体系。

412

2026.03.04

AI安装教程大全
AI安装教程大全

2026最全AI工具安装教程专题:包含各版本AI绘图、AI视频、智能办公软件的本地化部署手册。全篇零基础友好,附带最新模型下载地址、一键安装脚本及常见报错修复方案。每日更新,收藏这一篇就够了,让AI安装不再报错!

143

2026.03.04

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 80.8万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号