流式响应实现
实现基于 AI 的流式响应,提升聊天应用的实时交互体验。
📚 学习目标
学完这篇文章后,你将能够:
- 理解流式响应(Streaming)的原理
- 掌握 Server-Sent Events (SSE) 的使用
- 学会在 Next.js 中实现流式响应
- 了解 AI 应用中的流式输出
- 掌握客户端解析流式数据的方法
前置知识
在开始学习之前,建议先阅读:
你需要了解:
- HTTP 响应基础知识
- 异步编程(async/await)
- ReadableStream API
1️⃣ 流式响应概述
1.1 什么是流式响应?
流式响应(Streaming)是一种将数据分成多个小块逐步发送给客户端的技术,而不是等到所有数据都生成完才发送。
对比:
| 特性 | 传统响应 | 流式响应 |
|---|---|---|
| 数据发送 | 一次性发送 | 分块逐步发送 |
| 等待时间 | 需等待所有数据 | 立即开始发送 |
| 用户体验 | 长时间等待 | 实时看到结果 |
| 适用场景 | 快速响应 | 耗时操作(AI 生成) |
1.2 为什么 AI 应用需要流式响应?
传统响应的问题:
- AI 生成完整文本需要 10-30 秒
- 用户需要长时间等待白屏
- 体验不好,用户可能以为卡死了
流式响应的优势:
- ✅ 用户实时看到 AI 生成的内容
- ✅ 无需长时间等待
- ✅ 更好的用户体验
- ✅ 可以随时中断(停止生成)
1.3 Server-Sent Events (SSE)
SSE 是一种基于 HTTP 的服务器推送技术,特别适合实时数据传输。
特点:
- 📡 单向通信(服务器 → 客户端)
- 🔄 长连接(持续发送数据)
- 📄 文本格式(简单易用)
- 🌐 标准 HTTP 协议(无需额外协议)
协议格式:
data: Hello\n\n
data: World\n\n
data: !\n\n2️⃣ 服务端实现
2.1 基本语法
import { NextRequest } from 'next/server';
export async function POST(request: Request) {
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
// 模拟生成数据
const words = ['Hello', 'World', '!'];
for (const word of words) {
// 发送数据块
const data = JSON.stringify({ content: word });
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
// 等待 1 秒
await new Promise((resolve) => setTimeout(resolve, 1000));
}
// 关闭流
controller.close();
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
}代码解析:
ReadableStream- 创建可读流TextEncoder- 将文本编码为字节controller.enqueue()- 发送数据块controller.close()- 关闭流- 响应头设置 SSE 格式
2.2 本项目的流式响应实现
查看聊天 API:app/api/chat/route.ts
import { NextRequest } from 'next/server';
import { chatService } from '@/app/services';
import { withAuth } from '@/app/middleware/auth';
/**
* POST /api/chat
* 流式 AI 对话接口
*/
export const POST = withAuth(async (request: NextRequest, auth) => {
try {
// 解析请求体
const body = await request.json();
const { messages, sessionId, model, tools } = body;
// 创建流式响应
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
try {
// 调用 AI 服务(流式)
const aiStream = await chatService.streamChat({
messages,
sessionId,
model,
tools,
auth,
onChunk: (chunk) => {
// 发送 AI 生成的内容
const data = JSON.stringify({
type: 'chunk',
content: chunk,
});
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
},
onToolCall: (toolCall) => {
// 发送工具调用信息
const data = JSON.stringify({
type: 'tool_call',
tool_call: toolCall,
});
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
},
onEnd: () => {
// 发送结束信号
const data = JSON.stringify({ type: 'end' });
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
controller.close();
},
});
} catch (error) {
// 发送错误信息
const data = JSON.stringify({
type: 'error',
error: String(error),
});
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
controller.close();
}
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
} catch (error) {
return NextResponse.json(
{ error: '聊天失败', detail: String(error) },
{ status: 500 },
);
}
});代码解析:
- 解析请求体(消息、会话ID、模型、工具)
- 创建
ReadableStream - 调用
chatService.streamChat()获取 AI 流 - 通过回调函数发送不同类型的数据:
onChunk- AI 生成的内容onToolCall- 工具调用信息onEnd- 结束信号
- 设置 SSE 响应头
2.3 流式响应的数据格式
本项目使用 JSON 格式的数据块:
// AI 内容块
{
"type": "chunk",
"content": "Hello, how are you?"
}
// 工具调用块
{
"type": "tool_call",
"tool_call": {
"name": "search",
"arguments": "{\"query\": \"weather\"}"
}
}
// 工具结果块
{
"type": "tool_result",
"result": "The weather is sunny."
}
// 结束信号
{
"type": "end"
}
// 错误信号
{
"type": "error",
"error": "Something went wrong"
}3️⃣ 客户端实现
3.1 解析流式响应
'use client';
import { useState } from 'react';
export default function ChatInterface() {
const [messages, setMessages] = useState([]);
const [loading, setLoading] = useState(false);
const sendMessage = async (text: string) => {
setLoading(true);
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
messages: [{ role: 'user', content: text }],
}),
});
if (!response.body) {
throw new Error('无法读取响应流');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let currentMessage = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 解码数据
const chunk = decoder.decode(value);
// 解析 SSE 格式
const lines = chunk.split('\n\n');
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.replace('data: ', '');
const parsed = JSON.parse(data);
if (parsed.type === 'chunk') {
// AI 生成的内容
currentMessage += parsed.content;
// 更新 UI
setMessages(prev => {
const newMessages = [...prev];
newMessages[newMessages.length - 1] = {
role: 'assistant',
content: currentMessage,
};
return newMessages;
});
} else if (parsed.type === 'tool_call') {
// 工具调用
console.log('工具调用:', parsed.tool_call);
} else if (parsed.type === 'end') {
// 结束
setLoading(false);
}
}
}
} catch (error) {
console.error('发送消息失败:', error);
setLoading(false);
}
};
return (
<div>
{messages.map((msg, i) => (
<div key={i}>
<strong>{msg.role}:</strong> {msg.content}
</div>
))}
<button onClick={() => sendMessage('Hello')}>
发送消息
</button>
</div>
);
}3.2 本项目的客户端实现
查看消息发送 Hook:app/hooks/useSendMessage.ts
'use client';
import { useCallback, useRef } from 'react';
import type { Message, ToolCall } from '@/app/database/messages';
export function useSendMessage({
addMessage,
updateMessage,
selectedTools,
}: SendMessageProps) {
const abortControllerRef = useRef<AbortController | null>(null);
const sendMessage = useCallback(
async (content: string) => {
// 创建 AbortController 用于中断请求
abortControllerRef.current = new AbortController();
try {
// 添加用户消息
const userMessage: Message = {
id: crypto.randomUUID(),
role: 'user',
content,
created_at: new Date().toISOString(),
};
addMessage(userMessage);
// 添加空的 AI 消息占位
const assistantMessage: Message = {
id: crypto.randomUUID(),
role: 'assistant',
content: '',
created_at: new Date().toISOString(),
};
addMessage(assistantMessage);
// 发送请求
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [userMessage],
sessionId: currentSessionId,
tools: selectedTools,
}),
signal: abortControllerRef.current.signal,
});
if (!response.body) {
throw new Error('无法读取响应流');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let assistantContent = '';
let toolCalls: ToolCall[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n\n');
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.replace('data: ', '');
const parsed = JSON.parse(data);
if (parsed.type === 'chunk') {
// AI 生成的内容
assistantContent += parsed.content;
updateMessage(assistantMessage.id, {
content: assistantContent,
});
} else if (parsed.type === 'tool_call') {
// 工具调用
toolCalls.push(parsed.tool_call);
updateMessage(assistantMessage.id, {
tool_calls: toolCalls,
});
} else if (parsed.type === 'tool_result') {
// 工具结果
// 更新工具调用结果
const updatedToolCalls = toolCalls.map((tc) =>
tc.id === parsed.tool_call_id
? { ...tc, result: parsed.result }
: tc,
);
updateMessage(assistantMessage.id, {
tool_calls: updatedToolCalls,
});
} else if (parsed.type === 'end') {
// 结束
setLoading(false);
}
}
}
} catch (error) {
if (error.name === 'AbortError') {
console.log('消息发送已中断');
} else {
console.error('发送消息失败:', error);
}
setLoading(false);
}
},
[addMessage, updateMessage, selectedTools, currentSessionId],
);
const stopMessage = useCallback(() => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
}, []);
return { sendMessage, stopMessage };
}代码解析:
- 使用
AbortController支持中断请求 - 添加用户消息和 AI 消息占位
- 使用
fetch发送 POST 请求 - 通过
response.body.getReader()获取流 - 使用
TextDecoder解码数据 - 解析 SSE 格式(
data: {...}\n\n) - 根据不同类型更新消息:
chunk- 追加 AI 内容tool_call- 添加工具调用tool_result- 更新工具结果end- 标记结束
4️⃣ SSE 协议详解
4.1 SSE 消息格式
data: message 1\n\n
data: message 2\n\n
data: message 3\n\n格式说明:
- 每条消息以
data:开头 - 每条消息以
\n\n结尾 - 可以发送 JSON 数据
4.2 字段说明
| 字段 | 说明 |
|---|---|
data | 消息内容(必需) |
event | 事件类型(可选) |
id | 消息 ID(可选) |
retry | 重连时间(毫秒,可选) |
示例:
event: message
id: 123
retry: 3000
data: Hello World\n\n4.3 客户端监听特定事件
const eventSource = new EventSource('/api/stream');
eventSource.addEventListener('message', (event) => {
console.log('消息:', event.data);
});
eventSource.addEventListener('error', (event) => {
console.error('错误:', event);
});
// 关闭连接
eventSource.close();注意:本项目不使用 EventSource,而是使用 fetch + ReadableStream,因为需要发送 POST 请求(EventSource 只支持 GET)。
5️⃣ 中断流式响应
5.1 使用 AbortController
'use client';
import { useRef } from 'react';
export function useStreaming() {
const abortControllerRef = useRef<AbortController | null>(null);
const startStreaming = async () => {
abortControllerRef.current = new AbortController();
try {
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ message: 'Hello' }),
signal: abortControllerRef.current.signal,
});
// 处理流式响应...
} catch (error) {
if (error.name === 'AbortError') {
console.log('流已中断');
}
}
};
const stopStreaming = () => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
};
return { startStreaming, stopStreaming };
}5.2 本项目的中断实现
查看 ChatInput 组件中的停止按钮:
'use client';
import { useSendMessage } from '@/app/hooks/useSendMessage';
export default function ChatInput() {
const { sendMessage, stopMessage, sending } = useSendMessage();
return (
<div>
{sending ? (
<button onClick={stopMessage}>停止生成</button>
) : (
<button onClick={() => sendMessage('Hello')}>发送</button>
)}
</div>
);
}6️⃣ 最佳实践
6.1 错误处理
try {
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
try {
const data = JSON.parse(value);
// 处理数据
} catch (parseError) {
console.error('解析数据失败:', parseError);
}
}
} catch (error) {
if (error.name === 'AbortError') {
console.log('流已中断');
} else {
console.error('流式请求失败:', error);
}
}6.2 连接超时处理
// 服务端:设置超时
const stream = new ReadableStream({
async start(controller) {
const timeout = setTimeout(() => {
controller.error(new Error('请求超时'));
controller.close();
}, 60000); // 60秒超时
// 处理流...
clearTimeout(timeout);
},
});6.3 重连机制
const connectWithRetry = async (url: string, maxRetries = 3) => {
for (let i = 0; i < maxRetries; i++) {
try {
const response = await fetch(url);
if (response.ok) return response;
} catch (error) {
console.error(`连接失败 (${i + 1}/${maxRetries}):`, error);
if (i < maxRetries - 1) {
await new Promise((resolve) => setTimeout(resolve, 1000 * (i + 1)));
}
}
}
throw new Error('连接失败');
};7️⃣ 实战案例:AI 聊天流式输出
7.1 服务端实现
文件:app/api/chat/route.ts
import { NextRequest } from 'next/server';
import { chatService } from '@/app/services';
export async function POST(request: NextRequest) {
const body = await request.json();
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
try {
// 模拟 AI 生成
const words = ['Hello', ',', 'how', 'are', 'you', '?'];
for (const word of words) {
const data = JSON.stringify({
type: 'chunk',
content: word + ' ',
});
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
await new Promise((resolve) => setTimeout(resolve, 500));
}
// 结束
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'end' })}\n\n`),
);
controller.close();
} catch (error) {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'error', error: String(error) })}\n\n`,
),
);
controller.close();
}
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
}7.2 客户端实现
文件:components/Chat.tsx
'use client';
import { useState } from 'react';
export default function Chat() {
const [messages, setMessages] = useState([]);
const [loading, setLoading] = useState(false);
const sendMessage = async (text: string) => {
setLoading(true);
// 添加用户消息
setMessages(prev => [...prev, { role: 'user', content: text }]);
// 添加 AI 消息占位
const aiMessageIndex = messages.length;
setMessages(prev => [...prev, { role: 'assistant', content: '' }]);
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: text }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n\n');
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = JSON.parse(line.replace('data: ', ''));
if (data.type === 'chunk') {
setMessages(prev => {
const newMessages = [...prev];
newMessages[aiMessageIndex] = {
...newMessages[aiMessageIndex],
content: newMessages[aiMessageIndex].content + data.content,
};
return newMessages;
});
} else if (data.type === 'end') {
setLoading(false);
}
}
}
} catch (error) {
console.error('发送消息失败:', error);
setLoading(false);
}
};
return (
<div>
{messages.map((msg, i) => (
<div key={i}>
<strong>{msg.role}:</strong> {msg.content}
</div>
))}
<input
type="text"
onKeyDown={(e) => {
if (e.key === 'Enter' && !loading) {
sendMessage(e.currentTarget.value);
e.currentTarget.value = '';
}
}}
disabled={loading}
/>
</div>
);
}💡 练习题
-
选择题:SSE(Server-Sent Events)的特点是什么?
- A. 双向通信
- B. 单向通信(服务器 → 客户端)
- C. 基于 WebSocket
- D. 需要额外协议
-
代码题:创建一个流式 API,每秒发送一个数字,共 10 个数字。
-
分析题:查看本项目的 app/hooks/useSendMessage.ts,说明它如何解析流式数据。
-
实践题:实现一个简单的聊天应用:
- 服务端:流式返回 AI 生成的文本
- 客户端:实时显示 AI 生成的内容
- 功能:支持中断生成
📚 参考资源
官方文档
外部资源
本项目相关文件
- app/api/chat/route.ts - 流式聊天 API
- app/hooks/useSendMessage.ts - 消息发送 Hook
- app/components/ChatInput.tsx - 聊天输入组件
✅ 总结
流式响应(Streaming):
- 分块逐步发送数据
- 减少等待时间
- 更好的用户体验
- 特别适合 AI 应用
SSE(Server-Sent Events):
- 单向通信(服务器 → 客户端)
- 基于 HTTP 协议
- 格式简单(
data: {}\n\n) - 自动重连
服务端实现:
- 使用
ReadableStream controller.enqueue()发送数据controller.close()关闭流- 设置 SSE 响应头
客户端实现:
- 使用
fetch+response.body.getReader() - 使用
TextDecoder解码数据 - 解析 SSE 格式(
\n\n分隔) - 实时更新 UI
本项目的实现:
- 服务端:
/api/chat流式 AI 生成 - 客户端:
useSendMessageHook 解析流 - 支持中断(AbortController)
- 支持多种数据类型(chunk、tool_call、end)
下一步:阅读下一篇文章《React Hooks 与导航》,学习 Next.js 的内置导航 Hooks。
登录以继续阅读
解锁完整文档、代码示例及更多高级功能。