0

0

Python pulsar-client 的 python 异步支持

舞夢輝影

舞夢輝影

发布时间:2026-02-19 08:19:03

|

895人浏览过

|

来源于php中文网

原创

asyncio 不能直接使用 pulsar-client,因其底层为同步阻塞 c++ 实现;正确方式是 consumer 启用 listener 回调并用 asyncio.to_thread 执行异步逻辑,producer 需线程安全复用,且必须显式 ack/nack 消息。

python pulsar-client 的 python 异步支持

asyncio 不能直接用 pulsar.Client

Python pulsar-client 官方 SDK 本身是同步阻塞的,底层基于 C++ client 封装,pulsar.Clientproducer.send()consumer.receive() 全部会阻塞当前线程 —— 即使你在 async def 函数里调用,也会让整个 event loop 卡住。

常见错误现象:RuntimeWarning: coroutine 'xxx' was never awaited 没出现,但 CPU 占用低、吞吐上不去、大量协程堆积在 pending 状态,本质是同步调用拖垮了 asyncio 调度。

  • 别试图给 client.create_producer()await —— 它根本不是协程,加了会报 TypeError: object XXX can't be used in 'await' expression
  • 不要用 loop.run_in_executor 包一层就以为“异步化”了:短消息场景下线程切换开销可能比发送本身还高,且无法解决 consumer 拉取的实时性问题
  • 真实使用场景:Web API 接收请求后发 Pulsar 消息(需低延迟响应)、长周期 consumer 处理流式事件(需不阻塞其他任务)

必须用 pulsar-clientlistener 模式配 asyncio.to_thread

官方唯一推荐的轻量适配方式,是启用 consumer 的 listener callback,并用 asyncio.to_thread 把业务逻辑扔进线程池执行,避免阻塞 event loop。注意:producer 仍需手动管理线程安全,不能跨线程复用同一 Producer 实例。

关键参数差异:consumer.subscribe(listener=...) 中的 listener 函数必须是普通同步函数;它内部调用 asyncio.to_thread(your_async_handler, msg) 才真正进入异步世界。

立即学习Python免费学习笔记(深入)”;

NewsBang
NewsBang

盛大旗下AI团队推出的智能新闻阅读App

下载
  • listener 函数内禁止 await 任何东西,否则会 crash(Pulsar C++ 层不识别协程对象)
  • 每个 msg 必须显式调用 consumer.acknowledge(msg)consumer.negative_acknowledge(msg),listener 不自动 ack
  • 若 handler 抛异常,需在 listener 里捕获并调用 negative_acknowledge,否则消息会卡在 unacked 状态

替代方案:用 pulsar-client-cpp + uvloop 自建 bridge 成本太高

有人尝试用 Cython 封装 pulsar-client-cpp 的 async 接口,或通过 uvloop 直接对接 libpulsar 的 event loop —— 这些路径在实际项目中几乎不可维护。C++ client 的 async callback 语义和 Python asyncio 的生命周期管理冲突严重,比如 MessageId 在 callback 触发时可能已被 GC,导致 acknowledge 失败并静默丢消息。

性能影响明显:每条消息多一次 Python/C 边界穿越 + 手动 refcount 管理,吞吐反而比 to_thread 低 20%~40%,且 macOS 和 musl libc 环境下容易 core dump。

  • 兼容性风险:pulsar-client 3.x 的 C++ 库 ABI 不稳定,升级 minor 版本常需重编译 binding
  • 调试困难:stack trace 混杂 Python/Cpp/uvloop 三层,gdb 断点难打,core 文件无 Python 上下文
  • 除非你已有成熟 C++ 异步消息中间件团队,否则别碰这个方向

Consumer 启动慢、首次 receive 延迟高?检查 receiver_queue_sizemax_pending_chunked_message

默认配置下,consumer 初始化要等 broker 返回全量 topic 分区元数据,再逐个建立 TCP 连接,冷启动耗时可达 2~5 秒。这不是异步问题,而是 client 内部同步初始化逻辑导致的 —— 即使你用 to_thread,这个初始化过程也得在主线程完成。

可缓解但无法根除:把 receiver_queue_size 从默认 1000 降到 100,能减少单次拉取的数据量;设 max_pending_chunked_message=0 关闭 chunk 支持(如果你不用 batch + compression),避免额外解析开销。

  • 千万别在 async def startup_event() 里直接 client.subscribe(...) —— FastAPI/Uvicorn 的 reload 模式会反复 init client,触发连接泄漏
  • 推荐做法:用 atexit.register() 或 lifespan hook 显式 close client,且 consumer 实例应全局单例复用
  • 最容易被忽略的一点:broker 端 subscriptionTypesEnabled 配置为 [Exclusive, Shared] 时,Shared 模式 consumer 启动更快,但要注意 message redelivery 行为变化

相关文章

python速学教程(入门到精通)
python速学教程(入门到精通)

python怎么学习?python怎么入门?python在哪学?python怎么学才快?不用担心,这里为大家提供了python速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是中间件
什么是中间件

中间件是一种软件组件,充当不兼容组件之间的桥梁,提供额外服务,例如集成异构系统、提供常用服务、提高应用程序性能,以及简化应用程序开发。想了解更多中间件的相关内容,可以阅读本专题下面的文章。

180

2024.05.11

Golang 中间件开发与微服务架构
Golang 中间件开发与微服务架构

本专题系统讲解 Golang 在微服务架构中的中间件开发,包括日志处理、限流与熔断、认证与授权、服务监控、API 网关设计等常见中间件功能的实现。通过实战项目,帮助开发者理解如何使用 Go 编写高效、可扩展的中间件组件,并在微服务环境中进行灵活部署与管理。

224

2025.12.18

Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

28

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

223

2026.02.06

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1528

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

423

2025.10.17

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

2260

2025.12.29

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

37

2026.01.19

pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法
pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法

本专题系统整理pixiv网页版官网入口及登录访问方式,涵盖官网登录页面直达路径、在线阅读入口及快速进入方法说明,帮助用户高效找到pixiv官方网站,实现便捷、安全的网页端浏览与账号登录体验。

561

2026.02.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 4.4万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.6万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号