0

0

Spring Boot整合RabbitMQ延迟队列教程

看不見的法師

看不見的法師

发布时间:2025-07-11 18:48:03

|

983人浏览过

|

来源于php中文网

原创

spring boot整合rabbitmq延迟队列主要有两种方式。1. 基于ttl和dlx的实现:通过设置消息的存活时间和死信交换机,使消息过期后被转发到延迟处理队列;2. 使用rabbitmq延迟消息插件:通过安装rabbitmq_delayed_message_exchange插件,声明x-delayed-message类型的交换机并发送时设置延迟时间。延迟队列适用于订单超时、定时任务、重试机制、延时通知等场景,能有效解耦业务流程,提升异步处理能力。选择方案时需考虑插件部署条件、消息顺序要求及配置复杂度,推荐在可控环境中使用插件方式。生产环境中需关注消息堆积、幂等性、可靠性及延迟时间管理,应通过合理评估延迟时间、消费者扩容、持久化、监控告警、幂等设计、确认机制和分桶策略进行优化,其中幂等性处理尤为关键。

Spring Boot整合RabbitMQ延迟队列教程

Spring Boot整合RabbitMQ延迟队列,核心在于实现消息在指定时间后才被消费者处理的机制,这对于订单超时、定时任务、延时通知等场景至关重要。它能有效解耦业务流程,提升系统异步处理能力。

Spring Boot整合RabbitMQ延迟队列教程

Spring Boot整合RabbitMQ延迟队列,通常有两种主流实现方式,各有优劣,我个人在实际项目中都用过,体验确实不同。

解决方案

Spring Boot整合RabbitMQ延迟队列教程

1. 基于 TTL (Time-To-Live) 和 DLX (Dead Letter Exchange) 的实现

这是RabbitMQ原生支持的一种方案,不需要额外插件,通用性很强。它的基本思路是:消息在普通队列中设置一个存活时间(TTL),当消息过期后,如果该队列配置了死信交换机(DLX),消息就会被“死信”到DLX,再由DLX路由到一个专门的延迟处理队列。

Spring Boot整合RabbitMQ延迟队列教程

配置核心组件:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqDelayConfig {

    // 普通业务交换机
    public static final String DELAY_EXCHANGE_NAME = "delay.business.exchange";
    // 普通业务队列
    public static final String DELAY_QUEUE_NAME = "delay.business.queue";
    // 路由键
    public static final String DELAY_ROUTING_KEY = "delay.business.routingkey";

    // 死信交换机
    public static final String DEAD_LETTER_EXCHANGE_NAME = "delay.dead.letter.exchange";
    // 死信队列 (即真正的延迟消费队列)
    public static final String DEAD_LETTER_QUEUE_NAME = "delay.dead.letter.queue";
    // 死信路由键
    public static final String DEAD_LETTER_ROUTING_KEY = "delay.dead.letter.routingkey";

    /**
     * 声明普通业务交换机
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    /**
     * 声明死信交换机
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }

    /**
     * 声明普通业务队列
     * 设置死信交换机和死信路由键
     * 设置消息过期时间 (这里不设置,由发送者动态设置)
     */
    @Bean
    public Queue delayQueue() {
        Map args = new HashMap<>();
        // 绑定死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
        // 绑定死信路由键
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置队列的最大长度等,这里先不加
        // args.put("x-max-length", 10000);
        return new Queue(DELAY_QUEUE_NAME, true, false, false, args);
    }

    /**
     * 声明死信队列 (即延迟消息最终到达的队列)
     */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE_NAME, true);
    }

    /**
     * 普通业务队列与业务交换机绑定
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);
    }

    /**
     * 死信队列与死信交换机绑定
     */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
    }
}

生产者发送延迟消息:

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(String message, long delayTime) {
        // messagePostProcessor用于设置消息属性,比如TTL
        MessagePostProcessor messagePostProcessor = msg -> {
            msg.getMessageProperties().setExpiration(String.valueOf(delayTime)); // 设置消息过期时间,单位毫秒
            return msg;
        };
        rabbitTemplate.convertAndSend(RabbitMqDelayConfig.DELAY_EXCHANGE_NAME,
                                      RabbitMqDelayConfig.DELAY_ROUTING_KEY,
                                      message,
                                      messagePostProcessor);
        System.out.println("发送延迟消息: " + message + ", 延迟时间: " + delayTime + "ms");
    }
}

消费者监听延迟消息:

Tellers AI
Tellers AI

Tellers是一款自动视频编辑工具,可以将文本、文章或故事转换为视频。

