
本文介绍如何在 langchain.js 中自定义回调处理器,过滤掉 agent 执行过程中的思考链(如 action、observation 等),仅将 response.output 的最终答案以流式方式逐 token 返回给客户端。
本文介绍如何在 langchain.js 中自定义回调处理器,过滤掉 agent 执行过程中的思考链(如 action、observation 等),仅将 response.output 的最终答案以流式方式逐 token 返回给客户端。
在使用 LangChain.js 构建基于 Agent 的流式问答服务时,一个常见痛点是:默认的 handleLLMNewToken 回调会将 LLM 生成的所有 token(包括推理过程中的中间步骤,如 "Thought:..."、"Action:..."、"Observation:...")全部推送至客户端,导致前端接收到大量非用户所需的冗余内容。而用户真正关心的,仅是最终自然语言形式的答案(即 response.output 字段)。
LangChain.js 当前(v0.1.x)尚未提供开箱即用的 FinalStreamingStdOutCallbackHandler 类似物(该功能在 Python 版本中已原生支持),因此需手动实现一个轻量级、状态感知的自定义回调处理器。
✅ 核心思路:状态机式 Token 过滤
我们通过维护一个内部状态标志(isInFinalAnswer),在 Agent 执行流程中识别“最终答案开始”的信号(通常是 Final Answer: 后缀或 response.output 确认阶段),此后才启用 token 流式输出。
以下是一个生产就绪的自定义处理器示例:
class FinalAnswerStreamingHandler {
private res: NodeJS.WritableStream;
private isInFinalAnswer = false;
private buffer = ""; // 缓冲未确认的 token,用于匹配起始标记
constructor(res: NodeJS.WritableStream) {
this.res = res;
}
handleLLMNewToken(token: string): void {
// Step 1: 检测是否进入最终答案阶段(兼容常见 Agent 输出格式)
if (!this.isInFinalAnswer) {
this.buffer += token;
// 常见触发条件(可按实际 Agent prefix 调整):
// - "Final Answer:"(Zero-shot React)
// - "Answer:"(某些自定义 agent)
// - 或更鲁棒地:等待 response.output 已确定后才开启(需结合 onAgentEnd)
if (/Final\s+Answer\s*:/i.test(this.buffer) || /Answer\s*:/i.test(this.buffer)) {
this.isInFinalAnswer = true;
// 清除前缀(如 "Final Answer: "),只流后续内容
const cleanToken = this.buffer.replace(/.*?(Final\s+Answer\s*:|Answer\s*:)\s*/i, "");
if (cleanToken) {
this.res.write(cleanToken);
}
this.buffer = "";
return;
}
return; // 仍在前导阶段,暂不输出
}
// Step 2: 已进入最终答案 → 直接流式写入
this.res.write(token);
}
// 【重要】配合 onAgentEnd 确保兜底(推荐启用)
handleAgentEnd(): void {
// 若因流式延迟导致最后 token 未 flush,此处可强制结束
this.res.write("\n");
}
}? 使用方式(集成到 Express/HTTP Server)
app.post("/chat", async (req, res) => {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const handler = new FinalAnswerStreamingHandler(res);
const model = new ChatOpenAI({
modelName: "gpt-3.5-turbo",
temperature: 0.5,
streaming: true,
callbacks: [handler], // 注入自定义处理器
});
const executor = await initializeAgentExecutorWithOptions(
[qaTool],
model,
{
agentType: "zero-shot-react-description",
agentArgs: { prefix }, // 确保 prefix 中包含明确的 "Final Answer:" 提示
}
);
try {
const response = await executor.call({ input: req.body.prompt });
// 注意:response.output 是完整答案字符串,但流式已由 handler 分发
res.end();
} catch (err) {
console.error(err);
res.status(500).end();
}
});⚠️ 关键注意事项
-
Agent Prompt 必须规范:确保你使用的 prefix 显式要求模型以 "Final Answer:" 开头输出答案(这是 Zero-shot React 的标准约定)。例如:
... You have access to the following tools: ... Use the following format: Thought: ... Action: ... Observation: ... ... Final Answer: <your answer here>
- 不要依赖 onLLMEnd 或 onChainEnd:它们在流式过程中不保证时序,且无法访问 response.output;真正的答案边界应由 LLM 生成的 token 序列本身定义。
- 缓冲区大小控制:上述示例中 buffer 仅用于匹配起始标记,实际生产环境建议限制最大长度(如 buffer.length
- 多轮/复杂 Agent 场景:若使用 Plan-and-Execute、Self-Ask 等高级 Agent,需相应扩展 isInFinalAnswer 的检测逻辑(例如监听特定 tool 名称后的 Answer:)。
通过该方案,你将获得干净、可控的流式响应——前端接收到的每个 chunk 都是最终答案的一部分,无需二次解析或丢弃脏数据,显著提升用户体验与前端处理效率。










