流式响应实现

实现基于 AI 的流式响应,提升聊天应用的实时交互体验。

📚 学习目标

学完这篇文章后,你将能够:

  • 理解流式响应(Streaming)的原理
  • 掌握 Server-Sent Events (SSE) 的使用
  • 学会在 Next.js 中实现流式响应
  • 了解 AI 应用中的流式输出
  • 掌握客户端解析流式数据的方法

前置知识

在开始学习之前,建议先阅读:

你需要了解:

  • HTTP 响应基础知识
  • 异步编程(async/await)
  • ReadableStream API

1️⃣ 流式响应概述

1.1 什么是流式响应?

流式响应(Streaming)是一种将数据分成多个小块逐步发送给客户端的技术,而不是等到所有数据都生成完才发送。

对比

特性传统响应流式响应
数据发送一次性发送分块逐步发送
等待时间需等待所有数据立即开始发送
用户体验长时间等待实时看到结果
适用场景快速响应耗时操作(AI 生成)

1.2 为什么 AI 应用需要流式响应?

传统响应的问题

  • AI 生成完整文本需要 10-30 秒
  • 用户需要长时间等待白屏
  • 体验不好,用户可能以为卡死了

流式响应的优势

  • ✅ 用户实时看到 AI 生成的内容
  • ✅ 无需长时间等待
  • ✅ 更好的用户体验
  • ✅ 可以随时中断(停止生成)

1.3 Server-Sent Events (SSE)

SSE 是一种基于 HTTP 的服务器推送技术,特别适合实时数据传输。

特点

  • 📡 单向通信(服务器 → 客户端)
  • 🔄 长连接(持续发送数据)
  • 📄 文本格式(简单易用)
  • 🌐 标准 HTTP 协议(无需额外协议)

协议格式

data: Hello\n\n
data: World\n\n
data: !\n\n

2️⃣ 服务端实现

2.1 基本语法

import { NextRequest } from 'next/server';

export async function POST(request: Request) {
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      // 模拟生成数据
      const words = ['Hello', 'World', '!'];

      for (const word of words) {
        // 发送数据块
        const data = JSON.stringify({ content: word });
        controller.enqueue(encoder.encode(`data: ${data}\n\n`));

        // 等待 1 秒
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }

      // 关闭流
      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
}

代码解析

  1. ReadableStream - 创建可读流
  2. TextEncoder - 将文本编码为字节
  3. controller.enqueue() - 发送数据块
  4. controller.close() - 关闭流
  5. 响应头设置 SSE 格式

2.2 本项目的流式响应实现

查看聊天 API:app/api/chat/route.ts

import { NextRequest } from 'next/server';
import { chatService } from '@/app/services';
import { withAuth } from '@/app/middleware/auth';

/**
 * POST /api/chat
 * 流式 AI 对话接口
 */
export const POST = withAuth(async (request: NextRequest, auth) => {
  try {
    // 解析请求体
    const body = await request.json();
    const { messages, sessionId, model, tools } = body;

    // 创建流式响应
    const stream = new ReadableStream({
      async start(controller) {
        const encoder = new TextEncoder();

        try {
          // 调用 AI 服务(流式)
          const aiStream = await chatService.streamChat({
            messages,
            sessionId,
            model,
            tools,
            auth,
            onChunk: (chunk) => {
              // 发送 AI 生成的内容
              const data = JSON.stringify({
                type: 'chunk',
                content: chunk,
              });
              controller.enqueue(encoder.encode(`data: ${data}\n\n`));
            },
            onToolCall: (toolCall) => {
              // 发送工具调用信息
              const data = JSON.stringify({
                type: 'tool_call',
                tool_call: toolCall,
              });
              controller.enqueue(encoder.encode(`data: ${data}\n\n`));
            },
            onEnd: () => {
              // 发送结束信号
              const data = JSON.stringify({ type: 'end' });
              controller.enqueue(encoder.encode(`data: ${data}\n\n`));
              controller.close();
            },
          });
        } catch (error) {
          // 发送错误信息
          const data = JSON.stringify({
            type: 'error',
            error: String(error),
          });
          controller.enqueue(encoder.encode(`data: ${data}\n\n`));
          controller.close();
        }
      },
    });

    return new Response(stream, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      },
    });
  } catch (error) {
    return NextResponse.json(
      { error: '聊天失败', detail: String(error) },
      { status: 500 },
    );
  }
});

