0

0

redis如何实现队列 redis队列实现的3种经典模式

下次还敢

下次还敢

发布时间:2025-07-04 08:20:02

|

1138人浏览过

|

来源于php中文网

原创

redis实现队列有三种经典模式,分别适用于不同场景。1.list的lpush+rpop:优点是实现简单、性能高,但无持久化和确认机制,消息可能丢失,适用于对数据丢失不敏感、高性能需求的场景;2.list的lpush+brpop:支持阻塞读取,避免轮询浪费资源,但仍有数据丢失风险,适用于需减少cpu消耗的简单任务处理;3.stream的xadd+xreadgroup:支持持久化、消息确认、分组消费和广播,可靠性高但实现复杂、性能较低,适用于订单处理、支付通知等对数据可靠性要求高的场景。选择时应根据业务需求权衡性能与可靠性,并注意消息序列化、大小限制、监控及事务等问题。

redis如何实现队列 redis队列实现的3种经典模式

Redis实现队列,本质上是利用其数据结构的特性,例如List的LPUSH/RPOP或者Stream的XADD/XREADGROUP命令,来模拟队列的行为。选择哪种模式,取决于你的应用场景对数据可靠性、顺序性、以及复杂度的要求。

Redis队列实现的3种经典模式

Redis提供了多种方式来实现队列,下面将详细介绍三种经典模式,并分析其优缺点及适用场景。

为什么选择Redis实现队列?

在深入探讨实现方式之前,先聊聊为什么要用Redis做队列。消息队列有很多选择,比如RabbitMQ、Kafka等等,它们在消息的可靠性、持久性上通常做得更好。但Redis的优势在于:快!它的内存操作速度极快,而且部署简单,对于一些对性能要求高、但对数据丢失不敏感的场景,Redis队列是个不错的选择。当然,如果你的业务对消息的可靠性要求非常高,那还是应该选择专业的MQ。

模式一:List的LPUSH + RPOP

这是最简单的一种实现方式。利用Redis的List数据结构,LPUSH命令从列表头部插入元素,RPOP命令从列表尾部移除元素,模拟先进先出的队列。

优点:

小羊标书
小羊标书

一键生成百页标书,让投标更简单高效

下载
  • 实现简单,代码量少。
  • 性能高,基于Redis内存操作。

缺点:

  • 没有持久化机制,Redis宕机数据会丢失。
  • 没有消息确认机制,消费者如果处理消息失败,消息会丢失。
  • 不支持消息的广播和分组消费。
  • 阻塞式读取,如果没有消息,客户端会一直阻塞,浪费资源。

示例代码 (Python):

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def enqueue(queue_name, message):
    r.lpush(queue_name, message)

def dequeue(queue_name, timeout=0): # timeout=0 表示非阻塞
    result = r.brpop(queue_name, timeout=timeout)
    if result:
        return result[1].decode('utf-8') # 返回消息内容
    else:
        return None # 超时返回None

# 示例用法
enqueue("my_queue", "message1")
enqueue("my_queue", "message2")

message = dequeue("my_queue")
print(f"Dequeued: {message}")

message = dequeue("my_queue", timeout=5) # 阻塞5秒
print(f"Dequeued with timeout: {message}")

适用场景:

  • 对数据丢失不敏感,允许少量数据丢失。
  • 对性能要求高,需要快速处理消息。
  • 简单的消息通知,例如实时统计、简单的任务调度。

模式二:List的LPUSH + BRPOP(阻塞式)

与第一种模式类似,但使用BRPOP(阻塞式RPOP)命令。BRPOP会在列表为空时阻塞客户端,直到有新的元素加入,或者超时。

优点:

  • 避免了客户端轮询,节省了CPU资源。
  • 实现简单。

缺点:

  • 仍然存在数据丢失的风险。
  • 仍然不支持消息的广播和分组消费。
  • 阻塞式读取,如果连接断开,可能会导致消息丢失(消费者未处理就断开)。

示例代码 (Python):

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def enqueue(queue_name, message):
    r.lpush(queue_name, message)

def dequeue(queue_name, timeout=0):
    result = r.brpop(queue_name, timeout=timeout)
    if result:
        return result[1].decode('utf-8')
    else:
        return None

