0

0

优化asyncio中嵌套异步任务的并发调度

聖光之護

聖光之護

发布时间:2025-11-27 11:37:19

|

491人浏览过

|

来源于php中文网

原创

优化asyncio中嵌套异步任务的并发调度

本文探讨了在`asyncio`中处理嵌套异步生成器时,如何通过传统`await`模式导致的串行执行问题。针对`await`的阻塞特性,文章提出并详细阐述了利用`asyncio.queue`和`asyncio.event`构建生产者-消费者模式的解决方案,从而实现任务间的解耦和真正的并发执行,显著提升异步应用的效率和响应性。

理解asyncio中的await与并发限制

在asyncio编程中,await关键字是调度协程的核心机制。当一个协程遇到await表达式时,它会暂停自身的执行,将控制权交还给事件循环,并等待被await的协程完成。一旦被await的协程完成并返回结果,原协程才会从暂停点继续执行。这种机制虽然实现了协作式多任务,但如果设计不当,也可能导致非预期的串行执行。

考虑以下场景:一个异步任务(main)需要从一个异步生成器(sentences_generator)获取数据,然后将数据传递给另一个异步任务(process_sentence)进行处理。如果main函数在每次获取到数据后,都直接await process_sentence的完成,那么在process_sentence执行期间,sentences_generator将无法继续生成新的数据。这违背了我们期望的并发处理,即当process_sentence在处理当前数据时,sentences_generator应该能够同时准备下一批数据。

以下是原始代码示例及其输出,展示了这种串行阻塞行为:

import asyncio

async def stream():
    char_string = "Hi. Hello. Hello."
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence)*0.1) # 模拟耗时处理
    print("sentence processed!")

async def main():
    i=0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence) # 这里的await导致阻塞
        i += 1

# asyncio.run(main())

原始输出示例:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: y
got char: .
got sentence:   Hey.
processing sentence:  1
waiting for processing sentence:   Hey.
sentence processed!
...

从输出可以看出,只有当process_sentence完全处理完一个句子后,stream和sentences_generator才能继续生成下一个字符和句子。这并不是我们期望的并发效果。

解决方案:使用asyncio.Queue实现生产者-消费者模式

为了实现真正的并发,我们需要解耦数据的生产和消费过程,使它们能够独立运行。asyncio.Queue是实现这种生产者-消费者模式的理想工具

核心思想:

故事AI绘图神器
故事AI绘图神器

文本生成图文视频的AI工具,无需配音,无需剪辑,快速成片,角色固定。

下载
  1. 生产者(Producer):一个或多个异步任务负责生成数据,并将数据放入asyncio.Queue中。
  2. 消费者(Consumer):一个或多个异步任务从asyncio.Queue中取出数据进行处理。
  3. 独立运行:生产者和消费者作为独立的协程,由asyncio事件循环调度,它们之间通过队列进行通信,互不阻塞。

此外,为了实现优雅的关闭和通知消费者数据已全部生产完毕,我们可以引入asyncio.Event。生产者在完成所有数据生产后设置Event,消费者则可以结合队列是否为空和Event状态来判断何时停止。

优化后的代码实现

我们将修改sentences_generator作为生产者,将生成的句子放入队列;process_sentence作为消费者,从队列中取出句子进行处理。main函数将负责启动这两个独立的协程。

import asyncio

# 定义全局变量用于计数,方便观察
i = 1

async def stream():
    char_string = "Hi. Hello. Thank you." # 增加一些内容以更好地展示并发
    for char in char_string:
        await asyncio.sleep(0.1) # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    生产者协程:从字符流生成句子,并放入队列。
    当所有句子生成完毕后,设置flag通知消费者。
    """
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence) # 将生成的句子放入队列
            sentence = ""
    # 确保最后一个不以标点符号结尾的句子也被处理(如果需要)
    if sentence:
        print("got sentence: ", sentence)
        await q.put(sentence)
    flag.set() # 生产完毕,设置事件标志

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    消费者协程:从队列中获取句子并进行处理。
    当队列为空且生产者已设置flag时,停止消费。
    """
    global i
    while True:
        # 检查是否应该停止:队列为空且生产者已完成
        if q.empty() and flag.is_set():
            break

        # 尝试从队列获取项目,如果队列为空则等待
        item = await q.get()

        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1) # 模拟耗时处理
        print("sentence processed!")

        q.task_done() # 通知队列此任务已完成
        i += 1

