第一阶段 · 最小可运行闭环

03 · 流式输出

把同步 JSON 响应升级成 SSE 事件流,让 assistant 回复能够边生成边显示。

课时资源

一、学习目标

本课所在阶段:第一阶段 · 最小可运行闭环

学完这一课后,你应该能够:

  • 理解为什么同步 JSON 返回不适合做 Chat Bot 的回复体验
  • 看懂 SSE(Server-Sent Events)的事件格式是什么样的
  • 明白前端如何把多个 message.delta 事件拼成同一条 assistant 消息
  • 知道这一课的核心改动发生在"传输层",而不是"消息模型层"

二、问题背景

第二课已经打通了完整链路,但用户会发现:发出问题后,页面会有一段停顿,然后整段回复一次性弹出来。这种体验和真实 Chat Bot(比如 ChatGPT)差距明显——真实产品的回复是一个字一个字地出现的。

如果这一课跳过,后续会出现几个问题:

  • 你会很难区分"同步请求 / 流式请求"这两种截然不同的范式
  • 后面的工具调用事件(tool.call)、Artifact 事件(artifact.update)也都是事件流形式,如果 SSE 基础没建立,就很难理解
  • 长文本回复下,用户体验会明显落后于产品预期

这一课解决的问题是:把第二课的"等待后一次性显示全部"改造成"边生成边显示"

三、核心概念

这一课最重要的概念是:把后端回复理解成一个持续发送的事件流,而不是单次 HTTP 响应体。

从同步到流式的关键变化

维度lesson-02(同步)lesson-03(流式)
后端返回方式NextResponse.json(...) 一次性返回new Response(ReadableStream) 持续推送
前端读取方式await response.json()response.body.getReader() 逐块读取
消息更新方式收到完整回复后一次性添加每帧 message.delta 追加到同一条消息
Agent 本身同步调用 runChat(messages)新增 streamChat 异步生成器
响应头Content-Type: application/jsonContent-Type: text/event-stream

当前实现的三个关键决策:

  1. 后端不再等 Agent 回复完毕才关闭连接,而是改用 ReadableStream 边生成边推送
  2. 前端不再等整个 response.json() 解析完,而是用 response.body.getReader() 逐块读取
  3. 每收到一个 message.delta 事件,就把新内容追加到同一条 assistant 消息上——而不是每次创建新消息

本课关键文件

四、关键代码解析

app/api/chat/route.ts - 创建 SSE 流,依次发出 message.startmessage.deltamessage.end 事件。

app/agent/chatbot.ts - 新增 streamChat 异步生成器,将完整回复切分成文本片段推送。

app/page.tsx - 使用 response.body.getReader() 逐块读取 SSE 事件流并解析。

五、整体流程

这张图里最关键的一段:message.delta 不只出现一次,而是循环 N 次。前端每收到一条就追加一次,用户看到的是同一个气泡在"生长",而不是多个气泡依次出现。

六、运行过程

1. 前端发出请求(方式和第二课相同)

sendMessage 里的 fetch('/api/chat', ...) 调用不需要改动,因为这一课的改变发生在响应侧,不在请求侧。

2. 后端创建 SSE 流而不是 JSON 响应

route.ts 创建一个 ReadableStream,并在 start 回调里依次发出三类事件:

  • message.start:告知前端这条 assistant 消息的稳定 id
  • message.delta(循环 N 次):每次推送一个文本片段
  • message.end:通知前端流结束

3. Agent 新增流式出口,原有同步入口保留

chatbot.tsrunChat 没有任何改动,新增的 streamChat 只是用它拿到完整文本后再切分推送。这说明:流式和非流式的区别在"传输层",不在"Agent 计算层"

4. SSE 格式的文本写入流

formatSse(event, data) 负责把事件名和 JSON 数据拼成 SSE 协议格式:

event: message.delta
data: {"id":"assistant-xxx","delta":"你"}

注意最后有两个换行符,这是 SSE 规范要求的事件分隔符。

5. 前端逐块读取并拼接

response.body.getReader() 返回一个读取器,每次 await reader.read() 拿到一个二进制块。TextDecoder 把每块解码成字符串后,加入 buffer,再按 \n\n 切出完整事件逐个解析。

注意:服务端会发送 message.end 事件标记流的结束,但前端当前实现中并没有显式处理这个事件——流的结束是通过 reader.read() 返回 { done: true } 来判断的。这种设计是可行的,但未来如果需要在流结束时执行额外逻辑(比如标记消息为"已完成"状态),可以添加对 message.end 的处理。

