生产者消费者模式是解耦任务生成与执行的并发模型,由生产者线程向线程安全队列put任务、消费者线程get并处理,配合task_done和join实现同步,适用于日志处理等高吞吐场景。

什么是生产者消费者模式
生产者消费者模式是一种经典的并发编程模型,用于解耦任务的生成与执行。在Python任务调度场景中,它表现为:一个或多个线程(或进程)负责生成任务(生产者),把任务放入共享队列;另一个或多个工作线程(消费者)持续从队列中取出任务并执行。这种结构天然适合异步、批量、高吞吐的任务调度系统,比如日志处理、定时爬虫、消息推送等。
核心组件:队列 + 线程/进程协作
Python标准库中的 queue.Queue 是实现该模式的关键——它线程安全,内置阻塞与超时机制,无需手动加锁。搭配 threading 或 multiprocessing 模块即可快速搭建调度骨架。
- 使用 queue.Queue(maxsize=0) 创建无限长队列;设为正整数可控制缓冲区大小,避免内存溢出
- 生产者调用 put(item) 入队,可选 block=True, timeout=2 防止无限等待
- 消费者调用 get() 出队,配合 task_done() 和 join() 实现任务完成同步
- 推荐用守护线程(daemon=True)运行消费者,主程序退出时自动终止
一个轻量可用的调度示例
下面是一个基于多线程的最小可行调度器,模拟“每秒生成3个任务,由2个工人并发处理”:
<font size="2"><pre class="brush:php;toolbar:false;">import queue
import threading
import time
import random
<h1>共享任务队列</h1><p>task_queue = queue.Queue()</p><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="text-decoration: underline !important; color: blue; font-weight: bolder;" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p><p>def producer():
for i in range(10):
task = f"task-{i}"
print(f"[生产] {task}")
task_queue.put(task)
time.sleep(1) # 模拟不均匀生产节奏</p><p>def worker(worker_id):
while True:
try:
task = task_queue.get(timeout=3) # 3秒无任务则退出
print(f"[工人{worker_id}] 正在处理 {task}")
time.sleep(random.uniform(0.5, 2)) # 模拟耗时操作
task_queue.task_done()
except queue.Empty:
break</p><h1>启动1个生产者、2个消费者</h1><p>threading.Thread(target=producer).start()
for i in range(2):
threading.Thread(target=worker, args=(i+1,), daemon=True).start()</p><h1>等待所有任务完成</h1><p>task_queue.join()
print("全部任务执行完毕")
进阶建议:应对真实生产环境
简单示例满足学习,但上线需考虑健壮性与可观测性:








