qwen sdk无原生observer模式,需自行基于流式响应迭代器实现;通过比对content长度差提取新增token,注意空chunk、网络异常、线程安全及超时处理。
☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

Observer 模式在 Qwen(千问)SDK 中不存在原生实现
Qwen 的官方 Python SDK(dashscope 或 qwen-agent)不提供 Observer、subscribe、notify 这类接口。它面向的是「单次请求-响应」或「流式生成」场景,不是事件驱动架构。想用观察者模式监听模型输出过程,得自己搭骨架。
流式响应里怎么“观察” token 生成
真正能动手的地方是流式调用返回的迭代器——比如 dashscope.Generation.call(..., stream=True) 返回一个 Generator,每次 next() 或 for 循环拿到一个 GenerationResponse 对象。这里的 output.text 是增量拼接的,但不是每个 chunk 都含新 token;实际有效增量藏在 output.choices[0].message.content(新版)或 output.text(旧版)里,且可能重复。
- 必须检查
response.output.choices是否非空,空值会抛AttributeError - 推荐用
response.output.choices[0].delta.content(如果 API 支持 delta 格式),否则要自己比对前后content长度差 - 别直接把每次拿到的
content全量渲染,否则会重复刷屏——得缓存上一次完整内容,只 diff 出新增部分
自己实现简易 Observer 时容易漏掉的边界
手动注册回调函数看似简单,但真实跑起来常卡在三处:
-
stream=True时网络中断不会触发on_error,而是抛requests.exceptions.ReadTimeout或ConnectionError,需在外层 try/catch - 回调函数里做 UI 更新(比如 Tkinter / Streamlit)可能触发线程安全问题:SDK 默认同步阻塞调用,但流式迭代本身不新开线程,所以 UI 线程会被卡住——得用
threading.Thread包一层再 start - 没设超时参数(
timeout=30)的话,大模型卡住时整个程序挂起,无法 cancel,建议配合asyncio.wait_for或threading.Timer做兜底
要不要封装成类?看你的调用频次
如果只是临时调试流式输出,写个裸循环加 print 就够了;但如果多个地方都要监听 token、计算耗时、统计 token 数、同步到前端,那就值得抽一个轻量 QwenStreamObserver 类。注意两点:
- 不要在类里 hold 住
dashscope.api_key或 client 实例——这些该由调用方传入,避免状态污染 - 回调方法名别叫
update(),容易和某些 GUI 框架冲突;用on_token()、on_finish()更直白 - 示例骨架:
class QwenStreamObserver: def __init__(self, on_token=None, on_finish=None): self.on_token = on_token self.on_finish = on_finish self._prev_content = "" def observe(self, response_gen): for r in response_gen: if r.output.choices: curr = r.output.choices[0].message.content new_part = curr[len(self._prev_content):] if new_part and self.on_token: self.on_token(new_part) self._prev_content = curr if self.on_finish: self.on_finish()
真正麻烦的从来不是写 observer,而是判断哪一帧算“新 token”、怎么应对服务端偶尔发来的空 chunk、以及模型突然 mid-stream 把前面内容全推翻(比如带 self-refine 的多阶段生成)。这些得靠日志 + 实际 response 字段反复对齐,没法靠模式名字蒙混过关。