6. 追加而不是新建消息

每收到 message.deltaappendAssistantMessage 会查找已有消息列表里是否存在这个 id。如果存在就追加内容,不存在才创建新条目。这保证了用户看到的是单个气泡在增长,而不是一个个新气泡不断出现。

七、关键代码解析

关键代码 1:SSE 事件格式和 ReadableStream

function formatSse(event: string, data: unknown) {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue(encoder.encode(formatSse('message.start', { id: assistantId })));

    for await (const chunk of streamChat(messages)) {
      controller.enqueue(encoder.encode(formatSse('message.delta', { id: assistantId, delta: chunk })));
    }

    controller.enqueue(encoder.encode(formatSse('message.end', { id: assistantId, role: 'assistant' })));
    controller.close();
  },
});

代码解析:

  1. formatSse 把每条事件编码成 event: xxx\ndata: {...}\n\n 的文本格式。这是 W3C SSE 规范要求的格式,浏览器的 EventSource API 原生支持它。这一课选择手动解析 SSE 而不是直接用 EventSource,是因为 EventSource 不支持 POST 请求,而聊天接口需要携带请求体。

  2. ReadableStreamstart 回调可以是异步的,这让 for await 遍历异步生成器成为可能。controller.enqueue() 每次把一段 SSE 文本推进流里,浏览器端会立即收到,不用等后面的数据。

  3. message.start 事件在循环开始前发出,目的是让前端在第一个 message.delta 到达之前就已经知道这条 assistant 消息的 id。如果没有这一步,前端第一次收到 delta 时会创建消息,但如果 delta 非常小(只有一个字),用户会看到那一帧出现一个几乎空的气泡,体验不稳定。

  4. controller.close() 必须在最后一次 enqueue 之后调用,否则浏览器会认为连接还开着,不会触发读取器的 done: true

关键代码 2:前端如何消费 buffer 里的 SSE 事件

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let assistantId = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split('\n\n');
  buffer = events.pop() ?? '';

  for (const rawEvent of events) {
    const lines = rawEvent.split('\n');
    const eventName = lines.find((l) => l.startsWith('event:'))?.replace('event:', '').trim();
    const dataLine = lines.find((l) => l.startsWith('data:'))?.replace('data:', '').trim();
    if (!eventName || !dataLine) continue;

    const payload = JSON.parse(dataLine) as { id: string; delta?: string };

    // `message.start` 记录 assistant 消息的稳定 id
    if (eventName === 'message.start') {
      assistantId = payload.id;
    }

    // `message.delta` 追加内容,实现打字机效果
    if (eventName === 'message.delta') {
      setMessages((current) => appendAssistantMessage(current, payload.id, payload.delta ?? ''));
    }
  }
}

代码解析:

  1. assistantId 变量在 message.start 事件中被记录,用于追踪当前流对应的 assistant 消息。虽然 message.delta 的 payload 里也包含 id,但提前记录可以验证流的完整性——如果最终 assistantId 仍为空,说明服务端没有正确发送起始事件。

  2. buffer 的作用是处理"网络切包"的情况。浏览器每次 reader.read() 拿到的 value 不保证是完整的一条 SSE 事件——如果一条事件被拆成了两个包,必须把残缺部分缓存到 buffer 里,等下一次 read() 拿到的数据拼上来再解析。

  3. buffer.split('\n\n') 按 SSE 分隔符切出完整事件。events.pop() 把末尾那段可能不完整的内容留回 buffer,其余都是可以安全解析的完整事件。

  4. decoder.decode(value, { stream: true }) 里的 stream: true 告诉 TextDecoder 这不是最后一块数据,不要在解码时进行换行符清洗,保留字节级别的精确处理。去掉这个参数在大多数情况下不会有问题,但在多字节 UTF-8 字符(比如 emoji、中文)被切断时会出现乱码。

  5. setMessages((current) => appendAssistantMessage(...)) 用了回调形式而不是直接用 messages,是因为循环是异步的,如果直接用 messages 闭包内的值,快速到来的多次 delta 事件可能会基于同一个旧的 messages 做更新,导致内容丢失。回调形式始终基于最新状态。

关键代码 3:追加而不是新建消息

function appendAssistantMessage(messages: ChatMessage[], messageId: string, delta: string): ChatMessage[] {
  const existing = messages.find((m) => m.id === messageId);
  if (!existing) {
    return [...messages, { id: messageId, role: 'assistant', content: delta }];
  }
  return messages.map((m) =>
    m.id === messageId ? { ...m, content: `${m.content}${delta}` } : m
  );
}

