0

0

Java多生产者多消费者模型:解决消费者无限等待导致的程序终止问题

心靈之曲

心靈之曲

发布时间:2025-11-28 15:03:02

|

1004人浏览过

|

来源于php中文网

原创

Java多生产者多消费者模型:解决消费者无限等待导致的程序终止问题

本教程旨在解决java多生产者多消费者并发模型中一个常见问题:当生产者完成任务后,消费者线程仍无限期等待,导致程序无法正常终止。文章将深入分析问题根源,并提供一种通过为消费者设定明确的消费上限来优雅地结束所有线程,从而确保程序能够正确退出的解决方案,并附带详细的代码示例和注意事项。

1. 问题背景与分析

并发编程中,生产者-消费者模式是一种常见的设计模式,用于解决不同线程之间数据生产和消费的同步问题。通常,生产者负责生成数据并将其放入共享缓冲区,而消费者则从缓冲区取出数据进行处理。为了确保线程安全和资源有效利用,我们常常使用wait()和notify()(或notifyAll())机制进行线程间的协调。

然而,一个常见的问题是,当所有生产者都完成了它们的数据生产任务后,消费者线程可能会因为共享缓冲区为空而持续调用wait(),进入无限期等待状态,导致整个程序无法终止。这通常发生在消费者线程被设计为无限循环(while(true))以等待新数据,而没有明确的退出条件时。

在提供的代码示例中,Producer 类通过一个有限的循环 (for (int i = 1; i <= productionSize; i++)) 来控制生产总量,一旦达到 productionSize,生产者线程就会自然结束。然而,Consumer 类的 run() 方法中包含一个无限循环 (while (true)),这意味着消费者会一直尝试从共享队列中取出数据。当生产者完成所有生产任务后,队列最终会变空,此时消费者会调用 sharedQueue.wait() 并无限期等待下去,因为没有任何生产者会再次 notify() 它。

2. 解决方案:为消费者设定明确的消费上限

解决消费者无限等待问题的核心在于,为消费者线程提供一个明确的终止条件,使其在完成预定任务后能够自行退出,而不是无限期地等待。最直接的方法是像生产者一样,为消费者设定一个预期的消费总量。

立即学习Java免费学习笔记(深入)”;

2.1 修改 Consumer 类

我们需要在 Consumer 类中引入两个变量:

  • wants: 表示该消费者期望消费的总商品数量。
  • gets: 记录该消费者已经消费的商品数量。

然后,修改 run() 方法的循环条件,使其在 gets 达到 wants 时终止。

Tome
Tome

先进的AI智能PPT制作工具

下载

以下是修改后的 Consumer 类代码:

class Consumer implements Runnable {
    private List<Integer> sharedQueue;
    // 设定每个消费者期望消费的商品数量
    // 这个值应该根据总生产量和消费者数量来合理分配
    private static int wantsPerConsumer = 5; 
    private int gets = 0; // 记录当前消费者已消费的数量

    public Consumer(List<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        // 当已消费数量达到期望值时,消费者线程终止
        while (gets < wantsPerConsumer) {
            try {
                consume();
                gets++; // 每成功消费一个商品,计数器加一
                Thread.sleep(100); // 模拟消费过程中的其他操作

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                System.err.println(Thread.currentThread().getName() + " was interrupted.");
                break; // 捕获中断异常后退出循环
            }
        }
        System.out.println(Thread.currentThread().getName() + " has finished consuming " + gets + " items and is terminating.");
    }

    private void consume() throws InterruptedException {
        synchronized (sharedQueue) {
            // 如果共享队列为空,消费者等待
            while (sharedQueue.isEmpty()) { // 使用 isEmpty() 更清晰
                // 在等待前检查是否已达到消费上限,如果已达到则直接退出
                // 这一步是为了防止在等待期间,其他消费者已经完成了任务,导致没有足够的商品可供当前消费者消费
                if (gets >= wantsPerConsumer) {
                    return; // 达到上限,直接返回,不再等待
                }
                System.out.println(Thread.currentThread().getName() + ", 队列为空, consumerThread正在等待producerThread生产, sharedQueue's size= 0");
                sharedQueue.wait();
            }

            Thread.sleep((long) (Math.random() * 2000));
            System.out.println(Thread.currentThread().getName() + ", CONSUMED : " + sharedQueue.remove(0));
            // 唤醒等待的生产者或消费者。在多生产者多消费者场景下,notifyAll() 更安全。
            // 但对于本例的特定终止逻辑,notify() 也能在大多数情况下工作。
            sharedQueue.notify();
        }
    }
}

