0

0

Java多线程并发消息发送与会话管理教程

聖光之護

聖光之護

发布时间:2025-11-11 12:12:08

|

179人浏览过

|

来源于php中文网

原创

java多线程并发消息发送与会话管理教程

本教程深入探讨了在Java多线程环境中,如何使用`wait()`和`notifyAll()`机制实现并发消息发送与会话重连的同步控制。文章分析了共享资源访问中的常见陷阱,特别是`ArrayIndexOutOfBoundsException`的根源,并提供了基于`wait/notifyAll`的正确同步方案。此外,教程还介绍了`volatile`关键字的重要性以及`java.util.concurrent`包中更高级的并发工具,旨在帮助开发者构建健壮、高效的多线程应用。

理解Java中的wait()和notifyAll()机制

在Java中,wait()、notify()和notifyAll()是Object类提供的核心方法,用于线程间的协作。它们必须在synchronized代码块内部调用,并且作用于同一个锁对象。

  • wait(): 使当前线程进入等待状态,并释放它所持有的锁。线程会一直等待,直到被其他线程调用notify()或notifyAll()唤醒,或者等待超时。被唤醒后,线程会尝试重新获取锁,然后从wait()方法返回。
  • notify(): 唤醒在该锁对象上等待的单个线程。如果有多个线程在等待,JVM会选择其中一个进行唤醒,具体是哪一个线程是不确定的。
  • notifyAll(): 唤醒在该锁对象上等待的所有线程。所有被唤醒的线程都会尝试重新获取锁,但只有一个线程能成功获取并继续执行。

这些机制是构建生产者-消费者模型或处理共享资源状态变化时不可或缺的工具。

场景描述:多线程消息发送与会话管理

设想一个场景:我们需要通过SMPP协议发送大量短信。存在多个“发送者”(Sender)线程并发地从一个共享的消息队列中取出消息并发送。同时,有一个独立的“会话管理器”(SessionProducer)线程负责维护SMPP会话的连接状态。当会话断开时,会话管理器需要重新建立连接,在此期间,所有发送者线程必须暂停发送。一旦会话恢复,发送者线程应继续工作。SMPPSession对象是所有线程共享的资源,其isBind()方法指示会话是否有效,reBind()方法用于重新建立会话。

立即学习Java免费学习笔记(深入)”;

原始代码分析与问题识别

给定的原始代码尝试使用wait/notifyAll来协调发送者和会话管理器。然而,它存在一个关键的同步问题,导致了ArrayIndexOutOfBoundsException。

原始代码结构(简化)

// SMPPSession 类 (共享资源)
public class SMPPSession {
    private boolean bind; // 会话绑定状态
    public int sendMessage(String msg) { /* ... */ }
    public void reBind() { /* ... */ this.bind = true; }
    public boolean isBind() { return this.bind; }
}

// Sender 线程
public class Sender extends Thread {
    private SMPPSession smppSession;
    // ... 构造函数
    @Override
    public void run() {
        while (!Client.messages.isEmpty()){ // 问题点1: 在同步块外部检查
            synchronized (Client.messages){ // 锁定共享消息列表
                if (smppSession.isBind()){
                    final String msg = Client.messages.remove(0); // 移除消息
                    // ... 发送消息
                    Client.messages.notifyAll(); // 问题点2: 在此调用notifyAll可能不合适
                } else {
                    try {
                        Client.messages.wait(); // 等待会话恢复
                    } catch (InterruptedException e) { /* ... */ }
                }
            }
        }
    }
}

// SessionProducer 线程
public class SessionProducer extends Thread {
    private SMPPSession smppSession;
    // ... 构造函数
    @Override
    public void run() {
        while (!Client.messages.isEmpty()){ // 问题点3: 生产者不应依赖消息列表是否为空
            synchronized (Client.messages){ // 锁定共享消息列表
                if (!smppSession.isBind()){
                    smppSession.reBind(); // 重新绑定
                    Client.messages.notifyAll(); // 唤醒所有等待线程
                } else{
                    try {
                        Client.messages.wait(); // 等待会话断开
                    } catch (InterruptedException e) { /* ... */ }
                }
            }
        }
    }
}

// Client 类 (主程序)
public class Client {
    public static final List<String> messages = new CopyOnWriteArrayList<>(); // 共享消息列表
    // ... main 方法启动线程
}

导致ArrayIndexOutOfBoundsException的原因

异常的根本原因在于Sender线程在synchronized (Client.messages)块外部执行了while (!Client.messages.isEmpty())检查。

Programming Helper
Programming Helper

AI代码自动生成器,在AI的帮助下更快地编程

下载
  1. 竞态条件: 假设Client.messages列表中还剩一条消息。
  2. 多个发送者线程: 多个Sender线程(例如Sender1, Sender2, Sender3, Sender4)同时检查!Client.messages.isEmpty(),都发现列表不为空,因此都尝试进入synchronized (Client.messages)块。
  3. 顺序执行: 线程会依次获取锁。假设Sender1首先获取锁。
  4. Sender1执行: Sender1进入同步块,检查smppSession.isBind()为真,然后成功执行Client.messages.remove(0),将列表清空。它发送消息后,释放锁。
  5. 其他Sender线程: 紧接着,Sender2获取锁。当它进入同步块时,Client.messages已经为空。然而,它并没有再次检查isEmpty(),而是直接尝试执行Client.messages.remove(0)。由于列表已空,这将抛出ArrayIndexOutOfBoundsException。Sender3和Sender4也会遇到同样的问题。