代码解析

  1. 解析请求体(消息、会话ID、模型、工具)
  2. 创建 ReadableStream
  3. 调用 chatService.streamChat() 获取 AI 流
  4. 通过回调函数发送不同类型的数据:
    • onChunk - AI 生成的内容
    • onToolCall - 工具调用信息
    • onEnd - 结束信号
  5. 设置 SSE 响应头

2.3 流式响应的数据格式

本项目使用 JSON 格式的数据块:

// AI 内容块
{
  "type": "chunk",
  "content": "Hello, how are you?"
}

// 工具调用块
{
  "type": "tool_call",
  "tool_call": {
    "name": "search",
    "arguments": "{\"query\": \"weather\"}"
  }
}

// 工具结果块
{
  "type": "tool_result",
  "result": "The weather is sunny."
}

// 结束信号
{
  "type": "end"
}

// 错误信号
{
  "type": "error",
  "error": "Something went wrong"
}

3️⃣ 客户端实现

3.1 解析流式响应

'use client';

import { useState } from 'react';

export default function ChatInterface() {
  const [messages, setMessages] = useState([]);
  const [loading, setLoading] = useState(false);

  const sendMessage = async (text: string) => {
    setLoading(true);

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          messages: [{ role: 'user', content: text }],
        }),
      });

      if (!response.body) {
        throw new Error('无法读取响应流');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      let currentMessage = '';

      while (true) {
        const { done, value } = await reader.read();

        if (done) break;

        // 解码数据
        const chunk = decoder.decode(value);

        // 解析 SSE 格式
        const lines = chunk.split('\n\n');

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          const data = line.replace('data: ', '');
          const parsed = JSON.parse(data);

          if (parsed.type === 'chunk') {
            // AI 生成的内容
            currentMessage += parsed.content;
            // 更新 UI
            setMessages(prev => {
              const newMessages = [...prev];
              newMessages[newMessages.length - 1] = {
                role: 'assistant',
                content: currentMessage,
              };
              return newMessages;
            });
          } else if (parsed.type === 'tool_call') {
            // 工具调用
            console.log('工具调用:', parsed.tool_call);
          } else if (parsed.type === 'end') {
            // 结束
            setLoading(false);
          }
        }
      }
    } catch (error) {
      console.error('发送消息失败:', error);
      setLoading(false);
    }
  };

  return (
    <div>
      {messages.map((msg, i) => (
        <div key={i}>
          <strong>{msg.role}:</strong> {msg.content}
        </div>
      ))}
      <button onClick={() => sendMessage('Hello')}>
        发送消息
      </button>
    </div>
  );
}

3.2 本项目的客户端实现

查看消息发送 Hook:app/hooks/useSendMessage.ts

'use client';

import { useCallback, useRef } from 'react';
import type { Message, ToolCall } from '@/app/database/messages';

export function useSendMessage({
  addMessage,
  updateMessage,
  selectedTools,
}: SendMessageProps) {
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(
    async (content: string) => {
      // 创建 AbortController 用于中断请求
      abortControllerRef.current = new AbortController();

      try {
        // 添加用户消息
        const userMessage: Message = {
          id: crypto.randomUUID(),
          role: 'user',
          content,
          created_at: new Date().toISOString(),
        };
        addMessage(userMessage);

        // 添加空的 AI 消息占位
        const assistantMessage: Message = {
          id: crypto.randomUUID(),
          role: 'assistant',
          content: '',
          created_at: new Date().toISOString(),
        };
        addMessage(assistantMessage);

        // 发送请求
        const response = await fetch('/api/chat', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            messages: [userMessage],
            sessionId: currentSessionId,
            tools: selectedTools,
          }),
          signal: abortControllerRef.current.signal,
        });

        if (!response.body) {
          throw new Error('无法读取响应流');
        }

        const reader = response.body.getReader();
        const decoder = new TextDecoder();

        let assistantContent = '';
        let toolCalls: ToolCall[] = [];

        while (true) {
          const { done, value } = await reader.read();

          if (done) break;

          const chunk = decoder.decode(value);
          const lines = chunk.split('\n\n');

          for (const line of lines) {
            if (!line.startsWith('data: ')) continue;

            const data = line.replace('data: ', '');
            const parsed = JSON.parse(data);

            if (parsed.type === 'chunk') {
              // AI 生成的内容
              assistantContent += parsed.content;
              updateMessage(assistantMessage.id, {
                content: assistantContent,
              });
            } else if (parsed.type === 'tool_call') {
              // 工具调用
              toolCalls.push(parsed.tool_call);
              updateMessage(assistantMessage.id, {
                tool_calls: toolCalls,
              });
            } else if (parsed.type === 'tool_result') {
              // 工具结果
              // 更新工具调用结果
              const updatedToolCalls = toolCalls.map((tc) =>
                tc.id === parsed.tool_call_id
                  ? { ...tc, result: parsed.result }
                  : tc,
              );
              updateMessage(assistantMessage.id, {
                tool_calls: updatedToolCalls,
              });
            } else if (parsed.type === 'end') {
              // 结束
              setLoading(false);
            }
          }
        }
      } catch (error) {
        if (error.name === 'AbortError') {
          console.log('消息发送已中断');
        } else {
          console.error('发送消息失败:', error);
        }
        setLoading(false);
      }
    },
    [addMessage, updateMessage, selectedTools, currentSessionId],
  );

  const stopMessage = useCallback(() => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
  }, []);

  return { sendMessage, stopMessage };
}

