0

0

java中RabbitMQ高级应用方法

王林

王林

发布时间:2023-04-30 10:40:06

|

958人浏览过

|

来源于亿速云

转载

1、消息可靠性投递

 在使用 rabbitmq 的时候,生产者在进行消息投递的时候如果想知道消息是否成功的投递到对应的交换机和队列中,有两种方式可以用来控制消息投递的可靠性模式 。

java中RabbitMQ高级应用方法

 由上图的整个消息的投递过程来看,生产者的消息进入到中间件中会首先到达交换机,然后再从交换机传递到队列中去,也就是分为两步走战略。那么消息的丢失情况也就是会出现在这两个阶段中,RabbitMQ 贴心的为我们提供了针对于这两个部分的可靠新传递模式:

 利用这两个回调模式来确保消息的传递可靠。

 1.1、确认模式

 消息从生产者到交换机之间传递会返回一个 confirmCallback 的回调。可以直接在 rabbitTemplate 实例中进行确认逻辑的设置。如果是使用 XML 配置的话需要在工厂配置开启 publisher-confirms="true"YAML 的配置就直接 publisher-confirm-type: correlated,他默认是 NONE ,需要手动开启。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println();
                if (!b) {
                    //	消息重发之类的处理
                    System.out.println(s);
                } else {
                    System.out.println("交换机成功接收消息");
                }
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

 上面的确认是由一个 confirm 的函数执行的,里面携带了三个参数,第一个是配置的相关信息,第二个表示交换机是否成功的接收到消息,第三个参数是指没有成功接收消息的原因。

 1.2、退回模式

 从交换机到消息队列投递失败会返回一个 returnCallback 。在工厂配置中开启回退模式 publisher-returns="true" ,设置交换机处理消息失败的模式(默认 false 直接将消息进行丢弃),添加退回处理的逻辑。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void producer() throws InterruptedException {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  重发逻辑处理
                System.out.println(message.getBody() + " 投递消息队列失败");
            }
        });
        rabbitTemplate.convertAndSend("default_exchange", "default_queue",
                "hello world & beordie");
        TimeUnit.SECONDS.sleep(5);
    }
}

returnedMessage 中携带五个参数、分别指的是消息对象、错误码、错误信息、交换机、路由键。

 1.3、确认机制

 在消费者抓取消息队列中的数据取消费之后会有一个确认机制进行消息的确认,防止因为抓取消息之后但没有消费成功而导致的消息丢失。有三种确认方式:

  • 自动确认acknowledge="none"

  • 手动确认acknowledge="manual"

  • 根据异常情况确认acknowledge="auto"

 其中自动确认是指一旦消息被消费者抓取就自动默认成功,并将消息从消息队列中进行移除,如果这个时候消费端消费出现问题,那么也会是默认消息消费成功,但是实际上是没有消费成功的,也就是当前的消息丢失了。默认的情况就是自动确认机制。

 如果设置手动确认的方式,就需要在正常消费消息之后进行回调确认 channel.basicAck(),手动签收。如果业务处理过程中发生了异常则调用 channel.basicNack() 重新发送消息。

 首先需要在队列绑定时进行确认机制的配置,设置为手动签收。



    

 生产者一端不用更改,只需要改变消费者的实现进行消息自动签收就可以了,正常执行业务则签收消息,业务发生错误则选择消息拒签,消息重发或者丢弃。

public class ConsumerAck implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //  消息唯一ID
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = new String(message.getBody(), "utf-8");
            channel.basicAck(tag, true);
            System.out.println("接收消息: " + msg);
        } catch (Exception e) {
            System.out.println("接收消息异常");
            channel.basicNack(tag, true, true);
            e.printStackTrace();
        }
    }
}

 里面涉及三个简单的签收函数,一是正确签收的 basicAck ,二是单条拒签的 basicReject ,三是批量拒签的 basicNack

  • basicAck 第一个参数表示消息在通道中的唯一ID,只针对当前的 Channel;第二个参数表示是否批量同意,如果是 false 的话只会同意签收当前ID的一条消息,将其从消息队列中进行删除,而如果是 true 的话将会把此ID之前的消息一起给同意签收了。

  • basicReject 第一个参数依旧表示消息的唯一ID,第二个参数表示是否重新回队发送,false 表示直接丢弃该条消息或者有死信队列可以接收, true 则表示重新回队进行消息发送,所有操作只针对当前的消息。

  • basicNack 比第二个多了一个参数,也就是处于中间位置的布尔值,表示是否批量进行。

    亿众购物系统
    亿众购物系统

    一套设计完善、高效的web商城解决方案,独有SQL注入防范、对非法操作者锁定IP及记录功能,完整详细的记录了非法操作情况,管理员可以随时查看网站安全日志以及解除系统自动锁定的IP等前台简介:  1)系统为会员制购物,无限会员级别。  2)会员自动升级、相应级别所享有的折扣不同。  3)产品可在缺货时自动隐藏。  4)自动统计所有分类中商品数量,并在商品分类后面显示。  5)邮件列表功能,可在线订阅

    下载

