
本教程详细探讨了如何在python中实现主脚本与多个独立后台任务的并发执行,并有效管理并发任务数量。文章介绍了从基础线程(`threading`)到线程池(`concurrent.futures.threadpoolexecutor`)的应用,以及如何通过信号量(`threading.semaphore`和`asyncio.semaphore`)精确控制任务的调度和并发上限。此外,还涵盖了异步编程(`asyncio`)作为处理i/o密集型后台任务的强大替代方案,旨在帮助开发者构建高效、稳定的并发系统。
在现代应用程序开发中,经常会遇到主程序需要持续运行,同时又需要触发一些耗时但又不影响主程序流程的后台任务的场景。例如,一个监控机器人需要每隔一段时间检查库存状态,一旦发现商品有货,就立即发送通知给一部分用户,并延迟一段时间后发送给另一部分用户。这种情况下,延迟发送的通知任务不应阻塞主监控循环,且可能存在多个此类延迟任务同时进行,但需要限制其总数。本文将深入探讨Python中实现此类并发任务管理的多种策略。
核心需求可以概括为:
针对这些需求,Python提供了多种并发编程模型,包括多线程、多进程和异步IO。
最直接的实现方式是使用Python的threading模块创建新线程来执行后台任务。每个后台任务都在一个独立的线程中运行,不会阻塞主线程。
立即学习“Python免费学习笔记(深入)”;
import time
import random
from threading import Thread
def background_task():
"""后台任务:模拟10秒延迟后完成操作"""
print("后台任务:开始休眠10秒...")
time.sleep(10)
print("后台任务:休眠结束,操作完成。")
def main_script():
"""主脚本:每3秒检查一次条件,满足则启动后台任务"""
while True:
print("主脚本:开始休眠3秒...")
time.sleep(3)
print("主脚本:休眠结束。")
# 模拟一个随机条件触发后台任务
if random.random() > 0.5:
print("主脚本:条件满足,启动后台任务。")
t = Thread(target=background_task)
t.start()
else:
print("主脚本:条件不满足。")
if __name__ == "__main__":
main_script()说明:
注意事项: 这种方法虽然简单,但存在一个潜在问题:如果主脚本频繁触发后台任务,可能会创建无限数量的线程,导致系统资源耗尽。因此,需要一种机制来限制并发任务的数量。
为了更好地管理和限制并发线程的数量,Python提供了concurrent.futures模块,其中的ThreadPoolExecutor可以创建一个线程池。线程池会自动管理线程的创建和销毁,并限制同时运行的线程数量。
from concurrent.futures import ThreadPoolExecutor
import random
import time
# 创建一个线程池,最大并发线程数为3(根据实际需求调整)
# 示例中为了演示效果,使用较小的池大小,实际应用中可根据任务特性和系统资源调整
thread_pool = ThreadPoolExecutor(max_workers=3)
def background_task():
"""后台任务:模拟10秒延迟后完成操作"""
print("后台任务:开始休眠10秒...")
time.sleep(10)
print("后台任务:休眠结束,操作完成。")
def main_script_with_pool():
"""主脚本:使用线程池管理后台任务"""
while True:
print("主脚本:开始休眠3秒...")
time.sleep(3)
print("主脚本:休眠结束。")
if random.random() > 0.5:
print("主脚本:条件满足,提交后台任务到线程池。")
thread_pool.submit(background_task)
else:
print("主脚本:条件不满足。")
if __name__ == "__main__":
main_script_with_pool()说明:
注意事项:ThreadPoolExecutor确实限制了同时运行的线程数量。然而,submit方法会将任务放入一个内部队列。这意味着,即使所有线程都在忙碌,主脚本仍然可以无限地向线程池提交任务,导致队列无限增长,这可能仍然会消耗大量内存。如果需要限制已调度任务(包括正在运行和等待队列中的任务)的总数,就需要引入信号量。
信号量(Semaphore)是一种同步原语,用于限制对共享资源的并发访问数量。在这里,我们可以用它来限制正在执行或等待执行的后台任务的总数。当信号量计数器达到上限时,主脚本在尝试获取信号量时会被阻塞,直到有任务完成并释放信号量。
from concurrent.futures import ThreadPoolExecutor
from threading import Semaphore
import random
import time
# 创建一个线程池,最大并发线程数为3
thread_pool = ThreadPoolExecutor(max_workers=3)
# 创建一个信号量,允许最多5个任务(包括正在运行和等待的)被调度
# 信号量的大小应大于或等于线程池的大小,以允许线程池充分利用
task_semaphore = Semaphore(5)
def background_task_with_semaphore():
"""后台任务:模拟10秒延迟后完成操作,完成后释放信号量"""
try:
print("后台任务:开始休眠10秒...")
time.sleep(10)
print("后台任务:休眠结束,操作完成。")
finally:
# 无论任务成功或失败,都必须释放信号量
task_semaphore.release()
print("后台任务:信号量已释放。")
def main_script_with_semaphore():
"""主脚本:使用线程池和信号量管理后台任务"""
while True:
print("主脚本:开始休眠3秒...")
time.sleep(3)
print("主脚本:休眠结束。")
if random.random() > 0.5:
print("主脚本:条件满足,尝试获取信号量...")
# 尝试获取信号量,如果当前任务数已达上限,主脚本会在此处阻塞
task_semaphore.acquire()
print("主脚本:信号量已获取,提交后台任务到线程池。")
# 提交任务到线程池,任务完成后会自行释放信号量
thread_pool.submit(background_task_with_semaphore)
else:
print("主脚本:条件不满足。")
if __name__ == "__main__":
main_script_with_semaphore()说明:
优点: 这种方法能够精确控制同时处于“活跃”状态(运行中或等待中)的任务总数,防止任务队列无限增长,从而有效管理系统资源。
对于I/O密集型任务(如网络请求、文件读写、延迟等待),Python的asyncio库提供了一种基于协程的并发模型。它通过单个线程的事件循环来高效地管理大量并发操作,避免了线程切换的开销。
import asyncio
import random
# 创建一个异步信号量,限制最多3个并发任务
async_task_semaphore = asyncio.Semaphore(3)
async def async_background_task():
"""异步后台任务:模拟10秒延迟后完成操作,完成后释放信号量"""
try:
print("异步后台任务:开始休眠10秒...")
await asyncio.sleep(10) # 使用await进行非阻塞休眠
print("异步后台任务:休眠结束,操作完成。")
finally:
async_task_semaphore.release()
print("异步后台任务:信号量已释放。")
async def async_main_script():
"""异步主脚本:使用异步信号量管理后台任务"""
while True:
print("异步主脚本:开始休眠3秒...")
await asyncio.sleep(3) # 使用await进行非阻塞休眠
print("异步主脚本:休眠结束。")
if random.random() > 0.5:
print("异步主脚本:条件满足,尝试获取异步信号量...")
# 尝试获取异步信号量,如果任务数已达上限,主脚本会在此处暂停(非阻塞)
await async_task_semaphore.acquire()
print("异步主脚本:异步信号量已获取,创建异步任务。")
# 创建并调度一个异步任务
asyncio.create_task(async_background_task())
else:
print("异步主脚本:条件不满足。")
if __name__ == "__main__":
asyncio.run(async_main_script())说明:
优点:asyncio特别适合处理大量I/O密集型任务,因为它避免了多线程/多进程的上下文切换开销,能够以更低的资源消耗实现高并发。
虽然上述示例主要聚焦于线程和异步IO,但多进程(multiprocessing)也是一种实现并发的方式。当任务是CPU密集型时(例如大量计算),由于Python的全局解释器锁(GIL)限制了同一时刻只有一个线程能执行Python字节码,多进程能更好地利用多核CPU。
concurrent.futures模块也提供了ProcessPoolExecutor,其用法与ThreadPoolExecutor类似,可以用来创建进程池并管理并发进程。与线程类似,进程池也可以结合multiprocessing.Semaphore来限制任务的调度。
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Semaphore # 注意这里是multiprocessing的Semaphore
import random
import time
# 创建一个进程池,最大并发进程数为3
process_pool = ProcessPoolExecutor(max_workers=3)
# 创建一个进程信号量
process_semaphore = Semaphore(5)
def cpu_bound_task():
"""模拟一个CPU密集型任务"""
print("进程任务:开始计算...")
# 模拟计算
sum(range(10**7))
print("进程任务:计算完成。")
process_semaphore.release()
def main_script_with_process_pool():
while True:
print("主脚本:开始休眠3秒...")
time.sleep(3)
print("主脚本:休眠结束。")
if random.random() > 0.5:
print("主脚本:条件满足,尝试获取进程信号量...")
process_semaphore.acquire()
print("主脚本:进程信号量已获取,提交进程任务到进程池。")
process_pool.submit(cpu_bound_task)
else:
print("主脚本:条件不满足。")
if __name__ == "__main__":
main_script_with_process_pool()选择建议:
本文详细介绍了在Python中实现主脚本与独立后台任务并发执行,并有效控制并发数量的多种方法。
最佳实践:
通过合理选择和组合这些工具,开发者可以构建出高效、稳定且资源友好的并发应用程序。
以上就是Python中实现主脚本与独立后台任务并发执行及并发限制的教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号