0

0

rabbitmq 怎么避免消息丢失?

煙雲

煙雲

发布时间:2025-10-08 11:03:02

|

942人浏览过

|

来源于php中文网

原创

避免消息丢失需从生产者、Broker、消费者三方面入手:生产者启用Publisher Confirm机制并处理回调;Broker端对Exchange、Queue、Message进行持久化,并配置镜像队列;消费者开启手动ACK,确保消息处理成功后确认。同时配置死信队列处理异常消息,结合监控排查问题,保障消息可靠传递。

rabbitmq 怎么避免消息丢失?

rabbitmq 避免消息丢失的核心在于从生产者到 Broker,Broker 自身,以及 Broker 到消费者的整个链路中,都要采取相应的策略来保证消息的可靠传递。这需要生产者确认机制、持久化设置、消费者确认机制以及镜像队列等多种手段的配合。

解决方案

避免 RabbitMQ 消息丢失需要从生产者、Broker 和消费者三个方面入手,构建一个完整的可靠消息传递机制。

生产者端:开启 Publisher Confirm 机制

生产者是消息的起点,确保消息成功发送到 RabbitMQ Broker 至关重要。Publisher Confirm 机制允许 Broker 在收到消息后向生产者发送确认,告知消息已被正确接收。

  • 开启 Publisher Confirm: 在 RabbitTemplate 中设置 publisher-confirm-typecorrelatedsimplecorrelated 模式需要生产者维护一个消息序号与回调函数的映射,simple 模式则只能通过返回值来判断是否成功。推荐使用 correlated 模式,因为它能提供更详细的确认信息。

  • 处理 Confirm 回调: 实现 ConfirmCallback 接口,在回调方法中处理 Broker 返回的确认信息。如果消息被成功接收,则执行相应的业务逻辑;如果消息发送失败,则需要进行重发或者记录日志,方便后续排查问题。

@Component
public class MyConfirmCallback implements CorrelationData.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功,correlationData: " + correlationData);
        } else {
            System.out.println("消息发送失败,correlationData: " + correlationData + ", cause: " + cause);
            // TODO: 重发消息或记录日志
        }
    }
}

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private MyConfirmCallback myConfirmCallback;

public void sendMessage(String message) {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.setConfirmCallback(myConfirmCallback);
    rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
}

Broker 端:消息持久化与镜像队列

Broker 作为消息的中转站,需要保证消息在 Broker 内部的可靠存储,以及 Broker 集群间的数据同步。

  • 消息持久化: 将 Exchange、Queue 和 Message 都设置为持久化。Exchange 和 Queue 的持久化确保 Broker 重启后,这些元数据不会丢失。Message 的持久化确保消息在 Broker 内部以持久化的方式存储,即使 Broker 发生故障,消息也不会丢失。
@Bean
public Queue myQueue() {
    return new Queue("myQueue", true); // true 表示持久化
}

@Bean
public DirectExchange myExchange() {
    return new DirectExchange("myExchange", true, false); // true 表示持久化
}

在发送消息时,需要设置 MessagePropertiesdeliveryModePERSISTENT

MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("Hello, RabbitMQ!".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message);
  • 镜像队列: 使用镜像队列可以实现 Broker 集群中队列数据的同步备份。当主节点发生故障时,镜像节点可以自动接管,保证消息的可靠性。镜像队列可以通过 RabbitMQ 的策略配置来实现。
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

这个命令会将所有队列设置为镜像队列,所有节点都会同步队列的数据。

消费者端:开启 ACK 机制

消费者是消息的终点,需要确保消息被正确消费,并且在消费失败时能够进行重试。

  • 开启 ACK 机制:spring.rabbitmq.listener.acknowledge-mode 设置为 manual。这意味着消费者需要手动发送 ACK 确认消息已被正确消费。

  • 处理 ACK: 在消息处理成功后,调用 channel.basicAck() 方法发送 ACK。如果消息处理失败,可以调用 channel.basicNack()channel.basicReject() 方法拒绝消息,并将消息重新放入队列,以便后续重试。

