架构模式

并行处理

利用 LangGraph 实现任务的并行执行和 Map-Reduce 模式

📚 学习目标

学完这篇文章后,你将能够:

  • 在图结构中设计并行执行的分支
  • 实现 Map-Reduce 模式来处理批量数据
  • 理解 LangGraph 的并行执行原理与限制

前置知识

在开始学习之前,建议先阅读:

你需要了解:

  • JavaScript Promise.all 的概念

1. 隐式并行 (Fan-out)

在 LangGraph 中,实现并行非常简单:让一个节点同时指向多个后续节点

代码实现

只需添加多条边即可。

graph
  .addEdge('node_a', 'node_b') // A -> B
  .addEdge('node_a', 'node_c'); // A -> C

node_a 完成后,node_bnode_c 会同时被调度执行。

状态合并 (Fan-in)

当 B 和 C 都完成后,D 才会执行。 注意:B 和 C 必须同时更新状态的不同字段,或者目标字段有合适的 Reducer(如数组追加),否则会发生覆盖冲突。


2. 核心概念:超级步骤(Super-step)

你可以把 LangGraph 的一次执行想象成"分波次(waves)"推进:

  • 同一波次(同一 super-step):可以并行跑多个节点
  • 下一波次:要等这一波全部完成,然后合并状态再进入下一波

这也是为什么 reducer 很关键:并行节点写同一个字段时,合并策略决定了最终结果。


3. 并行 vs 串行:什么时候该并行?

场景并行价值推荐策略
多数据源检索Fan-out + Fan-in 汇总
依赖前一步结果保持串行
多角度分析并行 + 合并结论

判断标准:如果各分支不依赖彼此结果,并行能带来显著收益;反之保持串行更稳定。


4. 并行任务的可观测性

可以用流式事件观察每个分支的执行顺序:

const stream = app.streamEvents({ query: 'LangGraph' }, { version: 'v2' });

for await (const event of stream) {
  console.log(event.event, event.name);
}

说明:对于并行系统,观测执行顺序和状态合并尤为重要。


5. Map-Reduce 模式

处理数组列表时的经典模式:对列表中的每一项执行相同的操作(Map),然后汇总结果(Reduce)。

LangGraph 提供了 Send API 来实现动态分支。

使用 Send API

import { Send, END } from '@langchain/langgraph';

// 1. Map 逻辑(在 Router 中使用)
const mapNode = (state) => {
  return state.items.map(
    (item) => new Send('processor_node', { content: item }),
  );
};

// 2. Processor Node
const processorNode = (state) => {
  return { results: [state.content.toUpperCase()] };
};

// 3. 构建图
graph
  .addNode('processor_node', processorNode)
  .addConditionalEdges(START, mapNode); // 动态生成 Send 对象

关键点new Send(targetNode, state) 创建了一个并行任务分支。

Map-Reduce 执行流程


6. Reducer 设计:并行结果怎么合并?

并行场景最常见的 3 种 reducer:

目标字段推荐 reducer场景
数组累积concat并行产生多个结果条目
字典合并{...a, ...b}并行写不同 key 的结果
覆盖写入(_a, b) => b你明确只需要最后一次结果

下面给一个"并行检索 + 合并结果"的状态定义:

import { Annotation } from '@langchain/langgraph';

const State = Annotation.Root({
  query: Annotation<string>(),
  hits: Annotation<Record<string, string[]>>({
    reducer: (a, b) => ({ ...a, ...b }),
    default: () => ({}),
  }),
});

Reducer 冲突示例

如果多个并行节点都写 results,但 reducer 不是 concat,会发生覆盖:

const State = Annotation.Root({
  results: Annotation<string[]>({
    reducer: (a, b) => a.concat(b),
    default: () => [],
  }),
});

const workerA = async () => ({ results: ['A'] });
const workerB = async () => ({ results: ['B'] });

提示:只要有并行写入,就必须明确 Reducer 的合并规则。


7. Send API 详解

Send API 是 LangGraphJS 中实现动态并行处理的核心机制:

Send API 基础

import { StateGraph, Annotation, Send, START } from '@langchain/langgraph';

