0

0

优化 asyncio 任务调度:使用队列实现生产者-消费者模式

霞舞

霞舞

发布时间:2025-11-26 13:22:46

|

845人浏览过

|

来源于php中文网

原创

优化 asyncio 任务调度:使用队列实现生产者-消费者模式

本文探讨了在 `asyncio` 中如何解决因直接 `await` 耗时操作导致的并发阻塞问题。通过分析一个字符流处理示例,揭示了传统 `async for` 循环中 `await` 的局限性。核心解决方案是引入 `asyncio.queue` 和 `asyncio.event`,构建生产者-消费者模式,从而实现任务的解耦与并发执行,显著提升异步应用的响应性和效率。

在 asyncio 异步编程中,我们经常需要处理数据流的生产和消费。一个常见的挑战是,当一个任务正在处理数据并遇到耗时操作时,如何确保数据生产能够持续进行,而不是被阻塞。本文将深入探讨这一问题,并提供一个基于 asyncio.Queue 和 asyncio.Event 的优雅解决方案。

1. asyncio 中的并发挑战与 await 的局限性

考虑一个场景:我们有一个字符流生成器 stream(),它逐个生成字符;一个句子生成器 sentences_generator(),它从字符流中收集字符并生成完整的句子;以及一个句子处理器 process_sentence(),它模拟对句子的耗时处理。

初始实现可能如下所示:

import asyncio

async def stream():
    char_string = "Hi. Hello. Thank you."
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence) * 0.1) # 模拟句子处理耗时
    print("sentence processed!")

async def main():
    i = 0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence) # 这里会阻塞
        i += 1

# asyncio.run(main())

运行上述代码,会观察到如下输出模式:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
processing sentence:  1
waiting for processing sentence:   Hello.
sentence processed!
...

从输出可以看出,当 process_sentence 正在处理一个句子(即执行 await asyncio.sleep())时,字符流的生成(got char:)完全停止了。只有当前句子处理完毕后,sentences_generator 才能继续从 stream() 获取下一个字符,进而生成下一个句子。这并不是我们期望的并发行为。理想情况下,当一个句子正在被处理时,字符流应该能够持续生成,为下一个句子做准备。

问题根源:asyncio 中的 await 关键字会暂停当前协程的执行,并将控制权交还给事件循环,允许事件循环调度其他“已准备好”的协程。然而,在上述 main 函数中,async for sentence in sentences_generator(): 迭代器在每次循环中都紧跟着 await process_sentence(sentence)。这意味着 main 协程会完全等待 process_sentence 完成,才能再次从 sentences_generator 获取下一个句子。由于 sentences_generator 是在 main 内部同步迭代的,它也无法在 process_sentence 运行时继续推进。

2. 解决方案:利用 asyncio.Queue 和 asyncio.Event 实现生产者-消费者模式

为了实现生产者(生成句子)和消费者(处理句子)的并发执行,我们需要解耦它们,使它们能够独立运行。asyncio.Queue 和 asyncio.Event 是实现这一目标的理想工具

2.1 生产者-消费者模式概述

生产者-消费者模式是一种经典的多线程/多进程/多任务设计模式。生产者负责生成数据并将其放入一个共享缓冲区(队列),而消费者则从缓冲区中取出数据进行处理。这种模式的关键在于,生产者和消费者可以以不同的速度运行,并且互不干扰,只要队列中有足够的空间或数据。

2.2 asyncio.Queue 的作用

asyncio.Queue 是 asyncio 提供的异步队列,它具有以下特性:

  • 异步存取: await queue.put(item) 用于异步地将元素放入队列,如果队列已满,则会暂停直到有空间。await queue.get() 用于异步地从队列中取出元素,如果队列为空,则会暂停直到有元素可用。
  • 协程安全: 它是为 asyncio 协程设计的,保证了在并发访问时的正确性。
  • 作为缓冲区: 它充当生产者和消费者之间的缓冲区,允许它们独立运行。

2.3 asyncio.Event 的作用

asyncio.Event 是一个简单的同步原语,用于在 asyncio 任务之间进行信号通知。它主要用于:

  • 标记完成: 生产者任务在完成所有数据生产后,可以通过 event.set() 来设置事件,通知消费者数据流已经结束。
  • 等待信号: 消费者任务可以通过 await event.wait() 来等待事件被设置。

结合 asyncio.Queue 和 asyncio.Event,我们可以构建一个健壮的生产者-消费者系统,确保消费者在队列清空且生产者已完成工作后能够优雅地退出。

天工大模型
天工大模型

中国首个对标ChatGPT的双千亿级大语言模型

下载

3. 实战:重构 asyncio 任务以实现并发

我们将改造 sentences_generator 作为生产者,process_sentence 作为消费者,并使用 asyncio.gather() 来并发运行它们。

import asyncio

# 模拟全局变量,用于计数处理的句子
i = 1 

async def stream():
    char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    生产者:从字符流生成句子,并放入队列。
    当所有句子生成完毕后,设置事件标志。
    """
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence) # 将句子放入队列
            sentence = ""
    flag.set() # 生产者完成所有工作,设置事件标志

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    消费者:从队列中取出句子并处理。
    当队列为空且生产者已完成时,退出。
    """
    global i # 使用全局计数器
    while True:
        # 检查退出条件:队列为空且生产者已设置完成标志
        if q.empty() and flag.is_set():
            break
        try:
            item = await asyncio.wait_for(q.get(), timeout=1.0) # 尝试从队列获取,设置超时避免无限等待
        except asyncio.TimeoutError:
            # 如果超时,再次检查退出条件,防止生产者完成但队列仍空的情况
            if q.empty() and flag.is_set():
                break
            continue # 继续尝试获取

        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1) # 模拟句子处理耗时
        print("sentence processed!")
        i += 1