2、消费端限流

 在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。

java中RabbitMQ高级应用方法

 只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。



    

3、消息过期时间

 消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。

 队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。


    
        
    

 单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。

 如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws
        AmqpException {
        //	设置 message 的过期时间
        message.getMessageProperties().setExpiration("5000");
        //	返回该消息
        return message;
    }
};
rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。

4、死信队列

 4.1、死信概念

死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。

java中RabbitMQ高级应用方法

  • 消息成为死信消息有几种情况

    队列的消息长度达到限制

    消费者拒接消息的时候不把消息重新放入队列中

    队列存在消息过期设置,消息超时未被消费

    消息存在过期时间,在投递给消费者时发现过期

 在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。


    
        
        
        
        
        
        
        
        
    

 4.2、延迟队列

 延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。

 消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL + 死信队列 的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。

 再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。

 例如实现一个下单超时支付取消订单的功能:

java中RabbitMQ高级应用方法

相关文章

java速学教程(入门到精通)
java速学教程(入门到精通)

java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

Java 消息队列与异步架构实战
Java 消息队列与异步架构实战

本专题系统讲解 Java 在消息队列与异步系统架构中的核心应用,涵盖消息队列基本原理、Kafka 与 RabbitMQ 的使用场景对比、生产者与消费者模型、消息可靠性与顺序性保障、重复消费与幂等处理,以及在高并发系统中的异步解耦设计。通过实战案例,帮助学习者掌握 使用 Java 构建高吞吐、高可靠异步消息系统的完整思路。

11

2026.01.28

什么是中间件
什么是中间件

中间件是一种软件组件,充当不兼容组件之间的桥梁,提供额外服务,例如集成异构系统、提供常用服务、提高应用程序性能,以及简化应用程序开发。想了解更多中间件的相关内容,可以阅读本专题下面的文章。

178

2024.05.11

Golang 中间件开发与微服务架构
Golang 中间件开发与微服务架构

本专题系统讲解 Golang 在微服务架构中的中间件开发,包括日志处理、限流与熔断、认证与授权、服务监控、API 网关设计等常见中间件功能的实现。通过实战项目,帮助开发者理解如何使用 Go 编写高效、可扩展的中间件组件,并在微服务环境中进行灵活部署与管理。

217

2025.12.18

pdf怎么转换成xml格式
pdf怎么转换成xml格式

将 pdf 转换为 xml 的方法:1. 使用在线转换器;2. 使用桌面软件(如 adobe acrobat、itext);3. 使用命令行工具(如 pdftoxml)。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

1903

2024.04.01

xml怎么变成word
xml怎么变成word

步骤:1. 导入 xml 文件;2. 选择 xml 结构;3. 映射 xml 元素到 word 元素;4. 生成 word 文档。提示:确保 xml 文件结构良好,并预览 word 文档以验证转换是否成功。想了解更多xml的相关内容,可以阅读本专题下面的文章。

2092

2024.08.01

xml是什么格式的文件
xml是什么格式的文件

xml是一种纯文本格式的文件。xml指的是可扩展标记语言,标准通用标记语言的子集,是一种用于标记电子文件使其具有结构性的标记语言。想了解更多相关的内容,可阅读本专题下面的相关文章。

1081

2024.11.28

Golang channel原理
Golang channel原理

本专题整合了Golang channel通信相关介绍,阅读专题下面的文章了解更多详细内容。

248

2025.11.14

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

14

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3万人学习

C# 教程
C# 教程

共94课时 | 8万人学习

Java 教程
Java 教程

共578课时 | 53.5万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号