0

0

Spring Boot整合RocketMQ事务消息教程

看不見的法師

看不見的法師

发布时间:2025-07-03 15:27:02

|

231人浏览过

|

来源于php中文网

原创

spring boot整合rocketmq事务消息的核心在于利用其两阶段提交机制解决分布式系统中的数据一致性问题。1. 引入rocketmq spring boot starter依赖简化配置;2. 在application.yml中配置nameserver地址和生产者组;3. 实现rocketmqlocaltransactionlistener接口,重写executelocaltransaction和checklocaltransaction方法处理本地事务及状态回查;4. 在业务代码中使用rocketmqtemplate发送事务消息。rocketmq通过“半消息”机制确保消息发送与本地事务的原子性:发送半消息后执行本地事务,成功则提交,失败则回滚,若状态未知则由broker定期回查。关键点包括注解@rocketmqtransactionlistener的正确使用、本地事务的完整执行、checklocaltransaction的幂等设计。实际应用中需应对幂等性、事务超时、异常监控和性能开销等问题,合理配置参数并结合日志监控保障最终一致性。

Spring Boot整合RocketMQ事务消息教程

Spring Boot整合RocketMQ事务消息,说白了,就是为了解决分布式系统里数据一致性的那个老大难问题。我们都知道,在微服务架构下,一个操作可能涉及到多个服务和多个数据库,如果其中一个环节出错了,怎么保证整个业务流程的数据状态是正确的、一致的?RocketMQ的事务消息机制,提供了一个两阶段提交的变种方案,让这个事情变得相对可靠。它不是万能药,但确实是处理特定场景下分布式事务的一个非常实用的工具

Spring Boot整合RocketMQ事务消息教程

解决方案

整合Spring Boot和RocketMQ事务消息,核心在于利用RocketMQ提供的两阶段提交能力,确保本地事务和消息发送的原子性。

Spring Boot整合RocketMQ事务消息教程

首先,你需要引入Spring Boot RocketMQ Starter的依赖。这个是基础,省去了很多繁琐的配置。


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.2.2 

接着,在你的application.ymlapplication.properties里配置RocketMQ的NameServer地址和一些生产者组信息。

Spring Boot整合RocketMQ事务消息教程
rocketmq:
  name-server: 127.0.0.1:9876 # 你的NameServer地址
  producer:
    group: my_transaction_producer_group # 事务消息专用的生产者组
    send-message-timeout: 3000