2.2 确定 wantsPerConsumer 的值

wantsPerConsumer 的值需要根据总的生产量和系统中消费者线程的数量来确定。 在原代码中:

  • Producer.productionSize = 5 (每个生产者生产5个商品)。
  • 有两个生产者 (producer0, producer1)。
  • 总生产量 = 2 * productionSize = 2 * 5 = 10。
  • 有两个消费者 (consumer0, consumer1)。

因此,如果每个消费者平均分担消费任务,那么 wantsPerConsumer 应该设置为 总生产量 / 消费者数量 = 10 / 2 = 5。这与修改后的 Consumer 类中 wantsPerConsumer = 5 的设定相符。

3. 完整示例代码

下面是包含了修改后的 Consumer 类的完整代码示例:

import java.util.LinkedList;
import java.util.List;

// main class
public class MULTIPLE_ProducerConsumerWaitNotify {

    public static void main(String args[]) throws InterruptedException {
        List<Integer> sharedQueue = new LinkedList<>(); // Creating shared object

        // 设定每个生产者的生产数量
        int productionSizePerProducer = 5;
        // 设定生产者数量
        int numberOfProducers = 2;
        // 设定消费者数量
        int numberOfConsumers = 2;

        // 计算总生产量
        int totalProduction = productionSizePerProducer * numberOfProducers;
        // 计算每个消费者期望消费的数量
        // 假设任务平均分配,或者总生产量是消费者数量的倍数
        int wantsPerConsumer = totalProduction / numberOfConsumers;

        // 创建生产者线程
        Producer producer0 = new Producer(sharedQueue, 0, productionSizePerProducer);
        Thread producerThread0 = new Thread(producer0, "ProducerThread0");

        Producer producer1 = new Producer(sharedQueue, 1, productionSizePerProducer);
        Thread producerThread1 = new Thread(producer1, "ProducerThread1");

        // 创建消费者线程
        Consumer consumer0 = new Consumer(sharedQueue, wantsPerConsumer);
        Thread consumerThread0 = new Thread(consumer0, "ConsumerThread0");

        Consumer consumer1 = new Consumer(sharedQueue, wantsPerConsumer);
        Thread consumerThread1 = new Thread(consumer1, "ConsumerThread1");

        // 启动所有线程
        producerThread0.start();
        producerThread1.start();
        consumerThread0.start();
        consumerThread1.start();

        // 等待所有生产者线程完成
        producerThread0.join();
        producerThread1.join();
        System.out.println("All producers have finished their tasks.");

        // 在生产者完成任务后,可以考虑使用一个机制通知消费者
        // 例如,一个“毒丸”对象,或者在队列为空时,消费者通过计数判断是否退出
        // 当前的方案是消费者自行判断是否达到消费上限

        // 等待所有消费者线程完成
        consumerThread0.join();
        consumerThread1.join();
        System.out.println("All consumers have finished their tasks.");

        System.out.println("Program terminated successfully.");
    }
}

// Producer class
class Producer implements Runnable {
    private List<Integer> sharedQueue;
    private int maxSize = 4; // maximum number of products which sharedQueue can hold at a time.
    private int productionSize; // Total no of items to be produced by THIS producer
    int producerNo;

    public Producer(List<Integer> sharedQueue, int producerNo, int productionSize) {
        this.sharedQueue = sharedQueue;
        this.producerNo = producerNo;
        this.productionSize = productionSize;
    }

