最稳妥的方式是使用BlockingQueue及其子类;它封装了线程安全、阻塞逻辑和容量控制,避免手写wait/notify易出现的虚假唤醒、错误唤醒和临界区遗漏等问题。

Java 里实现生产者消费者模式,最稳妥的方式不是手写 wait/notify,而是直接用 BlockingQueue 及其子类——它把线程安全、阻塞逻辑、容量控制全封装好了。
为什么别手写 wait/notify 实现
手写容易出错:漏掉 while 循环判断条件(导致虚假唤醒)、notifyAll() 写成 notify()(可能唤醒错类型线程)、锁范围没包住整个临界区。哪怕代码能跑通,也极难在高并发下稳定。
- 必须用
while检查条件,不能用if -
notify()只唤醒一个等待线程,但生产者和消费者共用同一把锁时,可能唤醒同类型线程(比如两个消费者都在等,唤醒另一个消费者毫无意义) - 所有共享状态读写必须在同步块内,包括判断、修改、通知
ArrayBlockingQueue 是最常用的选择
它基于数组、有界、线程安全、支持公平锁选项,适合对内存占用和吞吐量有明确预期的场景。相比 LinkedBlockingQueue,它的内存更可控,不会因突发流量无限扩张堆内存。
- 构造时必须指定容量:
new ArrayBlockingQueue(1024) - 默认非公平,加
true参数可启用公平模式:new ArrayBlockingQueue,但会降低吞吐(1024, true) -
put()和take()是阻塞版;offer()和poll()是带超时或立即返回的非阻塞版
public class ProducerConsumerExample {
private static final BlockingQueue queue = new ArrayBlockingQueue<>(8);
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put("item-" + i);
System.out.println("Produced: item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}, "Producer").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}, "Consumer").start();
}
}
多个生产者或消费者时要注意什么
多个线程调用 put() 或 take() 本身是安全的,BlockingQueue 内部已处理竞态。但如果你在生产者里做耗时预处理(如 IO、计算),别把它塞进同步逻辑里——这会拖慢整个队列吞吐。
立即学习“Java免费学习笔记(深入)”;
- 预处理(如读文件、解析 JSON)应在
put()前完成,避免阻塞队列内部锁 - 如果需要顺序消费,
ArrayBlockingQueue保证 FIFO,但多个消费者线程会打破“全局顺序”,只保证单个消费者看到的顺序 - 异常处理要重置中断状态:
Thread.currentThread().interrupt(),否则上层无法感知中断意图
真正容易被忽略的是:队列满时生产者阻塞,队列空时消费者阻塞——这个“阻塞”是通过线程挂起实现的,不消耗 CPU,但会占用线程栈资源。如果用的是固定大小线程池(比如 Executors.newFixedThreadPool(4)),而所有线程都卡在 put/take 上,新任务就永远得不到执行。










