
在许多实际应用中,我们可能会遇到这样的场景:一个核心计算任务需要耗费大量时间(例如数小时),而另一个任务则需要频繁地(例如每隔几秒)获取并使用这个计算结果的最新值。如果简单地串行执行,实时性需求将无法满足,因为频繁获取结果的任务必须等待耗时计算完成后才能进行。本文将详细介绍如何利用python的multiprocessing模块来优雅地解决这一问题,实现长时间计算与实时结果输出的异步并行。
假设我们有两个函数:
如果Sum函数直接等待Calculate_a的返回值,那么它将不得不等待5小时,这显然不符合实时输出的要求。我们需要一种机制,让Sum函数能够持续运行,并始终使用Calculate_a函数已经计算出的最新a值,即使Calculate_a正在进行新的、尚未完成的计算。
解决这类问题的关键在于并发执行和进程间通信(IPC)。Python的multiprocessing模块允许我们创建独立的进程,每个进程拥有自己的内存空间,从而能够真正地并行执行任务,并且不受全局解释器锁(GIL)的限制,这对于CPU密集型任务尤为重要。
为了在不同进程之间共享数据,multiprocessing提供了多种IPC机制,其中Manager和Namespace组合非常适合本场景。Manager可以创建一个服务进程,管理共享对象,而Namespace则是一种简单的共享对象,允许通过属性访问共享数据。
立即学习“Python免费学习笔记(深入)”;
具体步骤如下:
这样,进程B无需等待进程A完成当前的所有计算,它总是能获取到进程A最近一次更新的a值,从而满足实时性要求。
multiprocessing模块提供了一个Process类,用于创建和管理子进程。每个Process实例都代表一个独立的操作系统进程,拥有独立的内存空间。这意味着它们可以并行运行,尤其适用于CPU密集型任务,因为它们不受Python GIL的限制。
下面我们将通过一个完整的示例来演示如何实现上述解决方案。为了方便演示,我们将“5小时”的计算时间缩短为几秒,并将“每5秒输出”改为“每1秒输出”,但核心逻辑保持不变。
import time
import random
from multiprocessing import Process, Manager
# 模拟耗时计算函数:计算 'a' 的值
def calculate_a_task(manager_namespace):
"""
此函数在独立进程中运行,模拟长时间计算并更新共享变量 'a'。
"""
current_a = 0
iteration = 0
# 使用一个共享的 'running' 标志来控制进程的优雅停止
while manager_namespace.running:
iteration += 1
print(f"[{time.strftime('%H:%M:%S')}] Process A (Calc): Starting calculation {iteration} for 'a'...")
# 模拟长时间计算,例如5秒(原问题中的5小时)
# 实际应用中这里是复杂的计算逻辑
time.sleep(5)
# 模拟新的计算结果
current_a = random.randint(100, 200) + iteration * 10
manager_namespace.a = current_a # 更新共享的 'a' 值
print(f"[{time.strftime('%H:%M:%S')}] Process A (Calc): 'a' updated to {manager_namespace.a}")
# 稍微暂停一下,避免CPU空转过快,实际应用中可能不需要
# time.sleep(0.1)
# 模拟实时输出函数:计算 a + b 并输出
def sum_ab_task(manager_namespace, b_value):
"""
此函数在独立进程中运行,持续读取共享变量 'a' 并与 'b' 求和输出。
"""
print(f"[{time.strftime('%H:%M:%S')}] Process B (Sum): Starting to output sum every 1 second (b={b_value})...")
# 使用一个共享的 'running' 标志来控制进程的优雅停止
while manager_namespace.running:
# 确保 'a' 已经被初始化,避免启动时读取到未定义的变量
if hasattr(manager_namespace, 'a'):
current_a = manager_namespace.a # 读取共享的 'a' 值
s = current_a + b_value
print(f"[{time.strftime('%H:%M:%S')}] Process B (Sum): Current 'a' = {current_a}, Sum (a+b) = {s}")
else:
print(f"[{time.strftime('%H:%M:%S')}] Process B (Sum): Waiting for initial 'a' value...")
# 每隔1秒输出一次结果(原问题中的5秒)
time.sleep(1)
if __name__ == '__main__':
# 1. 初始化 Manager 和 Namespace
# Manager 用于管理可以在进程间共享的对象
manager = Manager()
# Namespace 是一个简单的共享对象,允许通过属性访问数据
global_ns = manager.Namespace()
# 2. 初始化共享变量 'a' 和控制进程运行的标志
# 确保 'a' 有一个初始值,避免 Process B 启动时出错
global_ns.a = 0
# 添加一个共享的标志,用于控制子进程的循环,实现优雅停止
global_ns.running = True
# 3. 定义常量 'b' 的值
b_value = 50
# 4. 创建并启动子进程
# Process A: 负责计算 'a'
p1 = Process(target=calculate_a_task, args=(global_ns,))
# Process B: 负责实时求和并输出
p2 = Process(target=sum_ab_task, args=(global_ns, b_value))
p1.start() # 启动进程 A
p2.start() # 启动进程 B
print(f"[{time.strftime('%H:%M:%S')}] Main Process: Child processes started. Running for 20 seconds for demonstration...")
# 主进程等待一段时间,让子进程运行
# 实际应用中,主进程可能需要做其他事情,或者等待外部信号来停止子进程
time.sleep(20)
print(f"[{time.strftime('%H:%M:%S')}] Main Process: Signalling child processes to stop...")
# 5. 优雅地停止子进程
# 通过修改共享的 'running' 标志,通知子进程退出循环
global_ns.running = False
# 等待子进程结束。设置超时,避免无限等待
p1.join(timeout=5)
p2.join(timeout=5)
# 如果子进程在超时时间内未能结束,则强制终止
if p1.is_alive():
print(f"[{time.strftime('%H:%M:%S')}] Main Process: Process A is still alive, terminating forcefully.")
p1.terminate()
if p2.is_alive():
print(f"[{time.strftime('%H:%M:%S')}] Main Process: Process B is still alive, terminating forcefully.")
p2.terminate()
print(f"[{time.strftime('%H:%M:%S')}] Main Process: All child processes stopped.")
manager.shutdown() # 关闭 Manager 服务进程代码解析:
通过multiprocessing模块,特别是Process和Manager.Namespace的结合使用,我们能够有效地将长时间运行的计算任务与需要实时更新的输出任务解耦。这种模式使得一个进程可以在后台
以上就是Python多进程:实现长时间计算与实时结果的异步更新与共享的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号