代码解析:

  1. 这个函数的关键在于"用 id 定位,而不是用 role 或 index 定位"。如果只按 role === 'assistant' 找最后一条消息,当用户发送第二轮问题时,有可能把本轮 delta 追加到上一轮的 assistant 消息上。id 唯一性保证了追加只发生在本次流的消息上。

  2. 第一次收到某个 id 的 delta 时(existingundefined),函数会创建一条新消息。从第二次开始,每帧都进入 map 分支,找到那条消息并用字符串拼接追加内容。

  3. 这里没有用 immer 或其他不可变库,而是手动 ...m 展开,保持了实现的最小依赖。对于教学目的来说,能看清楚"找到 → 拼接 → 返回新数组"这条逻辑链路比引入更多工具更重要。

  4. 如果把这个函数去掉,直接在循环里 setMessages(prev => [...prev, { id, role: 'assistant', content: delta }]),用户会看到每收到一个词就出现一条新消息气泡,页面会爆炸式增长——这是无法做到"打字机效果"的根本原因。

八、常见问题

为什么选 SSE 而不是 WebSocket?

SSE 是单向推送(服务端 → 客户端),聊天场景里"发送一次用户消息,服务端持续推回复"这个模式天然适合 SSE。WebSocket 是双向长连接,建立成本更高,适合需要"实时双向通信"的场景,比如多人协作。对聊天应用来说,SSE 是更轻量的选择。

message.start 事件有什么用?它能不能省掉?

理论上可以省掉——前端完全可以在第一次收到某个 id 的 message.delta 时创建消息。但保留 message.start 有两个好处:一是语义更清晰("流开始了" vs "内容抵达"),二是让前端可以在首个 delta 到达前就准备好消息的"容器",避免第一个 delta 极小时出现空闪烁。参考项目中对应的是 session 事件,作用类似。

这一课的流和参考项目有什么本质区别?

本质区别在于 chunk 的来源:这一课的 chunk 来自字符串切分 + 人为延迟(setTimeout(18)),参考项目的 chunk 来自 LangGraph app.streamEvents() 监听真实 LLM 的 on_chat_model_stream 事件。但两者的前端消费结构(读取 → buffer → 解析 → 追加)几乎相同,所以学会了这一课之后,理解参考项目的流式实现会非常顺畅。

如果网络中断,流读取会怎样?

reader.read() 会在连接断开时抛出异常,直接被外层的 try/catch 捕获,追加一条错误消息。这和第二课的错误处理方式一致,保证即使出错也不会让页面进入无响应状态。

服务端发送了 message.end 事件,为什么前端没有处理?

这是一个有意的设计选择。流的结束有两种判断方式:一是等待 { done: true }(当前实现),二是处理 message.end 事件。前者更简洁,后者语义更明确。当前实现选择了前者,但服务端仍然发送 message.end 是为了保持协议的完整性,未来如果需要在流结束时执行额外逻辑(比如更新消息状态为"已完成"),可以很方便地添加对这个事件的处理。

九、练习题

  1. 解释 appendAssistantMessage 为什么要用 id 来定位消息,而不是简单地找列表里 role === 'assistant' 的最后一条。
  2. 说明 buffer 变量的作用:如果去掉 buffer 直接在每次 reader.read() 后解析,会在什么情况下出错?
  3. 如果把 message.start 事件去掉,前端代码需要做什么调整才能维持现有功能?
  4. 打开 app/agent/chatbot.ts,修改 streamChat 里的延迟从 18ms 改为 100ms,重新运行并观察打字机效果的变化速度。再试试把 response.split(/(\s+)/) 改为 response.split('')(按单个字符切分),感受 chunk 粒度对流式体验的影响。

十、总结

这一课真正建立的是:把 Chat Bot 的回复从"一次性弹出"升级成"逐字显示"的能力基础。

只要你已经能说清楚 SSE 事件是怎样从后端一条条推过来、前端如何用 buffer 拼出完整事件、appendAssistantMessage 为什么是追加而不是新建,这一课就掌握了。

和参考项目相比,这一课用教学版切分代替了真实 streamEvents 事件,但前端消费结构已经和参考项目对齐。后续课程会在这条事件流上陆续叠加工具调用事件和模型事件。

登录以继续阅读

解锁完整文档、代码示例及更多高级功能。

立即登录

On this page