然后,关键一步是实现RocketMQLocalTransactionListener接口。这个接口有两个方法,executeLocalTransactioncheckLocalTransaction,它们是事务消息机制的核心。

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener(txProducerGroup = "my_transaction_producer_group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    // 假设这是你的本地服务,用来处理业务逻辑和查询状态
    // @Autowired
    // private OrderService orderService;

    /**
     * 执行本地事务
     * 在发送半消息成功后,Broker会回调这个方法
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String messageBody = new String((byte[]) msg.getPayload());
        String transactionId = (String) msg.getHeaders().get("MQ_TRANSACTION_ID"); // 获取事务ID

        try {
            // 1. 解析消息,获取业务参数
            // 2. 执行本地事务,比如:创建订单,扣减库存等
            //    boolean success = orderService.createOrderAndDeductStock(messageBody, transactionId);

            System.out.println("执行本地事务,消息体: " + messageBody + ", 事务ID: " + transactionId);

            // 模拟本地事务执行结果
            boolean success = true; // 假设本地事务成功
            if (success) {
                // 如果本地事务执行成功,返回COMMIT,Broker会投递消息
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                // 如果本地事务执行失败,返回ROLLBACK,Broker会删除半消息
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            // 出现异常,返回UNKNOW,让Broker进行回查
            System.err.println("本地事务执行异常: " + e.getMessage());
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    /**
     * 检查本地事务状态
     * 当Broker没有收到COMMIT/ROLLBACK指令,或者Producer宕机后重启,Broker会回调这个方法
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String messageBody = new String((byte[]) msg.getPayload());
        String transactionId = (String) msg.getHeaders().get("MQ_TRANSACTION_ID");

        // 1. 根据消息的唯一标识(通常是业务ID或事务ID)查询本地事务的真实状态
        //    比如:查询订单是否已创建成功,或者库存是否已扣减
        //    OrderState state = orderService.getOrderState(transactionId);

        System.out.println("检查本地事务状态,消息体: " + messageBody + ", 事务ID: " + transactionId);

        // 模拟根据事务ID查询本地事务状态
        // 假设通过transactionId可以查询到本地事务是否已成功
        boolean transactionCompleted = true; // 假设本地事务已经成功完成

        if (transactionCompleted) {
            // 如果本地事务已成功,返回COMMIT
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 如果本地事务未完成或失败,返回ROLLBACK
            // 这里要特别注意,如果业务逻辑是幂等的,即使重复执行checkLocalTransaction也不会有问题
            return RocketMQLocalTransactionState.ROLLBACK;
            // 也可以返回UNKNOWN,让Broker稍后再次回查,但通常建议直接判断最终状态
        }
    }
}

最后,在你的业务代码中,使用RocketMQTemplate发送事务消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void createOrder(String orderId, String userId, double amount) {
        // 构建消息体
        String messageBody = String.format("{\"orderId\":\"%s\", \"userId\":\"%s\", \"amount\":%s}", orderId, userId, amount);
        Message message = MessageBuilder.withPayload(messageBody)
                                                .setHeader("orderId", orderId) // 可以在这里设置业务ID,方便回查
                                                .build();

        // 发送事务消息,指定事务生产者组和目标Topic
        // 第二个参数arg可以传递给executeLocalTransaction方法,用于传递一些额外上下文信息
        rocketMQTemplate.sendMessageInTransaction(
            "my_transaction_producer_group", // 对应监听器上的txProducerGroup
            "order_created_topic",           // 消息的Topic
            message,
            null // 附加参数,这里可以为空,或者传递业务相关数据
        );

        System.out.println("已发送订单创建事务消息: " + orderId);
    }
}

这样一套流程下来,当createOrder方法被调用时:

  1. RocketMQ会先发送一个“半消息”到Broker。
  2. 半消息发送成功后,Broker会回调OrderTransactionListenerexecuteLocalTransaction方法,此时你执行本地的订单创建和库存扣减等业务逻辑。
  3. 根据本地事务的执行结果,返回COMMIT(本地事务成功,消息可投递)、ROLLBACK(本地事务失败,消息删除)或UNKNOWN(状态不明,待回查)。
  4. 如果返回UNKNOWN,或者Producer在返回COMMIT/ROLLBACK之前宕机,Broker会定期调用checkLocalTransaction方法来查询本地事务的最终状态,以决定是提交还是回滚消息。

如何理解RocketMQ事务消息的核心机制?

RocketMQ的事务消息,我个人觉得它最精妙的地方就在于那个“半消息”和“回查”机制。它不像传统的分布式事务协议那么重,但又能在一定程度上保证消息发送和本地事务的原子性。

想象一下这个过程:当你的生产者要发一条事务消息时,它并不是直接把消息发出去让消费者立马就能看到。它首先发的是一个所谓的“半消息”(Half Message)。这个半消息,消费者是看不到的,它躺在Broker那里,处于一种“待定”状态。Broker收到半消息后,会给生产者一个确认,告诉它“我收到了”。

接下来,生产者就会去执行自己的本地事务,比如你创建订单、扣减库存这些数据库操作。这个本地事务的成功与否,是决定半消息命运的关键。

如果本地事务成功了,生产者会通知Broker:“好了,我这边搞定了,那个半消息可以转正了,你把它投递给消费者吧!”Broker收到这个“提交”指令,就会把半消息变成普通消息,消费者就能消费了。

如果本地事务失败了,生产者就会通知Broker:“哎呀,我这边没搞定,那个半消息就别发了,直接删了吧!”Broker收到“回滚”指令,就会把半消息删掉。

Teleporthq
Teleporthq

一体化AI网站生成器,能够快速设计和部署静态网站

下载

但这里有个细节,你得搞清楚:万一生产者在执行完本地事务后,还没来得及告诉Broker是提交还是回滚,它自己就宕机了呢?或者网络突然抖了一下,指令没发出去呢?这时候,Broker会很聪明地启动一个“回查”机制。它会定期地去问生产者:“喂,你那个半消息到底是个什么情况?是提交还是回滚?”此时,生产者(或者说,生产者重启后)就会通过实现checkLocalTransaction方法来回答Broker。在这个方法里,你需要根据消息里带的业务唯一标识(比如订单ID),去查询你的本地数据库,看看对应的业务操作到底成功了没有。如果成功了,就告诉Broker提交;如果失败了,就告诉Broker回滚。

所以,这个checkLocalTransaction方法,在我看来,就是整个RocketMQ事务消息的“灵魂”所在。它解决了生产者在提交或回滚指令发出前宕机的极端情况,确保了最终的一致性。没有它,事务消息的可靠性就会大打折扣。

在Spring Boot中实现事务监听器有哪些关键点?

在Spring Boot里实现RocketMQLocalTransactionListener,确实有几个地方是需要特别注意的,否则很容易踩坑。

首先,@RocketMQTransactionListener这个注解是核心。你必须把它加到你的监听器类上,并且txProducerGroup这个属性一定要和你在RocketMQTemplate里调用sendMessageInTransaction时传入的生产者组名称保持一致。这是RocketMQ用来识别哪个监听器对应哪个事务生产者的关键。如果名字对不上,Broker是无法正确回调你的监听器的。

其次,就是executeLocalTransaction方法。这个方法是你在发送半消息后,立即执行本地业务逻辑的地方。这里面的代码,应该是一个完整的本地事务单元。比如,如果你要创建订单并扣减库存,那这两个操作应该在一个数据库事务里完成。这个方法最终返回的RocketMQLocalTransactionState,直接决定了半消息的命运。

  • 返回COMMIT:意味着你的本地事务成功了,Broker可以放心地把消息投递出去。
  • 返回ROLLBACK:意味着你的本地事务失败了,Broker应该删除半消息,不让它被投递。
  • 返回UNKNOWN:这是个很重要的状态。通常在你无法确定本地事务结果(比如代码抛异常了,或者依赖的服务调用超时了)时返回。返回UNKNOWN会让Broker稍后发起回查,给你一个补救的机会。所以,异常捕获在这里非常重要,不要轻易地把所有异常都直接导致ROLLBACK,有时候UNKNOWN是更好的选择。

再来就是checkLocalTransaction方法。这个方法是幂等性设计和最终一致性的保障。当Broker回查时,它会把之前发送的半消息传给你。在这个方法里,你必须能够根据消息中的业务唯一标识(比如订单号、业务流水号等),去你的本地数据库查询该业务的真实状态。

  • 如果查询到业务已经成功完成,就返回COMMIT
  • 如果查询到业务确实失败了(比如订单创建失败),就返回ROLLBACK
  • 理论上,你也可以在这里返回UNKNOWN,让Broker再次回查。但实际应用中,如果能明确判断出最终状态,直接返回COMMITROLLBACK会更高效,也能避免不必要的多次回查。

一个常见的误区是,有人会把executeLocalTransaction里的业务逻辑写得过于简单,或者没有做好异常处理,导致返回UNKNOWN的场景被忽视。而checkLocalTransaction的实现如果不够健壮,不能准确判断本地事务状态,那么RocketMQ的事务消息机制就形同虚设了,最终还是可能导致数据不一致。确保这两个方法能正确、幂等地反映本地事务的真实状态,是实现事务消息的关键。

RocketMQ事务消息在实际应用中会遇到哪些挑战及应对策略?

RocketMQ事务消息虽然好用,但在实际落地中,我们还是会遇到一些挑战,需要提前考虑并做好应对策略。

首先,幂等性是绕不开的话题。这不仅仅是消费者需要考虑的,在事务消息的checkLocalTransaction回调中,本地事务查询也需要具备幂等性。因为Broker可能会多次回查,或者Producer在发送COMMIT/ROLLBACK指令前多次尝试发送半消息。你的本地业务操作(比如创建订单、扣减库存)必须能够承受重复执行的风险。常见的做法是,利用业务唯一ID(如订单号、业务流水号)在数据库中做唯一约束,或者在更新时加入状态判断,避免重复处理。比如,插入数据前先查询是否存在,或者更新时只更新状态为“待处理”的记录。

其次是事务超时与检查频率。RocketMQ Broker对事务消息有默认的超时时间,超过这个时间如果Producer没有给出明确指令,就会触发回查。同时,回查的频率也是可配置的。在实际业务中,如果你的本地事务执行时间可能比较长,或者依赖的服务响应慢,就可能导致频繁的UNKNOWN状态和回查。你需要根据业务特点合理配置这些超时参数,并且确保你的checkLocalTransaction方法能够快速、准确地返回结果,避免成为性能瓶颈。如果本地事务确实需要长时间才能完成,可能需要考虑更复杂的异步处理或状态机模式,而不是单纯依赖事务消息的短时回查。

再一个挑战是异常处理与监控。在executeLocalTransactioncheckLocalTransaction方法中,任何未捕获的异常都可能导致意外的行为。我们应该尽可能地捕获异常,并根据异常类型返回ROLLBACKUNKNOWN。同时,对事务消息的整个生命周期进行有效的监控非常重要。你需要能够实时知道有多少半消息处于UNKNOWN状态,有多少回查失败,或者有多少事务最终被回滚。通过日志、Metrics和告警系统,及时发现并处理这些异常情况,避免潜在的数据不一致。比如,可以针对checkLocalTransaction中返回UNKNOWN的次数或持续时间设置告警,提示人工介入排查。

最后,性能考量也是一个实际问题。事务消息相比普通消息,增加了两阶段提交的开销,这会带来一定的性能损耗。并不是所有的消息发送都需要强一致性保障。在设计系统时,需要权衡业务对一致性的要求和系统性能的需求。对于那些可以接受最终一致性的场景,使用普通消息结合消费者幂等性设计可能更简单高效。只有那些对数据一致性要求极高、本地事务和消息发送必须原子性的场景,才应该考虑使用事务消息。过度使用事务消息,反而可能成为系统的瓶颈。

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

104

2025.08.06

spring boot框架优点
spring boot框架优点

spring boot框架的优点有简化配置、快速开发、内嵌服务器、微服务支持、自动化测试和生态系统支持。本专题为大家提供spring boot相关的文章、下载、课程内容,供大家免费下载体验。

135

2023.09.05

spring框架有哪些
spring框架有哪些

spring框架有Spring Core、Spring MVC、Spring Data、Spring Security、Spring AOP和Spring Boot。详细介绍:1、Spring Core,通过将对象的创建和依赖关系的管理交给容器来实现,从而降低了组件之间的耦合度;2、Spring MVC,提供基于模型-视图-控制器的架构,用于开发灵活和可扩展的Web应用程序等。

389

2023.10.12

Java Spring Boot开发
Java Spring Boot开发

本专题围绕 Java 主流开发框架 Spring Boot 展开,系统讲解依赖注入、配置管理、数据访问、RESTful API、微服务架构与安全认证等核心知识,并通过电商平台、博客系统与企业管理系统等项目实战,帮助学员掌握使用 Spring Boot 快速开发高效、稳定的企业级应用。

68

2025.08.19

Java Spring Boot 4更新教程_Java Spring Boot 4有哪些新特性
Java Spring Boot 4更新教程_Java Spring Boot 4有哪些新特性

Spring Boot 是一个基于 Spring 框架的 Java 开发框架,它通过 约定优于配置的原则,大幅简化了 Spring 应用的初始搭建、配置和开发过程,让开发者可以快速构建独立的、生产级别的 Spring 应用,无需繁琐的样板配置,通常集成嵌入式服务器(如 Tomcat),提供“开箱即用”的体验,是构建微服务和 Web 应用的流行工具。

33

2025.12.22

Java Spring Boot 微服务实战
Java Spring Boot 微服务实战

本专题深入讲解 Java Spring Boot 在微服务架构中的应用,内容涵盖服务注册与发现、REST API开发、配置中心、负载均衡、熔断与限流、日志与监控。通过实际项目案例(如电商订单系统),帮助开发者掌握 从单体应用迁移到高可用微服务系统的完整流程与实战能力。

114

2025.12.24

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

325

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

233

2023.10.07

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

19

2026.01.20

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
RunnerGo从入门到精通
RunnerGo从入门到精通

共22课时 | 1.7万人学习

尚学堂Mahout视频教程
尚学堂Mahout视频教程

共18课时 | 3.2万人学习

Linux优化视频教程
Linux优化视频教程

共14课时 | 3.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号