@RabbitListener(queues = "myQueue")
public void processMessage(Message message, Channel channel) throws IOException {
    try {
        // 处理消息
        System.out.println("Received message: " + new String(message.getBody()));
        // 发送 ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理失败,重新放入队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        // 或者直接丢弃消息
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}

消息丢失了,如何排查问题?

音剪
音剪

喜马拉雅旗下的一站式AI音频创作平台,强大的在线剪辑能力,帮你轻松创作优秀的音频作品

下载

排查消息丢失问题需要从生产者、Broker 和消费者三个方面入手,逐一排除可能的原因。

  1. 生产者端:

    • 检查 Publisher Confirm 机制是否正确开启和处理。
    • 检查消息是否成功发送到 Broker。可以通过抓包工具或者 RabbitMQ 的管理界面来确认。
    • 检查消息的 mandatory 参数是否设置为 true。如果设置为 true,当消息无法路由到任何队列时,Broker 会返回一个错误给生产者。
  2. Broker 端:

    • 检查 Exchange 和 Queue 是否设置为持久化。
    • 检查消息是否设置为持久化。
    • 检查镜像队列是否正确配置。
    • 检查 Broker 的日志,查看是否有异常信息。
  3. 消费者端:

    • 检查 ACK 机制是否正确开启和处理。
    • 检查消费者是否成功消费消息。
    • 检查消费者是否在处理消息时发生异常,导致消息丢失。

如何保证消息的顺序性?

在某些场景下,消息的顺序性非常重要。RabbitMQ 默认情况下并不能保证消息的绝对顺序性,但可以通过一些策略来尽量保证消息的顺序性。

  • 单一队列: 将所有需要保证顺序性的消息发送到同一个队列。这样可以保证消息在队列中的顺序与发送的顺序一致。

  • 单一消费者: 使用单一消费者来消费队列中的消息。这样可以避免多个消费者并发消费导致的消息顺序错乱。

  • 消息分片: 将消息按照一定的规则进行分片,然后将每个分片发送到不同的队列。每个队列使用单一消费者来消费,保证每个分片内部的顺序性。然后将所有分片按照顺序进行组装,得到完整的消息。

如何处理死信队列?

死信队列(Dead Letter Queue,DLQ)用于存储无法被正常消费的消息。这些消息通常是因为以下原因被放入死信队列:

  • 消息被拒绝(basic.rejectbasic.nack)且 requeue 参数设置为 false
  • 消息过期。
  • 队列达到最大长度。

通过配置死信队列,可以方便地对无法被正常消费的消息进行处理,例如:

  • 记录日志: 将死信队列中的消息记录到日志中,方便后续排查问题。
  • 人工干预: 将死信队列中的消息发送给人工处理。
  • 重发消息: 将死信队列中的消息重新发送到队列中,进行重试。

配置死信队列需要在创建队列时指定 x-dead-letter-exchangex-dead-letter-routing-key 参数。

Map args = new HashMap<>();
args.put("x-dead-letter-exchange", "deadLetterExchange");
args.put("x-dead-letter-routing-key", "deadLetterRoutingKey");
Queue queue = new Queue("myQueue", true, false, false, args);

然后创建一个 Exchange 和 Queue 用于存储死信消息。

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("deadLetterExchange");
}

@Bean
public Queue deadLetterQueue() {
    return new Queue("deadLetterQueue");
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("deadLetterRoutingKey");
}

总而言之,避免 RabbitMQ 消息丢失是一个系统工程,需要从生产者、Broker 和消费者三个方面进行综合考虑,采取相应的策略来保证消息的可靠传递。同时,需要建立完善的监控和告警机制,及时发现和处理潜在的问题。

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

103

2025.08.06

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

201

2024.02.23

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1024

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

66

2025.10.17

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

450

2025.12.29

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

4

2026.01.19

Golang channel原理
Golang channel原理

本专题整合了Golang channel通信相关介绍,阅读专题下面的文章了解更多详细内容。

246

2025.11.14

golang channel相关教程
golang channel相关教程

本专题整合了golang处理channel相关教程,阅读专题下面的文章了解更多详细内容。

342

2025.11.17

PS使用蒙版相关教程
PS使用蒙版相关教程

本专题整合了ps使用蒙版相关教程,阅读专题下面的文章了解更多详细内容。

23

2026.01.19

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PHP自制框架
PHP自制框架

共8课时 | 0.6万人学习

Swoft2.x速学之http api篇课程
Swoft2.x速学之http api篇课程

共16课时 | 0.9万人学习

PHP入门到实战消息队列RabbitMQ
PHP入门到实战消息队列RabbitMQ

共22课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号