    @Override
    public void run() {
        for (int i = 1; i <= productionSize; i++) { // produce products.
            try {
                produce(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                System.err.println(Thread.currentThread().getName() + " was interrupted.");
                break; // 捕获中断异常后退出循环
            }
        }
        System.out.println(Thread.currentThread().getName() + " has finished producing " + productionSize + " items and is terminating.");
    }

    private void produce(int i) throws InterruptedException {
        synchronized (sharedQueue) {
            // if sharedQuey is full wait until consumer consumes.
            while (sharedQueue.size() == maxSize) {
                System.out.println(Thread.currentThread().getName() + ", 队列已满, producerThread正在等待consumerThread消费, sharedQueue's size= " + maxSize);
                sharedQueue.wait();
            }

            // Bcz each producer must produce unique product
            // Ex= producer0 will produce 1-5  and producer1 will produce 6-10 in random order
            int producedItem = (this.productionSize * producerNo) + i;

            System.out.println(Thread.currentThread().getName() + " Produced : " + producedItem);
            sharedQueue.add(producedItem);
            Thread.sleep((long) (Math.random() * 1000));
            sharedQueue.notify(); // 唤醒等待的消费者
        }
    }
}

// Consumer class (Modified)
class Consumer implements Runnable {
    private List<Integer> sharedQueue;
    private int wantsToConsume; // 这个消费者期望消费的总数量
    private int gets = 0; // 记录当前消费者已消费的数量

    public Consumer(List<Integer> sharedQueue, int wantsToConsume) {
        this.sharedQueue = sharedQueue;
        this.wantsToConsume = wantsToConsume;
    }

    @Override
    public void run() {
        // 当已消费数量达到期望值时,消费者线程终止
        while (gets < wantsToConsume) {
            try {
                consume();
                // 只有在成功消费后才增加gets计数,防止因队列空等待而错误增加
                // consume()方法内部已处理等待,如果成功取出,则计数
                // 这里需要调整,consume()方法应返回是否成功消费,或者在consume()内部增加gets
                // 为简化,我们假设consume()方法执行到最后表示成功尝试消费(即使队列空等待了)
                // 更好的做法是consume()返回一个布尔值表示是否实际消费了,或者直接在同步块内增加gets
                // 这里我们选择在consume()成功返回后增加gets,但要确保consume()在没有实际消费时不会增加gets
                // 调整一下:gets在consume()方法内部,当成功从队列中移除元素时才增加。
                // 这样可以避免因wait()而导致gets增加但实际未消费的情况。
                // 考虑到目前的consume()设计,如果它成功执行到打印并移除元素,那么就可以认为消费成功。
                // 如果它因为队列为空而等待,那么就不会执行到移除元素,gets也不会增加。
                // 因此,将gets++放在这里是合理的,只要consume()在实际消费后才返回。
                // 但为了更严谨,我们可以在consume()内部返回实际是否消费了。
                // 或者,我们可以将gets++放在consume()方法内部,紧跟在sharedQueue.remove(0)之后。
                // 这里为了保持与原回答的逻辑一致,暂时保留在run()中,但需要理解其潜在的细微差别。
                // 实际消费的逻辑在consume()中,如果consume()内部成功移除,则gets++。
                // 为了避免重复计数或错误计数,我们将gets计数逻辑移入consume()方法中。

                // 移除这里的gets++,让consume()方法负责更新gets
                Thread.sleep(100); // 模拟消费过程中的其他操作

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                System.err.println(Thread.currentThread().getName() + " was interrupted.");
                break; // 捕获中断异常后退出循环
            }
        }
        System.out.println(Thread.currentThread().getName() + " has finished consuming " + gets + " items and is terminating.");
    }