interface SendState {
  input: string;
  results: Record<string, any>;
  summary: string;
}

// 创建 Send 对象
const createSend = (targetNode: string, args: Record<string, any>) => {
  return new Send(targetNode, args);
};

// Send API 在超级步骤中的作用
const sendNode = (state: SendState) => {
  const tasks = ['taskA', 'taskB', 'taskC'];

  // 动态创建并行任务
  return tasks.map((taskName) =>
    createSend(taskName, { taskData: state.input }),
  );
};

动态任务创建

Send API 核心功能

  • 动态任务创建:在运行时创建并行任务
  • 独立状态管理:每个并行任务可以有自己的状态
  • 灵活的数据流:支持复杂的数据传递和聚合模式
  • 错误隔离:单个任务的失败不会影响其他并行任务

8. 实战:并行检索(3 路)+ 汇总

这个例子展示了一个典型模式:

  • fan-out:同时请求 3 个数据源
  • fan-in:把结果汇总成一个 summary
import { Annotation, StateGraph, START, END } from '@langchain/langgraph';

const State = Annotation.Root({
  query: Annotation<string>(),
  hits: Annotation<Record<string, string[]>>({
    reducer: (a, b) => ({ ...a, ...b }),
    default: () => ({}),
  }),
  summary: Annotation<string>({
    default: () => '',
  }),
});

const searchA = async (state: typeof State.State) => ({
  hits: { a: [`A:${state.query}`] },
});
const searchB = async (state: typeof State.State) => ({
  hits: { b: [`B:${state.query}`] },
});
const searchC = async (state: typeof State.State) => ({
  hits: { c: [`C:${state.query}`] },
});

const summarize = async (state: typeof State.State) => {
  const all = Object.values(state.hits).flat();
  return { summary: `Total hits: ${all.length}\n` + all.join('\n') };
};

export const app = new StateGraph(State)
  .addNode('searchA', searchA)
  .addNode('searchB', searchB)
  .addNode('searchC', searchC)
  .addNode('summarize', summarize)
  // fan-out
  .addEdge(START, 'searchA')
  .addEdge(START, 'searchB')
  .addEdge(START, 'searchC')
  // fan-in
  .addEdge('searchA', 'summarize')
  .addEdge('searchB', 'summarize')
  .addEdge('searchC', 'summarize')
  .addEdge('summarize', END)
  .compile();

代码解析

  1. 三个搜索节点并行写入 hits 的不同 key,所以用字典合并 reducer({...a, ...b})。
  2. summarize 节点会等三路都跑完才执行。

9. Map-Reduce 进阶示例

当 Reduce 需要结构化结果时,建议给结果字段配 reducer:

const State = Annotation.Root({
  items: Annotation<string[]>(),
  results: Annotation<Array<{ id: number; score: number }>>({
    reducer: (a, b) => a.concat(b),
    default: () => [],
  }),
});

const mapNode = (state: typeof State.State) =>
  state.items.map((item, index) => new Send('score', { id: index, item }));

const scoreNode = async (state: { id: number; item: string }) => ({
  results: [{ id: state.id, score: state.item.length }],
});

说明:结构化结果让 Reduce 更容易做排序/过滤/聚合。


10. 多代理协作系统

在复杂的 AI 应用中,不同的代理可以并行处理任务的不同方面:

import { StateGraph, Annotation, Send } from '@langchain/langgraph';

interface MultiAgentState {
  query: string;
  results: Record<string, string>;
  finalResponse: string;
}

// 任务分发器 - 根据查询类型选择代理
const taskDispatcher = (state: MultiAgentState) => {
  const query = state.query.toLowerCase();
  const agents = ['research', 'analysis', 'creative'];

  // 智能选择相关代理
  const selectedAgents = agents.filter((agent) => {
    if (agent === 'research')
      return query.includes('研究') || query.includes('查找');
    if (agent === 'analysis')
      return query.includes('分析') || query.includes('比较');
    if (agent === 'creative')
      return query.includes('创建') || query.includes('设计');
    return true; // 默认包含
  });

  // 为选中的代理创建并行任务
  return selectedAgents.map(
    (agentType) => new Send('processAgent', { agentType, query: state.query }),
  );
};

