Python连接MQ核心是选对客户端库、建立可靠连接、正确收发消息并做好异常与确认处理;主流MQ对应库包括RabbitMQ用pika、Kafka用kafka-python、Redis用redis-py、RocketMQ用rocketmq-client-python。

Python连接消息队列(MQ)系统,核心是选对客户端库、建立可靠连接、正确收发消息,并做好异常与确认处理。不同MQ系统协议和API略有差异,但通用逻辑一致。
一、选择MQ系统与对应Python客户端
主流MQ及推荐库:
- RabbitMQ → 使用 pika(官方推荐,支持AMQP协议)
- Kafka → 使用 kafka-python(纯Python实现,兼容Kafka 0.10+)
- Redis(作为轻量级MQ)→ 使用 redis-py(通过List或Pub/Sub模式)
- Apache RocketMQ → 使用 rocketmq-client-python(官方Python SDK)
安装示例(以RabbitMQ为例):
pip install pika
二、RabbitMQ基础连接与消息发送(AMQP流程)
典型步骤:建立连接 → 创建信道 → 声明交换机/队列 → 发布消息
立即学习“Python免费学习笔记(深入)”;
import pika1. 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
2. 确保队列存在(不存在则自动创建)
channel.queue_declare(queue='task_queue', durable=True)
3. 发送消息(持久化 + 消息确认)
channel.basic_publish( exchange='', routing_key='task_queue', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 消息持久化,重启后不丢失 ) ) print(" [x] Sent 'Hello World!'") connection.close()
三、消费端:可靠接收与手动确认
避免消息丢失的关键是关闭自动确认(auto_ack=False),并显式调用 basic_ack。
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 模拟处理耗时任务
import time
time.sleep(2)
# 手动确认消息已处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
关闭自动确认
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
注意:若处理中崩溃且未ack,RabbitMQ会将消息重新入队(前提是队列和消息都设为durable,且消费者未设置requeue=False)。
四、常见问题与健壮性建议
- 连接断开重试:用 try-except 包裹连接逻辑,配合指数退避重连
- 消息序列化:发送前用 json.dumps(),接收后用 json.loads(),避免字节与字符串混淆
- 死信队列(DLX):为异常消息设置TTL或最大重试次数,导向专门的死信队列便于排查
- 连接池管理:高并发场景下避免频繁创建/关闭连接,可封装成单例或使用连接池(如pika不原生支持,需自行管理)
基本上就这些。MQ不是黑盒,理解“连接-声明-发/收-确认-异常兜底”这条主线,就能稳住大多数业务场景。










