03 · 流式输出
把同步 JSON 响应升级成 SSE 事件流,让 assistant 回复能够边生成边显示。
课时资源
一、学习目标
本课所在阶段:第一阶段 · 最小可运行闭环。
学完这一课后,你应该能够:
- 理解为什么同步 JSON 返回不适合做 Chat Bot 的回复体验
- 看懂 SSE(Server-Sent Events)的事件格式是什么样的
- 明白前端如何把多个
message.delta事件拼成同一条 assistant 消息 - 知道这一课的核心改动发生在"传输层",而不是"消息模型层"
二、问题背景
第二课已经打通了完整链路,但用户会发现:发出问题后,页面会有一段停顿,然后整段回复一次性弹出来。这种体验和真实 Chat Bot(比如 ChatGPT)差距明显——真实产品的回复是一个字一个字地出现的。
如果这一课跳过,后续会出现几个问题:
- 你会很难区分"同步请求 / 流式请求"这两种截然不同的范式
- 后面的工具调用事件(
tool.call)、Artifact 事件(artifact.update)也都是事件流形式,如果 SSE 基础没建立,就很难理解 - 长文本回复下,用户体验会明显落后于产品预期
这一课解决的问题是:把第二课的"等待后一次性显示全部"改造成"边生成边显示"。
三、核心概念
这一课最重要的概念是:把后端回复理解成一个持续发送的事件流,而不是单次 HTTP 响应体。
从同步到流式的关键变化
| 维度 | lesson-02(同步) | lesson-03(流式) |
|---|---|---|
| 后端返回方式 | NextResponse.json(...) 一次性返回 | new Response(ReadableStream) 持续推送 |
| 前端读取方式 | await response.json() | response.body.getReader() 逐块读取 |
| 消息更新方式 | 收到完整回复后一次性添加 | 每帧 message.delta 追加到同一条消息 |
| Agent 本身 | 同步调用 runChat(messages) | 新增 streamChat 异步生成器 |
| 响应头 | Content-Type: application/json | Content-Type: text/event-stream |
当前实现的三个关键决策:
- 后端不再等 Agent 回复完毕才关闭连接,而是改用
ReadableStream边生成边推送 - 前端不再等整个
response.json()解析完,而是用response.body.getReader()逐块读取 - 每收到一个
message.delta事件,就把新内容追加到同一条 assistant 消息上——而不是每次创建新消息
本课关键文件
四、关键代码解析
app/api/chat/route.ts - 创建 SSE 流,依次发出 message.start、message.delta 和 message.end 事件。
app/agent/chatbot.ts - 新增 streamChat 异步生成器,将完整回复切分成文本片段推送。
app/page.tsx - 使用 response.body.getReader() 逐块读取 SSE 事件流并解析。
五、整体流程
这张图里最关键的一段:message.delta 不只出现一次,而是循环 N 次。前端每收到一条就追加一次,用户看到的是同一个气泡在"生长",而不是多个气泡依次出现。
六、运行过程
1. 前端发出请求(方式和第二课相同)
sendMessage 里的 fetch('/api/chat', ...) 调用不需要改动,因为这一课的改变发生在响应侧,不在请求侧。
2. 后端创建 SSE 流而不是 JSON 响应
route.ts 创建一个 ReadableStream,并在 start 回调里依次发出三类事件:
message.start:告知前端这条 assistant 消息的稳定 idmessage.delta(循环 N 次):每次推送一个文本片段message.end:通知前端流结束
3. Agent 新增流式出口,原有同步入口保留
chatbot.ts 里 runChat 没有任何改动,新增的 streamChat 只是用它拿到完整文本后再切分推送。这说明:流式和非流式的区别在"传输层",不在"Agent 计算层"。
4. SSE 格式的文本写入流
formatSse(event, data) 负责把事件名和 JSON 数据拼成 SSE 协议格式:
event: message.delta
data: {"id":"assistant-xxx","delta":"你"}注意最后有两个换行符,这是 SSE 规范要求的事件分隔符。
5. 前端逐块读取并拼接
response.body.getReader() 返回一个读取器,每次 await reader.read() 拿到一个二进制块。TextDecoder 把每块解码成字符串后,加入 buffer,再按 \n\n 切出完整事件逐个解析。
注意:服务端会发送 message.end 事件标记流的结束,但前端当前实现中并没有显式处理这个事件——流的结束是通过 reader.read() 返回 { done: true } 来判断的。这种设计是可行的,但未来如果需要在流结束时执行额外逻辑(比如标记消息为"已完成"状态),可以添加对 message.end 的处理。
6. 追加而不是新建消息
每收到 message.delta,appendAssistantMessage 会查找已有消息列表里是否存在这个 id。如果存在就追加内容,不存在才创建新条目。这保证了用户看到的是单个气泡在增长,而不是一个个新气泡不断出现。
七、关键代码解析
关键代码 1:SSE 事件格式和 ReadableStream
function formatSse(event: string, data: unknown) {
return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}
const stream = new ReadableStream({
async start(controller) {
controller.enqueue(encoder.encode(formatSse('message.start', { id: assistantId })));
for await (const chunk of streamChat(messages)) {
controller.enqueue(encoder.encode(formatSse('message.delta', { id: assistantId, delta: chunk })));
}
controller.enqueue(encoder.encode(formatSse('message.end', { id: assistantId, role: 'assistant' })));
controller.close();
},
});代码解析:
-
formatSse把每条事件编码成event: xxx\ndata: {...}\n\n的文本格式。这是 W3C SSE 规范要求的格式,浏览器的EventSourceAPI 原生支持它。这一课选择手动解析 SSE 而不是直接用EventSource,是因为EventSource不支持 POST 请求,而聊天接口需要携带请求体。 -
ReadableStream的start回调可以是异步的,这让for await遍历异步生成器成为可能。controller.enqueue()每次把一段 SSE 文本推进流里,浏览器端会立即收到,不用等后面的数据。 -
message.start事件在循环开始前发出,目的是让前端在第一个message.delta到达之前就已经知道这条 assistant 消息的 id。如果没有这一步,前端第一次收到 delta 时会创建消息,但如果 delta 非常小(只有一个字),用户会看到那一帧出现一个几乎空的气泡,体验不稳定。 -
controller.close()必须在最后一次enqueue之后调用,否则浏览器会认为连接还开着,不会触发读取器的done: true。
关键代码 2:前端如何消费 buffer 里的 SSE 事件
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let assistantId = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const events = buffer.split('\n\n');
buffer = events.pop() ?? '';
for (const rawEvent of events) {
const lines = rawEvent.split('\n');
const eventName = lines.find((l) => l.startsWith('event:'))?.replace('event:', '').trim();
const dataLine = lines.find((l) => l.startsWith('data:'))?.replace('data:', '').trim();
if (!eventName || !dataLine) continue;
const payload = JSON.parse(dataLine) as { id: string; delta?: string };
// `message.start` 记录 assistant 消息的稳定 id
if (eventName === 'message.start') {
assistantId = payload.id;
}
// `message.delta` 追加内容,实现打字机效果
if (eventName === 'message.delta') {
setMessages((current) => appendAssistantMessage(current, payload.id, payload.delta ?? ''));
}
}
}代码解析:
-
assistantId变量在message.start事件中被记录,用于追踪当前流对应的 assistant 消息。虽然message.delta的 payload 里也包含id,但提前记录可以验证流的完整性——如果最终assistantId仍为空,说明服务端没有正确发送起始事件。 -
buffer的作用是处理"网络切包"的情况。浏览器每次reader.read()拿到的value不保证是完整的一条 SSE 事件——如果一条事件被拆成了两个包,必须把残缺部分缓存到buffer里,等下一次read()拿到的数据拼上来再解析。 -
buffer.split('\n\n')按 SSE 分隔符切出完整事件。events.pop()把末尾那段可能不完整的内容留回 buffer,其余都是可以安全解析的完整事件。 -
decoder.decode(value, { stream: true })里的stream: true告诉TextDecoder这不是最后一块数据,不要在解码时进行换行符清洗,保留字节级别的精确处理。去掉这个参数在大多数情况下不会有问题,但在多字节 UTF-8 字符(比如 emoji、中文)被切断时会出现乱码。 -
setMessages((current) => appendAssistantMessage(...))用了回调形式而不是直接用messages,是因为循环是异步的,如果直接用messages闭包内的值,快速到来的多次 delta 事件可能会基于同一个旧的messages做更新,导致内容丢失。回调形式始终基于最新状态。
关键代码 3:追加而不是新建消息
function appendAssistantMessage(messages: ChatMessage[], messageId: string, delta: string): ChatMessage[] {
const existing = messages.find((m) => m.id === messageId);
if (!existing) {
return [...messages, { id: messageId, role: 'assistant', content: delta }];
}
return messages.map((m) =>
m.id === messageId ? { ...m, content: `${m.content}${delta}` } : m
);
}代码解析:
-
这个函数的关键在于"用 id 定位,而不是用 role 或 index 定位"。如果只按
role === 'assistant'找最后一条消息,当用户发送第二轮问题时,有可能把本轮 delta 追加到上一轮的 assistant 消息上。id 唯一性保证了追加只发生在本次流的消息上。 -
第一次收到某个 id 的 delta 时(
existing为undefined),函数会创建一条新消息。从第二次开始,每帧都进入map分支,找到那条消息并用字符串拼接追加内容。 -
这里没有用
immer或其他不可变库,而是手动...m展开,保持了实现的最小依赖。对于教学目的来说,能看清楚"找到 → 拼接 → 返回新数组"这条逻辑链路比引入更多工具更重要。 -
如果把这个函数去掉,直接在循环里
setMessages(prev => [...prev, { id, role: 'assistant', content: delta }]),用户会看到每收到一个词就出现一条新消息气泡,页面会爆炸式增长——这是无法做到"打字机效果"的根本原因。
八、常见问题
为什么选 SSE 而不是 WebSocket?
SSE 是单向推送(服务端 → 客户端),聊天场景里"发送一次用户消息,服务端持续推回复"这个模式天然适合 SSE。WebSocket 是双向长连接,建立成本更高,适合需要"实时双向通信"的场景,比如多人协作。对聊天应用来说,SSE 是更轻量的选择。
message.start 事件有什么用?它能不能省掉?
理论上可以省掉——前端完全可以在第一次收到某个 id 的 message.delta 时创建消息。但保留 message.start 有两个好处:一是语义更清晰("流开始了" vs "内容抵达"),二是让前端可以在首个 delta 到达前就准备好消息的"容器",避免第一个 delta 极小时出现空闪烁。参考项目中对应的是 session 事件,作用类似。
这一课的流和参考项目有什么本质区别?
本质区别在于 chunk 的来源:这一课的 chunk 来自字符串切分 + 人为延迟(setTimeout(18)),参考项目的 chunk 来自 LangGraph app.streamEvents() 监听真实 LLM 的 on_chat_model_stream 事件。但两者的前端消费结构(读取 → buffer → 解析 → 追加)几乎相同,所以学会了这一课之后,理解参考项目的流式实现会非常顺畅。
如果网络中断,流读取会怎样?
reader.read() 会在连接断开时抛出异常,直接被外层的 try/catch 捕获,追加一条错误消息。这和第二课的错误处理方式一致,保证即使出错也不会让页面进入无响应状态。
服务端发送了 message.end 事件,为什么前端没有处理?
这是一个有意的设计选择。流的结束有两种判断方式:一是等待 { done: true }(当前实现),二是处理 message.end 事件。前者更简洁,后者语义更明确。当前实现选择了前者,但服务端仍然发送 message.end 是为了保持协议的完整性,未来如果需要在流结束时执行额外逻辑(比如更新消息状态为"已完成"),可以很方便地添加对这个事件的处理。
九、练习题
- 解释
appendAssistantMessage为什么要用id来定位消息,而不是简单地找列表里role === 'assistant'的最后一条。 - 说明
buffer变量的作用:如果去掉 buffer 直接在每次reader.read()后解析,会在什么情况下出错? - 如果把
message.start事件去掉,前端代码需要做什么调整才能维持现有功能? - 打开
app/agent/chatbot.ts,修改streamChat里的延迟从 18ms 改为 100ms,重新运行并观察打字机效果的变化速度。再试试把response.split(/(\s+)/)改为response.split('')(按单个字符切分),感受 chunk 粒度对流式体验的影响。
十、总结
这一课真正建立的是:把 Chat Bot 的回复从"一次性弹出"升级成"逐字显示"的能力基础。
只要你已经能说清楚 SSE 事件是怎样从后端一条条推过来、前端如何用 buffer 拼出完整事件、appendAssistantMessage 为什么是追加而不是新建,这一课就掌握了。
和参考项目相比,这一课用教学版切分代替了真实 streamEvents 事件,但前端消费结构已经和参考项目对齐。后续课程会在这条事件流上陆续叠加工具调用事件和模型事件。
登录以继续阅读
解锁完整文档、代码示例及更多高级功能。