// 代理处理器
const processAgent = async (state: { agentType: string; query: string }) => {
  await new Promise((resolve) => setTimeout(resolve, 300));

  const responses = {
    research: `🔍 研究结果:关于"${state.query}"的背景信息`,
    analysis: `📊 分析结果:"${state.query}"的深度评估`,
    creative: `💡 创意建议:关于"${state.query}"的创新方案`,
  };

  const result =
    responses[state.agentType] || `🤖 ${state.agentType}代理的处理结果`;

  return { results: { [state.agentType]: result } };
};

// 结果综合器
const synthesizeResults = (state: MultiAgentState) => {
  if (Object.keys(state.results).length === 0) return { finalResponse: '' };

  const finalResponse = [
    '🤝 多代理协作结果',
    '',
    ...Object.entries(state.results).map(
      ([agent, result]) => `【${agent}代理】${result}`,
    ),
    '',
    '🎯 综合建议:基于多个代理的专业分析,为您提供全面解决方案。',
  ].join('\n');

  return { finalResponse };
};

11. 错误隔离与降级

并行任务中,某个分支失败不应拖垮整个流程:

const safeScore = async (state: { id: number; item: string }) => {
  try {
    return { results: [{ id: state.id, score: state.item.length }] };
  } catch {
    return { results: [{ id: state.id, score: 0 }] };
  }
};

注意:降级结果要有可识别标记,避免"错误数据"影响最终汇总。


12. 性能优化与最佳实践

并发控制策略

虽然并行处理可以提升性能,但过多的并发可能导致:

问题影响建议策略
API 限流请求被拒绝根据系统资源和 API 限制合理设置并发数量
内存占用过高应用变慢或崩溃监控内存使用,必要时减少并发数
系统资源竞争性能下降使用连接池,避免频繁创建连接
错误传播难以定位问题为每个并行任务添加错误处理

建议并发数

  • 本地开发环境:2-4 个并行任务
  • 云端部署环境:5-10 个并行任务(取决于实例配置)
  • 批量处理:10-50 个项目每批

状态管理优化

// 高效状态管理策略
const StateAnnotation = Annotation.Root({
  // 核心数据
  results: Annotation<string[]>({
    reducer: (state, update) => state.concat(update),
    default: () => [],
  }),

  // 使用 Map 提升查找效率
  resultCache: Annotation<Map<string, string>>({
    reducer: (state, update) => {
      const newMap = new Map(state);
      update.forEach((value, key) => newMap.set(key, value));
      return newMap;
    },
    default: () => new Map(),
  }),
});

13. 常见坑与解决方案

并行写同一字段会发生什么?

如果并行节点都返回 { results: [...] },而 results 不是 concat reducer,你很可能只得到"最后一个写入者"的结果。

解决方案

  • 写不同字段(例如 hits.a/hits.b
  • 或者给该字段配置数组 concat reducer

常见问题汇总

问题原因解决方案
并行任务不执行超级步骤配置错误检查 Send API 调用,确保节点名正确
状态不一致Reducer 使用不当确保每个并行任务返回的状态格式一致
内存泄漏未正确清理状态检查 Reducer 的实现,避免累积过多状态
任务丢失错误处理不完善为每个并行任务添加错误处理和回退逻辑
性能下降并发数过高根据系统资源调整并发数量

14. 配置并行度与限流

并行并不等于"无限并发"。在真实系统里你需要考虑:

  • 外部 API 限流:并行请求太多会触发 rate limit
  • 资源占用:并发过高会放大内存和 CPU 压力
  • 排队策略:有些任务应该串行以保持顺序

实用建议:把"可并行"的部分做成子图或 Send 分支,同时控制 recursionLimit 与外部请求节奏。


15. 流式并行处理

LangGraphJS 还支持流式并行处理,让你能够实时观察并行任务的执行进度:

import { StateGraph, Annotation, Send, START } from '@langchain/langgraph';

interface StreamingState {
  input: string;
  results: Record<string, any>;
  completed: number;
  finalOutput: string;
}

// 任务分发器
const distributeTask = (state: StreamingState) => {
  return {
    results: {},
    completed: 0,
    tasks: ['process', 'analyze', 'validate'],
  };
};

// 数据处理任务(模拟流式)
const processData = async (state: { index: number; taskType: string }) => {
  const steps = ['初始化', '处理数据', '生成结果'];

  for (let i = 0; i < steps.length; i++) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    console.log(`📡 流式进度 ${i + 1}/${steps.length}: ${steps[i]}`);
  }

  return {
    results: { [state.taskType]: `✅ ${state.taskType}完成` },
    completed: 1,
  };
};

