python生产者消费者模型用队列(如queue.queue、multiprocessing.queue)解耦任务生成与处理,因其内置锁机制避免竞态条件;线程用queue.queue配task_done()/join(),进程用multiprocessing.queue并注意close()/join_thread();协程版用asyncio.queue适配i/o密集型。

Python中的生产者消费者模型是解决并发任务调度的经典设计模式,核心在于解耦任务生成与处理逻辑,通过队列协调多线程或多进程协作。
为什么用队列而不是全局变量
直接共享变量容易引发竞态条件和数据不一致。queue.Queue(线程安全)或multiprocessing.Queue(进程间安全)内部已实现锁机制,put()和get()自动加锁,避免手动同步的复杂性。
- 线程场景优先用queue.Queue,支持block、timeout和task_done()配合join()
- 多进程场景必须用multiprocessing.Queue,不能跨进程共享普通列表或字典
- 若需高吞吐,可考虑queue.SimpleQueue(无阻塞/超时,轻量但功能少)
标准线程版实现要点
关键不是写多少线程,而是控制好退出条件和资源清理。
UQCMS云商是一款B2B2C电子商务软件 ,非常适合初创的创业者,个人及中小型企业。程序采用PHP+MYSQL,模板采用smarty模板,二次开发,简单方便,无需学习其他框架就可以自行模板设计。永久免费使用,操作简单,安全稳定。支持PC+WAP+微信三种浏览方式,支持微信公众号。
- 生产者完成生产后,向队列放入None或预定义哨兵值,消费者读到即退出
- 调用queue.task_done()标记单个任务完成,主流程用queue.join()等待全部处理完毕
- 避免让消费者死循环轮询——用queue.get(timeout=1)代替无限get(),防止卡死
进程版要注意的坑
multiprocessing.Queue不支持close()后再次使用,且子进程异常退出可能导致队列卡住。
立即学习“Python免费学习笔记(深入)”;
- 务必在每个子进程里显式调用queue.close()和queue.join_thread()
- 生产者进程应限制总任务数,防止队列爆内存(可用queue.qsize()做粗略监控,但注意该方法在Unix和Windows下行为不同)
- 传递复杂对象时,确保可被pickle序列化;函数不能是lambda或嵌套定义
进阶:协程版(asyncio + asyncio.Queue)
适合I/O密集型场景,比线程更轻量,但要求所有操作都是异步的。
- 用await queue.put(item)和await queue.get(),非阻塞但协程友好
- 取消任务时,需主动cancel()对应task,并用try/except CancelledError清理
- 不要混用threading和asyncio —— 协程中调用阻塞IO会拖垮整个事件循环









