架构模式

并行处理

利用 LangGraph 实现任务的并行执行和 Map-Reduce 模式

📚 学习目标

学完这篇文章后,你将能够:

  • 在图结构中设计并行执行的分支
  • 实现 Map-Reduce 模式来处理批量数据
  • 理解 LangGraph 的并行执行原理与限制

前置知识

在开始学习之前,建议先阅读:

你需要了解:

  • JavaScript Promise.all 的概念

1️⃣ 隐式并行 (Fan-out)

在 LangGraph 中,实现并行非常简单:让一个节点同时指向多个后续节点

代码实现

只需添加多条边即可。

graph
  .addEdge("node_a", "node_b") // A -> B
  .addEdge("node_a", "node_c") // A -> C

node_a 完成后,node_bnode_c 会同时被调度执行。

状态合并 (Fan-in)

当 B 和 C 都完成后,D 才会执行。 注意:B 和 C 必须同时更新状态的不同字段,或者目标字段有合适的 Reducer(如数组追加),否则会发生覆盖冲突。


2️⃣ Map-Reduce 模式

处理数组列表时的经典模式:对列表中的每一项执行相同的操作(Map),然后汇总结果(Reduce)。

LangGraph 提供了 Send API 来实现动态分支。

使用 Send API

import { Send, END } from '@langchain/langgraph';

// 1. Map 逻辑(在 Router 中使用)
const mapNode = (state) => {
  return state.items.map(item => 
    new Send("processor_node", { content: item })
  );
};

// 2. Processor Node
const processorNode = (state) => {
  return { results: [state.content.toUpperCase()] };
};

// 3. 构建图
graph
  .addNode("processor_node", processorNode)
  .addConditionalEdges(START, mapNode); // 动态生成 Send 对象

关键点new Send(targetNode, state) 创建了一个并行任务分支。


3️⃣ 配置并行度

虽然逻辑上是并行的,但实际执行受到 JavaScript 运行时和并发限制的影响。

你可以控制最大并发数(如果使用的是 LangGraph Cloud 或特定运行器),但在本地 JS 环境中,通常受限于 API 的 Rate Limit。


📚 参考资源

官方文档


💡 练习题

  1. 场景题:你需要分析 10 篇新闻稿的情感。设计一个 Map-Reduce 图:
    • Map: 让 LLM 给每篇稿子打分。
    • Reduce: 计算平均分。
    • 请写出状态 Schema 定义(注意 Reducer 的使用)。

✅ 总结

本章要点

  • Fan-out/Fan-in:通过图结构实现静态并行。
  • Send API:实现动态并行(Map-Reduce)。
  • 并行处理时务必配置好状态的 Reducer,防止数据丢失。

下一步:恭喜!你已经掌握了 LangGraph 的核心架构。接下来我们将介绍一些实用功能,如调试和持久化。

登录以继续阅读

解锁完整文档、代码示例及更多高级功能。

立即登录

On this page