// 结果聚合器
const aggregateResults = (state: StreamingState) => {
  if (state.completed < 3) return { finalOutput: '' };

  const finalOutput = [
    '🌟 流式并行处理完成',
    '',
    ...Object.entries(state.results).map(([task, result]) => result),
    '',
    `🎯 总计完成 ${state.completed} 个任务`,
  ].join('\n');

  return { finalOutput };
};

流式处理工作流


16. 监控和调试

执行时间监控

// 执行时间监控示例
const monitoredNode = async (state: any) => {
  const startTime = Date.now();

  try {
    const result = await processTask(state.task);
    const duration = Date.now() - startTime;

    console.log(`任务完成,耗时: ${duration}ms`);

    if (duration > 5000) {
      console.warn(`⚠️ 慢任务: ${duration}ms`);
    }

    return {
      result,
      executionTime: duration,
      timestamp: new Date().toISOString(),
    };
  } catch (error) {
    console.error('任务执行失败:', error);
    throw error;
  }
};

并发度监控

// 并发度监控
interface ConcurrencyMetrics {
  activeTasks: number;
  queuedTasks: number;
  completedTasks: number;
  failedTasks: number;
}

const metrics: ConcurrencyMetrics = {
  activeTasks: 0,
  queuedTasks: 0,
  completedTasks: 0,
  failedTasks: 0,
};

// 更新指标
const updateMetrics = (event: string) => {
  switch (event) {
    case 'taskStarted':
      metrics.activeTasks++;
      metrics.queuedTasks--;
      break;
    case 'taskCompleted':
      metrics.completedTasks++;
      metrics.activeTasks--;
      break;
    case 'taskFailed':
      metrics.failedTasks++;
      metrics.activeTasks--;
      break;
  }

  console.log('当前并发状态:', {
    active: metrics.activeTasks,
    queued: metrics.queuedTasks,
    completed: metrics.completedTasks,
    failed: metrics.failedTasks,
  });
};

💡 练习题

  1. 场景题:你需要分析 10 篇新闻稿的情感。设计一个 Map-Reduce 图:

    • Map: 让 LLM 给每篇稿子打分。
    • Reduce: 计算平均分。
    • 请写出状态 Schema 定义(注意 Reducer 的使用)。
    点击查看答案

    scores 使用 concat reducer,Map 节点返回 { scores: [score] }

  2. 操作题:将一个并行检索例子改造成"结构化输出 + 去重合并"。

    点击查看答案

    让每个分支输出 { source, docs },Reduce 节点按 source 合并并去重。

  3. 思考题:为什么 Reducer 是并行的关键?

    点击查看答案

    并行节点会同时写入状态,Reducer 决定"冲突时如何合并"。没有合适 Reducer 会导致数据丢失。

  4. 操作题:为并行分支加入错误隔离逻辑,并确保 Reduce 阶段可识别失败分支。

    点击查看答案

    在分支返回中加 status 字段(ok/error),Reduce 根据 status 过滤或降级处理。

  5. 思考题:并行度过高可能带来哪些问题?

    点击查看答案

    常见问题包括 API 限流、资源争抢、响应抖动,需结合限流与队列策略控制。


✅ 总结

本章要点

  • Fan-out/Fan-in:通过图结构实现静态并行。
  • Send API:实现动态并行(Map-Reduce)。
  • 并行处理时务必配置好状态的 Reducer,防止数据丢失。

下一步:恭喜!你已经掌握了 LangGraph 的核心架构。接下来我们将介绍一些实用功能,如调试和持久化。

登录以继续阅读

解锁完整文档、代码示例及更多高级功能。

立即登录

On this page