Java实现生产者消费者模型应优先使用BlockingQueue而非wait/notify,因其已封装线程安全、阻塞语义和容量控制;手写易出错,如虚假唤醒、锁不一致、if误用、状态检查缺失等,导致卡死或数据丢失。

Java 里实现生产者消费者模型,核心不是自己手写 wait/notify,而是优先用 BlockingQueue —— 它已封装线程安全、阻塞语义和容量控制,出错率低、可维护性强。
为什么别直接用 wait/notify 手写?
手写容易漏掉几个关键点:虚假唤醒没处理、锁对象不一致、条件判断用 if 而非 while、未在同步块内检查状态。一旦出错,程序会卡死或数据丢失,且难以复现。
典型错误现象:
- 生产者往满队列塞数据时没阻塞,抛
IllegalStateException或直接覆盖 - 消费者从空队列取数据时返回
null而非等待,导致空指针 - 多个生产者/消费者下出现重复消费或漏消费
BlockingQueue 的三种典型用法场景
选哪种取决于你对「容量控制」「阻塞策略」「响应性」的要求:
立即学习“Java免费学习笔记(深入)”;
-
ArrayBlockingQueue:固定大小、公平锁可选,适合明确容量上限的场景(如日志缓冲区) -
LinkedBlockingQueue:默认无界(实际是Integer.MAX_VALUE),吞吐高但可能 OOM;指定容量后行为接近ArrayBlockingQueue -
SynchronousQueue:不存储元素,每个put必须配一个take,适合任务交接型场景(如线程池的DirectHandoff)
一个最小可运行的双线程示例
以下代码演示一个生产者向队列塞整数、消费者从中取并打印,使用 ArrayBlockingQueue 确保严格容量控制:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
public class ProducerConsumerDemo {
private static final BlockingQueue queue = new ArrayBlockingQueue<>(3);
public static void main(String[] args) {
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Producing: " + i);
queue.put(i); // 队列满时自动阻塞
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
Integer item = queue.take(); // 队列空时自动阻塞
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
注意:put() 和 take() 是阻塞式方法;若需超时或非阻塞,改用 offer(e, timeout, unit) 或 poll(timeout, unit)。
容易被忽略的边界点
实际项目中这几个细节常引发问题:
-
BlockingQueue不保证跨 JVM 进程可见性 —— 它只是线程间通信工具,不是分布式队列 - 若消费者处理逻辑抛异常未捕获,线程会退出,后续生产的数据将永远堆积在队列中
-
size()返回的是近似值,高并发下可能不准,不能用它做业务逻辑判断(比如“如果 size > 5 就告警”) - 用
LinkedBlockingQueue且未设容量时,put()几乎不会阻塞,但内存持续增长风险极高
真正复杂的场景(如多生产者多消费者、优先级消费、失败重试、监控埋点),建议直接上 Disruptor 或消息中间件,而不是在 BlockingQueue 上叠补丁。