代码解析

  1. 使用 AbortController 支持中断请求
  2. 添加用户消息和 AI 消息占位
  3. 使用 fetch 发送 POST 请求
  4. 通过 response.body.getReader() 获取流
  5. 使用 TextDecoder 解码数据
  6. 解析 SSE 格式(data: {...}\n\n
  7. 根据不同类型更新消息:
    • chunk - 追加 AI 内容
    • tool_call - 添加工具调用
    • tool_result - 更新工具结果
    • end - 标记结束

4️⃣ SSE 协议详解

4.1 SSE 消息格式

data: message 1\n\n
data: message 2\n\n
data: message 3\n\n

格式说明

  • 每条消息以 data: 开头
  • 每条消息以 \n\n 结尾
  • 可以发送 JSON 数据

4.2 字段说明

字段说明
data消息内容(必需)
event事件类型(可选)
id消息 ID(可选)
retry重连时间(毫秒,可选)

示例

event: message
id: 123
retry: 3000
data: Hello World\n\n

4.3 客户端监听特定事件

const eventSource = new EventSource('/api/stream');

eventSource.addEventListener('message', (event) => {
  console.log('消息:', event.data);
});

eventSource.addEventListener('error', (event) => {
  console.error('错误:', event);
});

// 关闭连接
eventSource.close();

注意:本项目不使用 EventSource,而是使用 fetch + ReadableStream,因为需要发送 POST 请求(EventSource 只支持 GET)。


5️⃣ 中断流式响应

5.1 使用 AbortController

'use client';

import { useRef } from 'react';

export function useStreaming() {
  const abortControllerRef = useRef<AbortController | null>(null);

  const startStreaming = async () => {
    abortControllerRef.current = new AbortController();

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        body: JSON.stringify({ message: 'Hello' }),
        signal: abortControllerRef.current.signal,
      });

      // 处理流式响应...
    } catch (error) {
      if (error.name === 'AbortError') {
        console.log('流已中断');
      }
    }
  };

  const stopStreaming = () => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
  };

  return { startStreaming, stopStreaming };
}

5.2 本项目的中断实现

查看 ChatInput 组件中的停止按钮:

'use client';

import { useSendMessage } from '@/app/hooks/useSendMessage';

export default function ChatInput() {
  const { sendMessage, stopMessage, sending } = useSendMessage();

  return (
    <div>
      {sending ? (
        <button onClick={stopMessage}>停止生成</button>
      ) : (
        <button onClick={() => sendMessage('Hello')}>发送</button>
      )}
    </div>
  );
}

6️⃣ 最佳实践

6.1 错误处理

try {
  const reader = response.body.getReader();

  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    try {
      const data = JSON.parse(value);
      // 处理数据
    } catch (parseError) {
      console.error('解析数据失败:', parseError);
    }
  }
} catch (error) {
  if (error.name === 'AbortError') {
    console.log('流已中断');
  } else {
    console.error('流式请求失败:', error);
  }
}

6.2 连接超时处理

// 服务端:设置超时
const stream = new ReadableStream({
  async start(controller) {
    const timeout = setTimeout(() => {
      controller.error(new Error('请求超时'));
      controller.close();
    }, 60000); // 60秒超时

    // 处理流...

    clearTimeout(timeout);
  },
});

6.3 重连机制

const connectWithRetry = async (url: string, maxRetries = 3) => {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url);
      if (response.ok) return response;
    } catch (error) {
      console.error(`连接失败 (${i + 1}/${maxRetries}):`, error);
      if (i < maxRetries - 1) {
        await new Promise((resolve) => setTimeout(resolve, 1000 * (i + 1)));
      }
    }
  }
  throw new Error('连接失败');
};

