python连接mq核心是选对客户端库、建立可靠连接、正确收发消息并做好异常与确认处理;主流mq对应库包括rabbitmq用pika、kafka用kafka-python、redis用redis-py、rocketmq用rocketmq-client-python。

Python连接消息队列(MQ)系统,核心是选对客户端库、建立可靠连接、正确收发消息,并做好异常与确认处理。不同MQ系统协议和API略有差异,但通用逻辑一致。
一、选择MQ系统与对应Python客户端
主流MQ及推荐库:
- RabbitMQ → 使用 pika(官方推荐,支持AMQP协议)
- Kafka → 使用 kafka-python(纯Python实现,兼容Kafka 0.10+)
- Redis(作为轻量级MQ)→ 使用 redis-py(通过List或Pub/Sub模式)
- Apache RocketMQ → 使用 rocketmq-client-python(官方Python SDK)
安装示例(以RabbitMQ为例):
pip install pika
二、RabbitMQ基础连接与消息发送(AMQP流程)
典型步骤:建立连接 → 创建信道 → 声明交换机/队列 → 发布消息
立即学习“Python免费学习笔记(深入)”;
import pika
<h1>1. 连接RabbitMQ服务器</h1><p>connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()</p><h1>2. 确保队列存在(不存在则自动创建)</h1><p>channel.queue_declare(queue='task_queue', durable=True)</p><h1>3. 发送消息(持久化 + 消息确认)</h1><p>channel.basic_publish(
exchange='',
routing_key='task_queue',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化,重启后不丢失
)
)
print(" [x] Sent 'Hello World!'")
connection.close()
三、消费端:可靠接收与手动确认
避免消息丢失的关键是关闭自动确认(auto_ack=False),并显式调用 basic_ack。
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 模拟处理耗时任务
import time
time.sleep(2)
# 手动确认消息已处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
<h1>关闭自动确认</h1><p>channel.basic_consume(queue='task_queue', on_message_callback=callback)</p><p>print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
注意:若处理中崩溃且未ack,RabbitMQ会将消息重新入队(前提是队列和消息都设为durable,且消费者未设置requeue=False)。
四、常见问题与健壮性建议
- 连接断开重试:用 try-except 包裹连接逻辑,配合指数退避重连
- 消息序列化:发送前用 json.dumps(),接收后用 json.loads(),避免字节与字符串混淆
- 死信队列(DLX):为异常消息设置TTL或最大重试次数,导向专门的死信队列便于排查
- 连接池管理:高并发场景下避免频繁创建/关闭连接,可封装成单例或使用连接池(如pika不原生支持,需自行管理)
基本上就这些。MQ不是黑盒,理解“连接-声明-发/收-确认-异常兜底”这条主线,就能稳住大多数业务场景。