下载
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayMessageConsumer {

    @RabbitListener(queues = RabbitMqDelayConfig.DEAD_LETTER_QUEUE_NAME)
    public void receiveDelayMessage(String message) {
        System.out.println("收到延迟消息: " + message + ", 实际接收时间: " + System.currentTimeMillis());
        // 处理业务逻辑
    }
}

2. 基于 RabbitMQ Delayed Message Exchange Plugin 的实现

这种方式更直接,但需要RabbitMQ服务器安装 rabbitmq_delayed_message_exchange 插件。它引入了一种新的交换机类型 x-delayed-message,可以直接在发送消息时指定延迟时间,而无需经过TTL和DLX的复杂设置。我个人更喜欢这种方式,因为它逻辑更清晰,配置也简单不少。

安装插件 (RabbitMQ服务器):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

配置核心组件:

import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqPluginDelayConfig {

    // 延迟交换机名称
    public static final String DELAY_PLUGIN_EXCHANGE_NAME = "delay.plugin.exchange";
    // 延迟队列名称
    public static final String DELAY_PLUGIN_QUEUE_NAME = "delay.plugin.queue";
    // 路由键
    public static final String DELAY_PLUGIN_ROUTING_KEY = "delay.plugin.routingkey";

    /**
     * 声明自定义延迟交换机 (类型为 x-delayed-message)
     */
    @Bean
    public CustomExchange delayPluginExchange() {
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct"); // 延迟类型,可以是 direct, topic, fanout
        return new CustomExchange(DELAY_PLUGIN_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * 声明延迟队列
     */
    @Bean
    public Queue delayPluginQueue() {
        return new Queue(DELAY_PLUGIN_QUEUE_NAME, true);
    }

    /**
     * 延迟队列与延迟交换机绑定
     */
    @Bean
    public Binding delayPluginBinding() {
        return BindingBuilder.bind(delayPluginQueue()).to(delayPluginExchange()).with(DELAY_PLUGIN_ROUTING_KEY).noargs();
    }
}

生产者发送延迟消息:

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DelayPluginMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(String message, long delayTime) {
        MessagePostProcessor messagePostProcessor = msg -> {
            msg.getMessageProperties().setHeader("x-delay", delayTime); // 设置延迟时间,单位毫秒
            return msg;
        };
        rabbitTemplate.convertAndSend(RabbitMqPluginDelayConfig.DELAY_PLUGIN_EXCHANGE_NAME,
                                      RabbitMqPluginDelayConfig.DELAY_PLUGIN_ROUTING_KEY,
                                      message,
                                      messagePostProcessor);
        System.out.println("发送插件延迟消息: " + message + ", 延迟时间: " + delayTime + "ms");
    }
}

消费者监听延迟消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayPluginMessageConsumer {

    @RabbitListener(queues = RabbitMqPluginDelayConfig.DELAY_PLUGIN_QUEUE_NAME)
    public void receiveDelayMessage(String message) {
        System.out.println("收到插件延迟消息: " + message + ", 实际接收时间: " + System.currentTimeMillis());
        // 处理业务逻辑
    }
}

为什么我们需要延迟队列?它在实际业务中扮演什么角色?

在我看来,延迟队列是构建健壮异步系统的关键一环,它解决了“现在发出,未来执行”的场景痛点。在很多业务场景下,我们不能立即处理某个任务,而是需要等待一段时间。比如,电商平台常见的“订单15分钟未支付自动取消”功能,这就是一个典型的延迟任务。用户下单后,消息进入延迟队列,15分钟后才被消费者取出并检查订单状态,如果未支付就执行取消操作。

除了订单超时,它在实际业务中还有非常多的应用:

  • 定时任务调度: 比如,我需要每天凌晨1点发送一份报表邮件,或者在特定日期给用户发送生日祝福。虽然有Quartz这样的调度框架,但对于微服务架构,基于消息队列的延迟任务能更好地解耦服务。
  • 重试机制: 某些第三方接口调用失败后,我们不想立即重试,而是希望等待几秒或几分钟后再次尝试。将失败消息放入延迟队列,一段时间后再重新投递,能有效避免瞬时故障导致的大面积失败。
  • 延时通知: 用户注册后,我们可能希望30分钟后发送一条“欢迎使用”的短信;或者在商品即将失效前1小时提醒用户。
  • 数据同步: 某些数据变更后,需要延迟一段时间再同步到其他系统,以确保数据的一致性或避免高并发下的瞬时压力。

这些场景都要求消息不是即时消费,而是“按时消费”。没有延迟队列,我们可能需要轮询数据库、使用定时器或者引入复杂的调度系统,这些方案要么效率低下,要么耦合度高,而延迟队列则优雅地解决了这个问题。

TTL + DLX 模式与 RabbitMQ 延迟消息插件,我该如何选择?

这两种方案各有千秋,我个人在项目初期,或者当团队对RabbitMQ插件部署有顾虑时,会倾向于TTL + DLX模式。它最大的优点是无需额外插件,这意味着只要你的RabbitMQ服务是标准的,就能直接使用,部署和维护相对简单。然而,它的缺点也比较明显:

  • 配置复杂: 需要额外配置死信交换机、死信队列,以及将普通队列与死信机制关联起来,逻辑上多了一层“弯弯绕”。
  • 消息顺序问题: 在同一个队列中,如果先发送了一个TTL很长的消息,再发送一个TTL很短的消息,短TTL的消息可能会被长TTL的消息“堵住”,因为它必须等到长TTL消息过期后才能成为队头消息被死信。这可能导致消息实际延迟时间比预期的长,甚至出现乱序。虽然可以通过为每个延迟时间段创建独立的队列来缓解,但这又增加了配置的复杂性。

相比之下,RabbitMQ延迟消息插件 (x-delayed-message) 则显得简洁直观得多。它的优势在于:

  • 配置简单: 只需要声明一个 x-delayed-message 类型的交换机,发送消息时在消息头直接设置 x-delay 属性即可。
  • 消息顺序保持: 插件会根据 x-delay 属性在内部管理消息的延迟,消息到达队列的顺序与发送顺序基本保持一致(在相同延迟时间下),不会出现TTL+DLX那种“长消息堵塞短消息”的问题。
  • 灵活性高: 可以在发送消息时动态指定任意延迟时间,而不需要预先定义不同TTL的队列。

那么,我该如何选择呢?

  • 如果你对RabbitMQ服务器的插件安装有完全的控制权,并且希望简化代码逻辑、提高开发效率,同时对消息的精确延迟和顺序有较高要求,那么我强烈推荐使用 rabbitmq_delayed_message_exchange 插件。 这是我个人在大部分新项目中更倾向的选择。
  • 如果你的RabbitMQ环境是共享的,或者由于安全、运维等原因无法安装第三方插件,或者你的业务对消息的精确顺序要求不高(比如每个延迟任务都是独立的),那么TTL + DLX模式是一个可靠的备选方案。 尽管它配置略显繁琐,但其原生特性保证了广泛的兼容性。

最终的选择,往往是技术可行性、运维便利性和业务需求之间的一个权衡。

延迟队列在生产环境中可能遇到哪些挑战和优化策略?

在生产环境中,延迟队列的应用并非一帆风顺,我曾遇到过一些棘手的问题,这让我深思如何更好地应对挑战。

1. 消息堆积与性能瓶颈:

  • 挑战: 当业务量激增,或者消费者处理能力跟不上时,延迟队列中可能出现大量消息堆积。对于TTL+DLX模式,这可能导致死信队列也堆积,进而影响整体性能;对于插件模式,虽然内部处理更高效,但如果延迟时间设置过长且消息量巨大,依然会占用大量内存和CPU资源。
  • 优化策略:
    • 合理评估延迟时间: 避免设置不必要的超长延迟,如果一个任务需要非常长的延迟(比如几天),考虑使用更专业的调度服务而不是纯粹的MQ延迟队列。
    • 消费者扩容与限流: 确保消费者具备足够的处理能力,可以通过增加消费者实例、优化消费者业务逻辑来提升吞吐量。同时,在极端情况下,可以考虑在生产者侧进行限流,避免过度发送消息导致队列崩溃。
    • 队列持久化: 确保队列和消息都设置为持久化,防止RabbitMQ服务重启导致消息丢失。
    • 监控与告警: 实时监控队列的消息数量、消费者积压情况,一旦达到阈值立即告警,及时介入处理。

2. 消息的幂等性与重复消费:

  • 挑战: 无论是网络抖动、消费者重启,还是RabbitMQ的重试机制,都可能导致消息被重复投递到消费者。对于延迟队列,如果消费者没有做好幂等性处理,重复消费可能引发业务错误(例如订单重复取消、积分重复发放)。
  • 优化策略:
    • 业务层幂等性设计: 这是最核心的策略。为每个延迟任务设计一个唯一的业务ID(例如订单ID、任务ID),在消费者处理时,先检查该ID是否已被处理过。常用的方法有:
      • 数据库唯一索引:将业务ID作为唯一键存入数据库,插入失败则说明已处理。
      • 分布式锁:在处理前获取基于业务ID的分布式锁。
      • 状态机流转:确保只有特定状态下的任务才能被处理。
    • 手动确认机制: 消费者处理完消息后,务必手动发送ACK确认,确保RabbitMQ知道消息已成功处理。

3. 消息丢失与可靠性:

  • 挑战: 在极端情况下,如RabbitMQ服务器宕机、网络故障、或者生产者/消费者配置不当,消息可能在发送、存储或消费过程中丢失。
  • 优化策略:
    • 生产者确认机制 (Publisher Confirms): 确保消息从生产者成功到达RabbitMQ服务器。开启 publisher-confirms,生产者会收到RabbitMQ的ACK或NACK,如果收到NACK或超时未收到,则进行重试或告警。
    • 消费者确认机制 (Consumer Acknowledgements): 消费者处理完消息后,必须显式发送ACK。如果消费者在处理过程中崩溃,未发送ACK的消息会被重新投递。
    • 持久化队列与消息: 如前所述,队列和消息都应设置为持久化,即使RabbitMQ重启也能恢复。
    • 死信队列的二次利用: 对于消费失败的消息,可以将其重新路由到一个专门的死信队列(不同于延迟队列的死信队列),用于人工介入、日志分析或后续的补偿机制。

4. 复杂延迟时间管理 (针对TTL+DLX模式):

  • 挑战: 如果需要多种不同的延迟时间,且时间跨度很大,TTL+DLX模式可能需要创建大量的队列,每个队列对应一个特定的TTL值,这会增加配置和管理的复杂性。
  • 优化策略:
    • 插件模式: 如果条件允许,直接切换到 x-delayed-message 插件,可以动态设置任意延迟时间。
    • 时间段分桶: 如果必须使用TTL+DLX,可以根据延迟时间将消息分到不同的“桶”队列,例如“1分钟延迟队列”、“5分钟延迟队列”、“1小时延迟队列”,减少队列数量。

在我看来,最容易被忽视的环节是消息的幂等性处理。很多开发者在初期只关注了消息的发送和接收,却忽略了“万一消息重复了怎么办”的问题。在生产环境,任何“万一”都有可能发生,所以提前做好幂等性设计,是保证业务正确性的基石。

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

104

2025.08.06

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

201

2024.02.23

spring boot框架优点
spring boot框架优点

spring boot框架的优点有简化配置、快速开发、内嵌服务器、微服务支持、自动化测试和生态系统支持。本专题为大家提供spring boot相关的文章、下载、课程内容,供大家免费下载体验。

135

2023.09.05

spring框架有哪些
spring框架有哪些

spring框架有Spring Core、Spring MVC、Spring Data、Spring Security、Spring AOP和Spring Boot。详细介绍:1、Spring Core,通过将对象的创建和依赖关系的管理交给容器来实现,从而降低了组件之间的耦合度;2、Spring MVC,提供基于模型-视图-控制器的架构,用于开发灵活和可扩展的Web应用程序等。

389

2023.10.12

Java Spring Boot开发
Java Spring Boot开发

本专题围绕 Java 主流开发框架 Spring Boot 展开,系统讲解依赖注入、配置管理、数据访问、RESTful API、微服务架构与安全认证等核心知识,并通过电商平台、博客系统与企业管理系统等项目实战,帮助学员掌握使用 Spring Boot 快速开发高效、稳定的企业级应用。

68

2025.08.19

Java Spring Boot 4更新教程_Java Spring Boot 4有哪些新特性
Java Spring Boot 4更新教程_Java Spring Boot 4有哪些新特性

Spring Boot 是一个基于 Spring 框架的 Java 开发框架,它通过 约定优于配置的原则,大幅简化了 Spring 应用的初始搭建、配置和开发过程,让开发者可以快速构建独立的、生产级别的 Spring 应用,无需繁琐的样板配置,通常集成嵌入式服务器(如 Tomcat),提供“开箱即用”的体验,是构建微服务和 Web 应用的流行工具。

33

2025.12.22

Java Spring Boot 微服务实战
Java Spring Boot 微服务实战

本专题深入讲解 Java Spring Boot 在微服务架构中的应用,内容涵盖服务注册与发现、REST API开发、配置中心、负载均衡、熔断与限流、日志与监控。通过实际项目案例(如电商订单系统),帮助开发者掌握 从单体应用迁移到高可用微服务系统的完整流程与实战能力。

114

2025.12.24

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

325

2023.08.11

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

19

2026.01.20

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Django 教程
Django 教程

共28课时 | 3.3万人学习

MySQL 初学入门(mosh老师)
MySQL 初学入门(mosh老师)

共3课时 | 0.3万人学习

微信小程序开发之API篇
微信小程序开发之API篇

共15课时 | 1.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号