0

0

RocketMQ消息为什么会被重复消费?

星夢妙者

星夢妙者

发布时间:2025-07-12 09:40:01

|

616人浏览过

|

来源于php中文网

原创

在使用rocketmq时,消息可能会被重复消费。让我们从全局视角探讨消息发送和消费的过程。rocketmq-dashboard是一个非常实用的图形化界面工具

RocketMQ消息为什么会被重复消费?

首先,我们在RocketMQ-Dashboard上创建一个topic,每个topic下有4个队列。

每个topic是一类消息的集合,topic下再细分queue是为了提高消息消费的并发度。

RocketMQ消息为什么会被重复消费?

「当producer发送topic消息时,应该发送到topic下的哪个queue呢?」

producer会采用轮询策略来发送消息。

「那么consumer应该消费哪个queue下的消息呢?」

当只有一个消费者时,它会消费所有queue中的消息。

RocketMQ消息为什么会被重复消费?

「如果有多个消费者呢?」

只需根据各种负载均衡策略将队列分配给消费者即可,如下图展示了两种负载均衡方式。

RocketMQ消息为什么会被重复消费?RocketMQ消息为什么会被重复消费?RocketMQ消息为什么会被重复消费?

你问我这两种负载策略是如何实现的?去查看源码吧,我就不详细分析了。

「如果消费者数量超过队列的数量会发生什么?」

多余的消费者将不会消费任何队列。

RocketMQ消息为什么会被重复消费?

为什么一个consumer只能消费一个queue呢?」

多个消费者消费一个queue肯定会有并发问题,所以需要加锁,这样还不如将topic下的队列数量设置得更多一些。

「我在运行过程中可以设置topic下queue的数量吗?」

当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景。

「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」

是的,大家一般把这个过程称为重平衡。

下面我们来分享一下详细的细节。

消息发送流程主要有三种方式:单向发送(只发送,不管结果)、同步发送和异步发送。

RocketMQ消息为什么会被重复消费?

消息消费流程基于推还是拉?消息消费的模式有两种方式:

拉取:Consumer不断从Broker拉取。 推送:Broker向Consumer推送。

这两种方式都有各自的缺点:

拉取:拉取的间隔不好确定,间隔太短没消息时会造成带宽浪费,间隔太长又会造成消息不能及时被消费。 推送:「推送和速率难以适配消费速率」,推的太快,消费者消费不过来怎么办?推的太慢消息不能及时被消费。

「看起来拉取和推送难以抉择。」

炉米Lumi
炉米Lumi

字节跳动推出的AI模型分享社区和模型训练平台

下载

然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!

「你猜怎么改的?」

其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。

如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取。「对了,这种策略就叫做长轮询。」

「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的。」

具体消费流程如下图所示。

RocketMQ消息为什么会被重复消费?

「拉取到消息后是怎么处理的呢?」

PullRequest类的成员变量如下图所示。

RocketMQ消息为什么会被重复消费?

当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体。

「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏吧。」

msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的。

「什么是流控呢?」

就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图所示。

RocketMQ消息为什么会被重复消费?

当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等。

RocketMQ消息为什么会被重复消费?

接着就是收到响应,处理消息,并将PullRequest再次放入阻塞队列。

「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」

嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的。」

Consumer怎么提交offset?

RocketMQ消息为什么会被重复消费?

当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中。

「这样就会造成消息的重复消费。」

Consumer消费完消息并不是实时同步到Broker的,而是将offset先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致offset没提交,下次没提交offset的这部分消息会被再次消费。

即使offset被提交到了Broker,在还没来得及持久化的时候Broker宕机了,当重启的时候Broker会读取consumerOffset.json中保存的offset信息,这就会导致没持久化offset的这部分消息会被再次消费。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

412

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

533

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

310

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

74

2025.09.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

75

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

36

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

59

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

38

2025.11.27

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

72

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Node.js 教程
Node.js 教程

共57课时 | 8.9万人学习

【web前端】Node.js快速入门
【web前端】Node.js快速入门

共16课时 | 2万人学习

Node.js-前端工程化必学
Node.js-前端工程化必学

共19课时 | 3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号