架构模式
并行处理
利用 LangGraph 实现任务的并行执行和 Map-Reduce 模式
📚 学习目标
学完这篇文章后,你将能够:
- 在图结构中设计并行执行的分支
- 实现 Map-Reduce 模式来处理批量数据
- 理解 LangGraph 的并行执行原理与限制
前置知识
在开始学习之前,建议先阅读:
你需要了解:
- JavaScript
Promise.all的概念
1️⃣ 隐式并行 (Fan-out)
在 LangGraph 中,实现并行非常简单:让一个节点同时指向多个后续节点。
代码实现
只需添加多条边即可。
graph
.addEdge("node_a", "node_b") // A -> B
.addEdge("node_a", "node_c") // A -> C当 node_a 完成后,node_b 和 node_c 会同时被调度执行。
状态合并 (Fan-in)
当 B 和 C 都完成后,D 才会执行。 注意:B 和 C 必须同时更新状态的不同字段,或者目标字段有合适的 Reducer(如数组追加),否则会发生覆盖冲突。
2️⃣ Map-Reduce 模式
处理数组列表时的经典模式:对列表中的每一项执行相同的操作(Map),然后汇总结果(Reduce)。
LangGraph 提供了 Send API 来实现动态分支。
使用 Send API
import { Send, END } from '@langchain/langgraph';
// 1. Map 逻辑(在 Router 中使用)
const mapNode = (state) => {
return state.items.map(item =>
new Send("processor_node", { content: item })
);
};
// 2. Processor Node
const processorNode = (state) => {
return { results: [state.content.toUpperCase()] };
};
// 3. 构建图
graph
.addNode("processor_node", processorNode)
.addConditionalEdges(START, mapNode); // 动态生成 Send 对象关键点:new Send(targetNode, state) 创建了一个并行任务分支。
3️⃣ 配置并行度
虽然逻辑上是并行的,但实际执行受到 JavaScript 运行时和并发限制的影响。
你可以控制最大并发数(如果使用的是 LangGraph Cloud 或特定运行器),但在本地 JS 环境中,通常受限于 API 的 Rate Limit。
📚 参考资源
官方文档
💡 练习题
- 场景题:你需要分析 10 篇新闻稿的情感。设计一个 Map-Reduce 图:
- Map: 让 LLM 给每篇稿子打分。
- Reduce: 计算平均分。
- 请写出状态 Schema 定义(注意 Reducer 的使用)。
✅ 总结
本章要点:
- Fan-out/Fan-in:通过图结构实现静态并行。
- Send API:实现动态并行(Map-Reduce)。
- 并行处理时务必配置好状态的 Reducer,防止数据丢失。
下一步:恭喜!你已经掌握了 LangGraph 的核心架构。接下来我们将介绍一些实用功能,如调试和持久化。
登录以继续阅读
解锁完整文档、代码示例及更多高级功能。