此外,SessionProducer线程的while (!Client.messages.isEmpty())循环条件也不合理。作为会话管理器,它应该持续运行以监控和维护会话状态,而不是仅仅在消息队列不为空时才工作。

改进的同步方案

为了解决上述问题,我们需要确保所有对共享状态(smppSession.isBind()和Client.messages)的检查和修改都发生在同步块内部,并且使用while循环来包裹wait()调用,以处理虚假唤醒和条件变化。

1. SMPPSession的改进

为了确保bind状态的可见性,应将其声明为volatile。

public class SMPPSession {
    private volatile boolean bind = false; // 初始化为false,并声明为volatile
    private static final Random idGenerator = new Random();

    public int sendMessage(String msg){
       try{
           Thread.sleep(1000L);
           System.out.println("Sending message: " + msg);
           return Math.abs(idGenerator.nextInt());
       } catch (InterruptedException e){
           Thread.currentThread().interrupt(); // 重新设置中断标志
           e.printStackTrace();
       }
        return -1;
    }

    public void reBind(){
        try{
            System.out.println("Rebinding...");
            Thread.sleep(1000L);
            this.bind = true; // 更新绑定状态
            System.out.println("Session established!");
        } catch (InterruptedException e){
            Thread.currentThread().interrupt(); // 重新设置中断标志
            e.printStackTrace();
        }
    }

    public boolean isBind(){
        return this.bind;
    }
}

2. Sender线程的改进

Sender线程应该在获取锁后,在一个while循环中检查两个条件:会话是否绑定,以及消息队列是否为空。只有当这两个条件都满足时,才能尝试发送消息。

public class Sender extends Thread{

    private SMPPSession smppSession;

    public Sender(String name, SMPPSession smppSession){
        this.setName(name);
        this.smppSession = smppSession;
    }

    @Override
    public void run(){
        while (true){ // 持续运行,直到所有消息发送完毕或被中断
            String msgToSend = null;
            synchronized (Client.messages){ // 锁定共享消息列表
                // 条件检查:会话未绑定 或 消息列表为空
                while (!smppSession.isBind() || Client.messages.isEmpty()){
                    // 如果消息列表为空且会话已绑定,说明所有消息已发送,此发送者线程可以退出
                    if (Client.messages.isEmpty() && smppSession.isBind()){
                        System.out.println(getName() + " finished sending all messages. Terminating.");
                        return; // 退出线程
                    }
                    try {
                        System.out.println(getName() + " waiting. Bind status: " + smppSession.isBind() + ", Messages empty: " + Client.messages.isEmpty());
                        Client.messages.wait(); // 等待条件满足
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // 重新设置中断标志
                        System.out.println(getName() + " interrupted while waiting. Terminating.");
                        return; // 退出线程
                    }
                }
                // 条件满足:会话已绑定且有消息可发送
                msgToSend = Client.messages.remove(0); // 在同步块内安全移除消息
                // 注意:这里不需要notifyAll,因为Sender线程移除消息不会改变SessionProducer的等待条件,
                // 也不会让其他Sender线程立即从等待中恢复(除非它们也等待消息出现,但这里它们等待的是isBind和消息非空)
            }

            // 在同步块外部执行耗时的消息发送操作
            final int msgId = smppSession.sendMessage(msgToSend);
            System.out.println(getName() + " sent msg and received msgId: " + msgId);
        }
    }
}

3. SessionProducer线程的改进

SessionProducer的主要职责是确保SMPPSession始终处于绑定状态。它应该持续运行,并在会话未绑定时进行重连。

public class SessionProducer extends Thread{

    private SMPPSession smppSession;

    public SessionProducer(String name, SMPPSession smppSession){
        this.setName(name);
        this.smppSession = smppSession;
    }

    @Override
    public void run(){
        while (true){ // 持续运行,监控并维护会话状态
            synchronized (Client.messages){ // 锁定共享消息列表
                // 条件检查:如果会话已绑定,则等待
                while (smppSession.isBind()){
                    // 如果所有消息都已发送,且会话已绑定,SessionProducer可以等待新的消息出现或等待终止信号
                    // 暂时让它等待,因为它的核心职责是确保会话可用。
                    if (Client.messages.isEmpty()) {
                        System.out.println(getName() + " session is bound and messages are empty. Waiting for potential new messages or session break.");
                    } else {
                        System.out.println(getName() +

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

107

2023.09.25

c++中volatile关键字的作用
c++中volatile关键字的作用

本专题整合了c++中volatile关键字的相关内容,阅读专题下面的文章了解更多详细内容。

75

2025.10.23

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

377

2025.12.24

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

32

2026.01.21

C++多线程相关合集
C++多线程相关合集

本专题整合了C++多线程相关教程,阅读专题下面的的文章了解更多详细内容。

29

2026.01.21

C# 多线程与异步编程
C# 多线程与异步编程

本专题深入讲解 C# 中多线程与异步编程的核心概念与实战技巧,包括线程池管理、Task 类的使用、async/await 异步编程模式、并发控制与线程同步、死锁与竞态条件的解决方案。通过实际项目,帮助开发者掌握 如何在 C# 中构建高并发、低延迟的异步系统,提升应用性能和响应速度。

103

2026.02.06

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

38

2026.03.10

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 4.4万人学习

C# 教程
C# 教程

共94课时 | 11.2万人学习

Java 教程
Java 教程

共578课时 | 81.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号