0

0

EMQ X Redis数据持久化怎么实现

WBOY

WBOY

发布时间:2023-06-02 11:43:37

|

1987人浏览过

|

来源于亿速云

转载

emq x 数据持久化简介

数据持久化的主要使用场景包括将客户端上下线状态,订阅主题信息,消息内容,消息抵达后发送消息回执等操作记录到 redis、mysql、postgresql、mongodb、cassandra、aws dynamodb 等各类数据库中供外部服务快速查询或在服务宕机/客户端异常离线时保留当前运行状态,连接恢复时恢复到之前状态;持久化亦可用于客户端代理订阅,设备客户端上线时,持久化模块直接从数据库加载预设的主题并完成代理订阅,降低系统设计复杂度和减少客户端订阅通信开销。

用户也可以通过订阅相关主题的方式来实现类似的功能,但是在企业版中内置的这些持久化的支持执行效率更高、可靠性更强,大大降低了开发者的工作量并提升了系统稳定性。

数据持久化是 EMQ X 的重要功能,仅在企业版支持。

持久化设计

持久化原理是配置事件钩子触发时调用处理函数(action),处理函数获取到相应的数据后按照配置的指令进行处理,实现数据的增、删、改、查。相同事件钩子在不同数据库中可用参数是一样的,但处理函数(action)因数据库特性不同有所差异。整个持久化工作模式和流程如下:

一对一消息存储

EMQ X Redis数据持久化怎么实现

  1. Publish 端发布一条消息;

  2. Backend 将消息记录数据库中;

  3. Subscribe 端订阅主题;

  4. Backend 从数据库中获取该主题的消息;

  5. 发送消息给 Subscribe 端;

  6. Subscribe 端确认后 Backend 从数据库中移除该消息;

一对多消息存储

EMQ X Redis数据持久化怎么实现

  1. PUB 端发布一条消息;

  2. Backend 将消息记录在数据库中;

  3. SUB1 和 SUB2 订阅主题;

  4. Backend 从数据库中获取该主题的消息;

  5. 发送消息给 SUB1 和 SUB2;

  6. Backend 记录 SUB1 和 SUB2 已读消息位置,下次获取消息从该位置开始。

Redis 数据持久化

本文以实际例子来说明如何通过 Redis 来存储相关的信息。

Redis 是完全开源免费遵守 BSD 协议的高性能 key-value 数据库。

相比其他 key-value 缓存产品 Redis 有以下特点:

  • Redis 性能极高,单机支持十万级别的读写速度。

  • Redis 支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。

  • Redis 不仅仅支持简单的 key-value 类型的数据,同时还提供 list,set,zset,hash 等数据结构的存储。

  • Redis 支持数据的备份,即 master-slave 模式的数据备份。

读者可以参考 Redis 官方的 Quick Start 来安装 Redis(写本文的时候,Redis 版本为5.0),通过 redis-server 命令来启动 Redis 服务器。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ X,Redis 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf,如果只是测试 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服务器的地址:如果读者安装的 Redis 不与 EMQ X 在同一服务器上,请指定正确的 Redis 服务器的地址与端口。如下所示:

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379

保持剩下部分的配置文件不变,然后启动该插件:

emqx_ctl plugins load emqx_backend_redis

客户端在线状态存储

客户端上下线时,更新在线状态、上下线时间、节点客户端列表至 Redis 数据库。

尽管 EMQ X 本身提供了设备在线状态 API,但在需要频繁获取客户端在线状态、上下线时间的场景下,直接从数据库获取该记录比调用 EMQ X API 更高效。

配置项

打开配置文件,配置 Backend 规则:

银河易创
银河易创

一站式AIGC创作平台,集成GPT-3.5、GPT-4、文心一言等对话模型、Midjourney、DallE等绘画工具、AI音乐、AI视频和AI PPT等功能!

下载
## 上线
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下线
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

使用示例

浏览器打开 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client:

EMQ X Redis数据持久化怎么实现

打开 redis-cli 命令行窗口,执行命令 keys *,结果如下所示,读者可以看到在 Redis 中存储了两个 key:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

连接列表

插件以 mqtt:node:{node_name} 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

字段说明:

## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"

连接详细信息

插件以 mqtt:client:{client_id} 格式的 key 记录客户端在线状态、上线时间,等效操作:

## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854

字段说明:

## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳

客户端代理订阅

当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。

配置项

打开配置文件,配置 Backend 规则:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

sub_client 设备上线时,需要为其订阅 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

  1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
  1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

EMQ X Redis数据持久化怎么实现

  1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

持久化发布消息

配置项

打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

获取离线消息

配置项

打开配置文件,配置 Backend 规则:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## 一对一离线消息
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## 一对多离线消息
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

使用示例

MQTT 离线消息需满足以下条件:

  1. 以 clean_session = false 连接

  2. 订阅 QoS > 0

  3. 发布 QoS > 0

在 EMQ X 管理控制台中以如下配置建立连接,

EMQ X Redis数据持久化怎么实现

持久化 Retain 消息

配置项

打开配置文件,配置 Backend 规则:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
mysql修改数据表名
mysql修改数据表名

MySQL修改数据表:1、首先查看数据库中所有的表,代码为:‘SHOW TABLES;’;2、修改表名,代码为:‘ALTER TABLE 旧表名 RENAME [TO] 新表名;’。php中文网还提供MySQL的相关下载、相关课程等内容,供大家免费下载使用。

686

2023.06.20

MySQL创建存储过程
MySQL创建存储过程

存储程序可以分为存储过程和函数,MySQL中创建存储过程和函数使用的语句分别为CREATE PROCEDURE和CREATE FUNCTION。使用CALL语句调用存储过程智能用输出变量返回值。函数可以从语句外调用(通过引用函数名),也能返回标量值。存储过程也可以调用其他存储过程。php中文网还提供MySQL创建存储过程的相关下载、相关课程等内容,供大家免费下载使用。

513

2023.06.21

mongodb和mysql的区别
mongodb和mysql的区别

mongodb和mysql的区别:1、数据模型;2、查询语言;3、扩展性和性能;4、可靠性。本专题为大家提供mongodb和mysql的区别的相关的文章、下载、课程内容,供大家免费下载体验。

287

2023.07.18

mysql密码忘了怎么查看
mysql密码忘了怎么查看

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql密码忘了怎么办呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

519

2023.07.19

mysql创建数据库
mysql创建数据库

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql怎么创建数据库呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

267

2023.07.25

mysql默认事务隔离级别
mysql默认事务隔离级别

MySQL是一种广泛使用的关系型数据库管理系统,它支持事务处理。事务是一组数据库操作,它们作为一个逻辑单元被一起执行。为了保证事务的一致性和隔离性,MySQL提供了不同的事务隔离级别。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

392

2023.08.08

sqlserver和mysql区别
sqlserver和mysql区别

SQL Server和MySQL是两种广泛使用的关系型数据库管理系统。它们具有相似的功能和用途,但在某些方面存在一些显著的区别。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

542

2023.08.11

mysql忘记密码
mysql忘记密码

MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。那么忘记mysql密码我们该怎么解决呢?php中文网给大家带来了相关的教程以及其他关于mysql的文章,欢迎大家前来学习阅读。

666

2023.08.14

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.4万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 7.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号