0

0

Java RabbitMQ消息队列常见问题及解决方案分析

王林

王林

发布时间:2023-04-23 09:49:06

|

2977人浏览过

|

来源于亿速云

转载

消息堆积

消息堆积的产生场景:

  • 生产者产生的消息速度大于消费者消费的速度。解决:增加消费者的数量或速度。

  • 没有消费者进行消费的时候。解决:死信队列、设置消息有效期。相当于对我们的消息设置有效期,在规定的时间内如果没有消费的话,自动过期,过期的时候会执行客户端回调监听的方法将消息存放到数据库表记录,后期实现补偿。

保证消息不丢失

1、生产者使用消息确认机制保证消息百分之百能够将消息投递到MQ成功。

2、MQ服务器端应该将消息持久化到硬盘

立即学习Java免费学习笔记(深入)”;

3、消费者使用手动ack机制确认消息消费成功

如果MQ服务器容量满了怎么办?

使用死信队列将消息存到数据库中去,后期补偿消费。

死信队列

RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生背景:

  • 消息投递到MQ中存放 消息已经过期

  • 队列达到最大的长度 (队列容器已经满了)生产者拒绝接收消息

  • 消费者消费多次消息失败,就会转移存放到死信队列中

代码案例:

maven依赖


        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.apache.commons
            commons-lang3
        
        
        
            com.alibaba
            fastjson
            1.2.49
        
    

yml配置

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

队列配置类

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信队列消费者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

普通队列消费者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

后台队列管理页面如下:

Java RabbitMQ消息队列常见问题实例分析

部署方式:死信队列不能够和正常队列存在同一个服务器中,应该分服务器存放。

延迟队列

订单30分钟未支付,系统自动超时关闭的实现方案。

基于任务调度实现,效率是非常低。

基于redis过期key实现,key失效时会回调客户端一个方法。

用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;缺点:非常冗余,会在表中存放一个冗余字段。

基于mq的延迟队列(最佳方案)rabbitmq情况下。

原理:在我们下单的时候,往mq投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否已经支付。

实现逻辑:

主要使用死信队列来实现。

Java RabbitMQ消息队列常见问题实例分析

想要的代码:就是正常的消费者不消费消息,或者没有正常的消费者,在设置的时间后进入死信队列中,然后死信消费者实现相应的业务逻辑。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制

当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)

Magician
Magician

Figma插件,AI生成图标、图片和UX文案

下载

应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿。因为重试失败次数之后,队列会自动删除这个消息。

消息重试原理: 在重试的过程中,使用aop拦截我们的消费监听方法,也不会打印这个错误日志。如果重试多次还是失败,达到最大失败次数的时候才会打印错误日志。

如果消费多次还是失败的情况下:

1、自动删除该消息;(消息可能丢失)

解决办法:

如果充实多次还是失败的情况下,最终存放到死信队列;

采用表日志记,消费失败错误日志的日志记录,后期人工自动对该消息实现补偿。

合理的选择重试机制

消费者获取消息后,调用第三方接口(HTTP请求),但是调用第三方接口失败呢?是否需要重试 ?

答:有时是因为网络异常调用失败,应该需要重试几次。

消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

答:不需要重试,代码异常需要重新修改代码发布项目。

消费者开启手动ack模式

第一步、springboot项目配置需要开启ack模式

acknowledge-mode: manual

第二步、消费者Java代码

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解决消息幂等问题

什么是消息幂等性?MQ消费者如何保证幂等性?

产生的原因:就是因为消费者可能会开启自动重试,重试过程中可能会导致消费者业务逻辑代码重复执行。此刻消息已经消费了,因为业务报错导致消息重新消费,这时会出现

解决方案:采用消息全局id根据业务来定,根据业务id(全局唯一id)消费者可以判断这条消息已经消费了。

消费者代码逻辑:

Java RabbitMQ消息队列常见问题实例分析

RabbitMQ解决分布式事务问题

分布式事务:在分布式系统中,因为跨服务调用接口,存在多个不同的事务,每个事务都互不影响。就存在分布式事务的问题。

解决分布式事务核心思想:数据最终一致性。

分布式领域中名词:

强一致性 :要么同步速度非常快或者采用锁的机制 不允许出现脏读;

强一致性解决方案:要么数据库A非常迅速的将数据同步给数据B,或者数据库A没有同步完成之前数据库B不能够读取数据。

弱一致性: 允许读取的数据为原来的脏数据,允许读取的结果不一致性。

最终一致性: 在我们的分布式系统中,因为数据之间同步通过网络实现通讯,短暂的数据延迟是允许的,但是最终数据必须要一致性。

基于RabbitMQ解决分布式事务的思路

基于RabbitMQ解决分布式事务的思路:(采用最终一致性的方案)

  • 确认我们的生产者消息一定要投递到MQ中(消息确认机制)投递失败 就继续重试

  • 消费者采用手动ack的形式确认消息实现消费 注意幂等性问题,消费失败的情况下,mq自动帮消费者重试。

  • 保证我们的生产者第一事务先执行,如果执行失败采用补单队列(给生产者自己事务补充,确保生产者第一事务执行完成【数据最终一致性】)。

解决思路图:核心是利用mq发送消息给其他系统将数据修改回来。

Java RabbitMQ消息队列常见问题实例分析

相关文章

java速学教程(入门到精通)
java速学教程(入门到精通)

java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

Java 消息队列与异步架构实战
Java 消息队列与异步架构实战

本专题系统讲解 Java 在消息队列与异步系统架构中的核心应用,涵盖消息队列基本原理、Kafka 与 RabbitMQ 的使用场景对比、生产者与消费者模型、消息可靠性与顺序性保障、重复消费与幂等处理,以及在高并发系统中的异步解耦设计。通过实战案例,帮助学习者掌握 使用 Java 构建高吞吐、高可靠异步消息系统的完整思路。

11

2026.01.28

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

329

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

235

2023.10.07

什么是中间件
什么是中间件

中间件是一种软件组件,充当不兼容组件之间的桥梁,提供额外服务,例如集成异构系统、提供常用服务、提高应用程序性能,以及简化应用程序开发。想了解更多中间件的相关内容,可以阅读本专题下面的文章。

178

2024.05.11

Golang 中间件开发与微服务架构
Golang 中间件开发与微服务架构

本专题系统讲解 Golang 在微服务架构中的中间件开发,包括日志处理、限流与熔断、认证与授权、服务监控、API 网关设计等常见中间件功能的实现。通过实战项目,帮助开发者理解如何使用 Go 编写高效、可扩展的中间件组件,并在微服务环境中进行灵活部署与管理。

215

2025.12.18

Java Maven专题
Java Maven专题

本专题聚焦 Java 主流构建工具 Maven 的学习与应用,系统讲解项目结构、依赖管理、插件使用、生命周期与多模块项目配置。通过企业管理系统、Web 应用与微服务项目实战,帮助学员全面掌握 Maven 在 Java 项目构建与团队协作中的核心技能。

0

2025.09.15

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1132

2023.10.19

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

1

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3万人学习

C# 教程
C# 教程

共94课时 | 7.9万人学习

Java 教程
Java 教程

共578课时 | 52.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号