async def main():
    global i
    i = 1 # 重置计数器
    event = asyncio.Event() # 创建事件对象
    queue = asyncio.Queue[str]() # 创建队列对象

    # 创建生产者和消费者任务
    producer_task = sentences_generator(queue, event)
    consumer_task = process_sentence(queue, event)

    # 并发运行生产者和消费者任务
    await asyncio.gather(producer_task, consumer_task)

if __name__ == "__main__":
    asyncio.run(main())

代码解析:

  1. sentences_generator (生产者):

    • 不再 yield 句子,而是通过 await q.put(sentence) 将句子放入共享队列 q 中。
    • 在 async for char in stream(): 循环结束后,调用 flag.set() 标记生产者已完成所有数据生成。
  2. process_sentence (消费者):

    • 进入一个 while True 循环,持续从队列中获取句子。
    • await q.get() 会在队列为空时暂停,直到有新的句子可用。
    • 循环的退出条件是 q.empty() and flag.is_set()。这意味着只有当队列中没有待处理的句子,并且生产者也已经发出完成信号时,消费者才能安全退出。
    • 为了更健壮地处理消费者退出,我们使用了 asyncio.wait_for(q.get(), timeout=...),避免在生产者完成但队列仍有少量数据时,消费者可能因 q.get() 阻塞而无法及时检查 flag.is_set()。
  3. main 函数:

    • 初始化 asyncio.Event 和 asyncio.Queue 实例。
    • 将 sentences_generator 和 process_sentence 作为独立的协程(任务)创建,并将队列和事件对象作为参数传递。
    • 使用 await asyncio.gather(producer_task, consumer_task) 并发运行这两个任务。asyncio.gather 会等待所有传入的协程完成。

4. 运行效果与并发分析

运行优化后的代码,你将看到类似以下的输出:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  1
waiting for processing sentence:  Hi.
got char:  
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
sentence processed!
processing sentence:  2
waiting for processing sentence:   Hello.
got char:  
got char: T
got char: h
got char: a
got char: n
got char: k
got char: .
got sentence:   Thank.
sentence processed!
processing sentence:  3
waiting for processing sentence:   Thank.
got char:  
got char: y
got char: o
got char: u
got char: .
got sentence:   you.
sentence processed!
processing sentence:  4
waiting for processing sentence:   you.
sentence processed!

并发分析:

从新的输出可以看出,当 process_sentence 正在等待(waiting for processing sentence: ...)时,sentences_generator 仍然在继续生成字符(got char: ...)和句子(got sentence: ...),并将它们放入队列。这正是我们期望的并发行为:生产者和消费者独立运行,通过队列进行异步通信,充分利用了 asyncio 的协作式多任务能力。

5. 注意事项与最佳实践

  • 选择合适的并发工具: asyncio.Queue 适用于 I/O 密集型任务的协作式并发,例如网络请求、文件读写等。对于 CPU 密集型任务,即使使用了 asyncio.Queue,由于 Python 的 GIL (全局解释器锁),真正的并行处理仍然需要借助 multiprocessing 模块或 concurrent.futures.ProcessPoolExecutor。
  • 队列容量: asyncio.Queue 可以在初始化时指定最大容量(maxsize)。如果队列已满,put() 操作会暂停;如果队列为空,get() 操作会暂停。合理设置队列容量可以防止内存无限增长,但也可能导致生产者阻塞。
  • 优雅地终止消费者: asyncio.Event 在生产者完成所有工作后通知消费者,是确保消费者在队列清空且无新数据时能够正确退出的关键。如果没有这样的信号机制,消费者可能会无限期地等待 q.get()。
  • 错误处理: 在实际应用中,需要考虑生产者或消费者任务中可能出现的异常。可以使用 try...except 块来捕获并处理错误,确保程序的健壮性。
  • 任务取消: asyncio 任务可以被取消。在设计生产者-消费者模式时,需要考虑如何响应任务取消,例如在取消时清空队列或确保资源被正确释放。

通过理解 await 的工作原理并巧妙地利用 asyncio.Queue 和 asyncio.Event,我们可以有效地构建高效、响应迅速的异步应用程序,实现复杂的任务调度和并发处理。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

106

2023.09.25

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

377

2025.12.24

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

32

2026.01.21

C++多线程相关合集
C++多线程相关合集

本专题整合了C++多线程相关教程,阅读专题下面的的文章了解更多详细内容。

29

2026.01.21

C# 多线程与异步编程
C# 多线程与异步编程

本专题深入讲解 C# 中多线程与异步编程的核心概念与实战技巧,包括线程池管理、Task 类的使用、async/await 异步编程模式、并发控制与线程同步、死锁与竞态条件的解决方案。通过实际项目,帮助开发者掌握 如何在 C# 中构建高并发、低延迟的异步系统,提升应用性能和响应速度。

103

2026.02.06

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

38

2026.03.10

Kotlin Android模块化架构与组件化开发实践
Kotlin Android模块化架构与组件化开发实践

本专题围绕 Kotlin 在 Android 应用开发中的架构实践展开,重点讲解模块化设计与组件化开发的实现思路。内容包括项目模块拆分策略、公共组件封装、依赖管理优化、路由通信机制以及大型项目的工程化管理方法。通过真实项目案例分析,帮助开发者构建结构清晰、易扩展且维护成本低的 Android 应用架构体系,提升团队协作效率与项目迭代速度。

83

2026.03.09

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 4.9万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号