7️⃣ 实战案例:AI 聊天流式输出

7.1 服务端实现

文件:app/api/chat/route.ts

import { NextRequest } from 'next/server';
import { chatService } from '@/app/services';

export async function POST(request: NextRequest) {
  const body = await request.json();

  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      try {
        // 模拟 AI 生成
        const words = ['Hello', ',', 'how', 'are', 'you', '?'];

        for (const word of words) {
          const data = JSON.stringify({
            type: 'chunk',
            content: word + ' ',
          });
          controller.enqueue(encoder.encode(`data: ${data}\n\n`));

          await new Promise((resolve) => setTimeout(resolve, 500));
        }

        // 结束
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({ type: 'end' })}\n\n`),
        );
        controller.close();
      } catch (error) {
        controller.enqueue(
          encoder.encode(
            `data: ${JSON.stringify({ type: 'error', error: String(error) })}\n\n`,
          ),
        );
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
}

7.2 客户端实现

文件:components/Chat.tsx

'use client';

import { useState } from 'react';

export default function Chat() {
  const [messages, setMessages] = useState([]);
  const [loading, setLoading] = useState(false);

  const sendMessage = async (text: string) => {
    setLoading(true);

    // 添加用户消息
    setMessages(prev => [...prev, { role: 'user', content: text }]);

    // 添加 AI 消息占位
    const aiMessageIndex = messages.length;
    setMessages(prev => [...prev, { role: 'assistant', content: '' }]);

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message: text }),
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split('\n\n');

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          const data = JSON.parse(line.replace('data: ', ''));

          if (data.type === 'chunk') {
            setMessages(prev => {
              const newMessages = [...prev];
              newMessages[aiMessageIndex] = {
                ...newMessages[aiMessageIndex],
                content: newMessages[aiMessageIndex].content + data.content,
              };
              return newMessages;
            });
          } else if (data.type === 'end') {
            setLoading(false);
          }
        }
      }
    } catch (error) {
      console.error('发送消息失败:', error);
      setLoading(false);
    }
  };

  return (
    <div>
      {messages.map((msg, i) => (
        <div key={i}>
          <strong>{msg.role}:</strong> {msg.content}
        </div>
      ))}
      <input
        type="text"
        onKeyDown={(e) => {
          if (e.key === 'Enter' && !loading) {
            sendMessage(e.currentTarget.value);
            e.currentTarget.value = '';
          }
        }}
        disabled={loading}
      />
    </div>
  );
}

💡 练习题

  1. 选择题:SSE(Server-Sent Events)的特点是什么?

    • A. 双向通信
    • B. 单向通信(服务器 → 客户端)
    • C. 基于 WebSocket
    • D. 需要额外协议
  2. 代码题:创建一个流式 API,每秒发送一个数字,共 10 个数字。

  3. 分析题:查看本项目的 app/hooks/useSendMessage.ts,说明它如何解析流式数据。

  4. 实践题:实现一个简单的聊天应用:

    • 服务端:流式返回 AI 生成的文本
    • 客户端:实时显示 AI 生成的内容
    • 功能:支持中断生成

📚 参考资源

官方文档

外部资源

本项目相关文件


✅ 总结

流式响应(Streaming)

  • 分块逐步发送数据
  • 减少等待时间
  • 更好的用户体验
  • 特别适合 AI 应用

SSE(Server-Sent Events)

  • 单向通信(服务器 → 客户端)
  • 基于 HTTP 协议
  • 格式简单(data: {}\n\n
  • 自动重连

服务端实现

  • 使用 ReadableStream
  • controller.enqueue() 发送数据
  • controller.close() 关闭流
  • 设置 SSE 响应头

客户端实现

  • 使用 fetch + response.body.getReader()
  • 使用 TextDecoder 解码数据
  • 解析 SSE 格式(\n\n 分隔)
  • 实时更新 UI

本项目的实现

  • 服务端:/api/chat 流式 AI 生成
  • 客户端:useSendMessage Hook 解析流
  • 支持中断(AbortController)
  • 支持多种数据类型(chunk、tool_call、end)

下一步:阅读下一篇文章《React Hooks 与导航》,学习 Next.js 的内置导航 Hooks。

登录以继续阅读

解锁完整文档、代码示例及更多高级功能。

立即登录

On this page