async def main():
    global i
    i = 1 # 重置计数器
    event = asyncio.Event() # 用于生产者通知消费者结束
    queue = asyncio.Queue[str]() # 生产者和消费者之间的通信队列

    # 启动生产者和消费者作为独立的协程任务
    producer_task = asyncio.create_task(sentences_generator(queue, event))
    consumer_task = asyncio.create_task(process_sentence(queue, event))

    # 等待所有任务完成
    await asyncio.gather(producer_task, consumer_task)

    # 可选:等待队列中所有任务被标记为完成,确保所有数据都被处理
    await queue.join()

asyncio.run(main())

预期输出示例:

got char: H
got char: i
got char: .
got sentence:  Hi.
got char:  
got char: H
processing sentence:  1
waiting for processing sentence:  Hi.
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
sentence processed!
got char:  
got char: T
processing sentence:  2
waiting for processing sentence:   Hello.
got char: h
got char: a
got char: n
got char: k
got char:  
got char: y
got char: o
got char: u
got char: .
got sentence:  Thank you.
sentence processed!
processing sentence:  3
waiting for processing sentence:  Thank you.
sentence processed!

从这个输出可以看出,当process_sentence正在处理第一个句子时,stream和sentences_generator已经继续生成了后续的字符和句子,并将其放入队列。这正是我们期望的并发行为。

关键点和注意事项

  1. asyncio.Queue的作用:它提供了一个线程安全的(在asyncio中是协程安全的)FIFO队列。put()操作在队列满时会暂停,get()操作在队列空时会暂停,直到有新的数据可用。
  2. asyncio.Event的作用:它是一个简单的同步原语,用于一个协程通知另一个协程某个事件已经发生。生产者在完成所有数据生产后调用flag.set(),消费者则通过flag.is_set()来检查生产者的状态。
  3. asyncio.gather():用于并发运行多个协程或任务,并等待它们全部完成。
  4. q.task_done() 和 q.join()
    • q.task_done():消费者在完成对从队列中获取的项目的处理后调用,通知队列该项目已处理完毕。
    • q.join():main函数可以调用await queue.join()来等待队列中所有项目都被get并task_done。这确保了在程序退出前所有数据都已得到处理。在我们的示例中,虽然gather已经等待了所有协程,但queue.join()提供了一个更明确的机制来等待所有队列中的工作完成。
  5. 消费者退出条件:消费者协程的退出逻辑至关重要。一个常见的模式是while True循环,内部判断q.empty() and flag.is_set()来决定是否退出。这确保了在生产者完成且队列中所有待处理项都已消费后,消费者才能安全退出。
  6. 错误处理:在实际应用中,生产者和消费者内部应添加适当的错误处理机制,例如try-except块。
  7. 背压(Backpressure):asyncio.Queue可以有容量限制。如果生产者生产速度远快于消费者,队列会逐渐填满,最终q.put()会暂停,从而对生产者施加背压,防止内存无限增长。

总结

通过将异步任务分解为独立的生产者和消费者,并利用asyncio.Queue进行通信,我们成功地将原本串行执行的逻辑转换为了并发执行。这种模式不仅提高了资源利用率,也使得代码结构更加清晰,易于维护和扩展。在设计复杂的asyncio应用时,当存在数据流动的依赖但又希望实现任务并行时,生产者-消费者模式与asyncio.Queue是解决这类问题的强大工具。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

90

2023.09.25

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

482

2023.08.10

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

19

2026.01.20

PS使用蒙版相关教程
PS使用蒙版相关教程

本专题整合了ps使用蒙版相关教程,阅读专题下面的文章了解更多详细内容。

61

2026.01.19

java用途介绍
java用途介绍

本专题整合了java用途功能相关介绍,阅读专题下面的文章了解更多详细内容。

87

2026.01.19

java输出数组相关教程
java输出数组相关教程

本专题整合了java输出数组相关教程,阅读专题下面的文章了解更多详细内容。

39

2026.01.19

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

10

2026.01.19

xml格式相关教程
xml格式相关教程

本专题整合了xml格式相关教程汇总,阅读专题下面的文章了解更多详细内容。

13

2026.01.19

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

19

2026.01.19

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 4万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号