# 示例用法
enqueue("my_queue", "message3")
message = dequeue("my_queue", timeout=5) # 阻塞5秒
print(f"Dequeued with blocking: {message}")

适用场景:

  • 与第一种模式类似,但对CPU资源有更高要求,需要避免轮询。
  • 例如:简单的实时任务处理。

模式三:Stream的XADD + XREADGROUP

Redis 5.0 引入了 Stream 数据结构,它提供了更强大的队列功能,支持消息的持久化、分组消费、消息确认等。

优点:

  • 支持消息的持久化,即使Redis重启,消息也不会丢失。
  • 支持消息确认机制,消费者可以确认消息是否处理成功。
  • 支持消息的分组消费,多个消费者可以同时消费同一个Stream的消息。
  • 支持消息的广播,可以将消息发送给多个消费者。
  • 可以回溯历史消息。

缺点:

  • 实现相对复杂,代码量较多。
  • 性能相对较低,因为需要进行持久化。

示例代码 (Python):

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

STREAM_NAME = "my_stream"
GROUP_NAME = "my_group"
CONSUMER_NAME = "consumer_1"

# 创建消费者组 (如果不存在)
try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    if str(e).startswith("BUSYGROUP"):
        print("Group already exists, skipping creation.")
    else:
        raise e


def enqueue(stream_name, message):
    r.xadd(stream_name, {'data': message})

def dequeue(stream_name, group_name, consumer_name, block=0): # block=0 非阻塞
    response = r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '>'}, block=block, count=1)

    if response:
        stream, messages = response[0]
        message_id, message_data = messages[0]
        return message_id.decode('utf-8'), message_data[b'data'].decode('utf-8')
    else:
        return None, None


def acknowledge(stream_name, group_name, message_id):
    r.xack(stream_name, group_name, message_id)


# 示例用法
enqueue(STREAM_NAME, "message4")

message_id, message = dequeue(STREAM_NAME, GROUP_NAME, CONSUMER_NAME, block=5000) # 阻塞5秒
if message:
    print(f"Dequeued (Stream): {message}, ID: {message_id}")
    acknowledge(STREAM_NAME, GROUP_NAME, message_id) # 确认消息
else:
    print("No message received within timeout.")

适用场景:

  • 对数据可靠性要求较高,不允许数据丢失。
  • 需要支持消息的分组消费和广播。
  • 例如:订单处理、支付通知、日志收集。

如何选择合适的队列模式?

选择哪种模式取决于你的具体需求。如果只是简单的消息通知,对数据丢失不敏感,可以选择List的LPUSH + RPOP。如果需要避免客户端轮询,可以选择List的LPUSH + BRPOP。如果对数据可靠性要求较高,需要支持消息的持久化和分组消费,可以选择Stream的XADD + XREADGROUP。

还有其他需要注意的点吗?

  • 消息序列化: Redis存储的是字符串,所以你需要将消息序列化成字符串,例如使用JSON。
  • 消息大小: Redis对单个value的大小有限制,需要注意消息的大小。
  • 监控: 需要对Redis队列进行监控,例如队列长度、消费速度等。
  • 过期时间: 对于一些不需要持久化的消息,可以设置过期时间,避免占用过多内存。
  • 事务: 在某些场景下,可能需要使用Redis的事务来保证操作的原子性。

总而言之,Redis队列是一种轻量级的消息队列解决方案,适用于对性能要求高、但对数据可靠性要求不高的场景。 在选择使用Redis队列时,需要仔细评估业务需求,并选择合适的模式。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

207

2024.02.23

Java 消息队列与异步架构实战
Java 消息队列与异步架构实战

本专题系统讲解 Java 在消息队列与异步系统架构中的核心应用,涵盖消息队列基本原理、Kafka 与 RabbitMQ 的使用场景对比、生产者与消费者模型、消息可靠性与顺序性保障、重复消费与幂等处理,以及在高并发系统中的异步解耦设计。通过实战案例,帮助学习者掌握 使用 Java 构建高吞吐、高可靠异步消息系统的完整思路。

47

2026.01.28

json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

453

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

546

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

331

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

82

2025.09.10

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

175

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

157

2024.02.23

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

1

2026.03.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.4万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 7.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号