    private void consume() throws InterruptedException {
        synchronized (sharedQueue) {
            // 如果共享队列为空,消费者等待
            while (sharedQueue.isEmpty()) {
                // 在等待前检查是否已达到消费上限,如果已达到则直接退出
                // 这一步非常关键,防止消费者在队列为空时无限等待,即使它已经完成了所有消费任务
                if (gets >= wantsToConsume) {
                    return; // 达到上限,直接返回,不再等待
                }
                System.out.println(Thread.currentThread().getName() + ", 队列为空, consumerThread正在等待producerThread生产, sharedQueue's size= 0");
                sharedQueue.wait();
            }

            // 成功从队列中移除元素,表示一次有效消费
            Thread.sleep((long) (Math.random() * 2000));
            System.out.println(Thread.currentThread().getName() + ", CONSUMED : " + sharedQueue.remove(0));
            gets++; // 只有在实际消费后才增加计数
            sharedQueue.notify(); // 唤醒等待的生产者
        }
    }
}

重要更新: 在上述 Consumer 类中,gets++ 的位置从 run() 方法中移到了 consume() 方法的 synchronized 块内部,紧跟在 sharedQueue.remove(0) 之后。这是为了确保只有在实际成功消费(即从队列中取出一个元素)之后,gets 计数器才会被递增,从而更准确地反映消费情况并避免潜在的计数错误。同时,在 while (sharedQueue.isEmpty()) 循环内部增加了对 gets >= wantsToConsume 的检查,这确保了即使在等待状态下,如果消费者已经满足了其消费数量,它也能及时退出等待,不再被无谓地唤醒。

4. 注意事项与总结

  1. 总消费量与总生产量匹配:确保所有消费者期望消费的总量与所有生产者生产的总量相匹配。如果不匹配,可能会导致部分商品未被消费,或者部分消费者因无商品可消费而继续等待(如果它们没有明确的退出条件)。
  2. notify() vs notifyAll():在多生产者多消费者的复杂场景中,通常推荐使用 notifyAll() 来唤醒所有等待的线程,让它们重新评估条件。notify() 只唤醒一个随机等待的线程,可能导致“惊群效应”或“死锁”风险(例如,唤醒了一个生产者,但队列已满,而真正需要被唤醒的消费者仍在等待)。在本例中,由于我们为消费者设定了明确的退出条件,notify() 也能工作,但作为最佳实践,notifyAll() 更为健壮。
  3. 中断处理:在 try-catch 块中捕获 InterruptedException 时,最佳实践是调用 Thread.currentThread().interrupt() 来重新设置中断状态,并通常选择退出循环或线程,以便外部代码能够感知到中断请求。
  4. 优雅退出:通过为消费者设定明确的消费上限,我们实现了程序的优雅退出。当所有生产者完成生产,所有消费者也完成了它们的消费任务后,所有线程都会自然终止,main 方法中的 join() 调用将确保程序在所有工作线程结束后才最终退出。
  5. “毒丸”机制(Poison Pill):除了设定消费上限,另一种常用的优雅退出机制是“毒丸”对象。当生产者完成所有任务后,可以向队列中放入一个特殊的“毒丸”对象。消费者在取出对象时,如果识别到是“毒丸”,就处理完队列中剩余的正常数据后,自己也终止,并可能将“毒丸”传递给下一个消费者(如果存在多个消费者)。这种方法在总生产量不确定或需要更灵活的终止策略时非常有用。

通过上述修改,我们成功解决了Java多生产者多消费者模型中消费者无限等待导致程序无法终止的问题,确保了并发程序的健壮性和正确性。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

107

2023.09.25

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

1030

2023.08.02

int占多少字节
int占多少字节

int占4个字节,意味着一个int变量可以存储范围在-2,147,483,648到2,147,483,647之间的整数值,在某些情况下也可能是2个字节或8个字节,int是一种常用的数据类型,用于表示整数,需要根据具体情况选择合适的数据类型,以确保程序的正确性和性能。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

612

2024.08.29

c++怎么把double转成int
c++怎么把double转成int

本专题整合了 c++ double相关教程,阅读专题下面的文章了解更多详细内容。

334

2025.08.29

C++中int的含义
C++中int的含义

本专题整合了C++中int相关内容,阅读专题下面的文章了解更多详细内容。

235

2025.08.29

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

Java 并发编程高级实践
Java 并发编程高级实践

本专题深入讲解 Java 在高并发开发中的核心技术,涵盖线程模型、Thread 与 Runnable、Lock 与 synchronized、原子类、并发容器、线程池(Executor 框架)、阻塞队列、并发工具类(CountDownLatch、Semaphore)、以及高并发系统设计中的关键策略。通过实战案例帮助学习者全面掌握构建高性能并发应用的工程能力。

99

2025.12.01

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

38

2026.03.10

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 4.4万人学习

C# 教程
C# 教程

共94课时 | 11.2万人学习

Java 教程
Java 教程

共578课时 | 81.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号