并行处理
利用 LangGraph 实现任务的并行执行和 Map-Reduce 模式
📚 学习目标
学完这篇文章后,你将能够:
- 在图结构中设计并行执行的分支
- 实现 Map-Reduce 模式来处理批量数据
- 理解 LangGraph 的并行执行原理与限制
前置知识
在开始学习之前,建议先阅读:
你需要了解:
- JavaScript
Promise.all的概念
1. 隐式并行 (Fan-out)
在 LangGraph 中,实现并行非常简单:让一个节点同时指向多个后续节点。
代码实现
只需添加多条边即可。
graph
.addEdge('node_a', 'node_b') // A -> B
.addEdge('node_a', 'node_c'); // A -> C当 node_a 完成后,node_b 和 node_c 会同时被调度执行。
状态合并 (Fan-in)
当 B 和 C 都完成后,D 才会执行。 注意:B 和 C 必须同时更新状态的不同字段,或者目标字段有合适的 Reducer(如数组追加),否则会发生覆盖冲突。
2. 核心概念:超级步骤(Super-step)
你可以把 LangGraph 的一次执行想象成"分波次(waves)"推进:
- 同一波次(同一 super-step):可以并行跑多个节点
- 下一波次:要等这一波全部完成,然后合并状态再进入下一波
这也是为什么 reducer 很关键:并行节点写同一个字段时,合并策略决定了最终结果。
3. 并行 vs 串行:什么时候该并行?
| 场景 | 并行价值 | 推荐策略 |
|---|---|---|
| 多数据源检索 | 高 | Fan-out + Fan-in 汇总 |
| 依赖前一步结果 | 低 | 保持串行 |
| 多角度分析 | 中 | 并行 + 合并结论 |
判断标准:如果各分支不依赖彼此结果,并行能带来显著收益;反之保持串行更稳定。
4. 并行任务的可观测性
可以用流式事件观察每个分支的执行顺序:
const stream = app.streamEvents({ query: 'LangGraph' }, { version: 'v2' });
for await (const event of stream) {
console.log(event.event, event.name);
}说明:对于并行系统,观测执行顺序和状态合并尤为重要。
5. Map-Reduce 模式
处理数组列表时的经典模式:对列表中的每一项执行相同的操作(Map),然后汇总结果(Reduce)。
LangGraph 提供了 Send API 来实现动态分支。
使用 Send API
import { Send, END } from '@langchain/langgraph';
// 1. Map 逻辑(在 Router 中使用)
const mapNode = (state) => {
return state.items.map(
(item) => new Send('processor_node', { content: item }),
);
};
// 2. Processor Node
const processorNode = (state) => {
return { results: [state.content.toUpperCase()] };
};
// 3. 构建图
graph
.addNode('processor_node', processorNode)
.addConditionalEdges(START, mapNode); // 动态生成 Send 对象关键点:new Send(targetNode, state) 创建了一个并行任务分支。
Map-Reduce 执行流程
6. Reducer 设计:并行结果怎么合并?
并行场景最常见的 3 种 reducer:
| 目标字段 | 推荐 reducer | 场景 |
|---|---|---|
| 数组累积 | concat | 并行产生多个结果条目 |
| 字典合并 | {...a, ...b} | 并行写不同 key 的结果 |
| 覆盖写入 | (_a, b) => b | 你明确只需要最后一次结果 |
下面给一个"并行检索 + 合并结果"的状态定义:
import { Annotation } from '@langchain/langgraph';
const State = Annotation.Root({
query: Annotation<string>(),
hits: Annotation<Record<string, string[]>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({}),
}),
});Reducer 冲突示例
如果多个并行节点都写 results,但 reducer 不是 concat,会发生覆盖:
const State = Annotation.Root({
results: Annotation<string[]>({
reducer: (a, b) => a.concat(b),
default: () => [],
}),
});
const workerA = async () => ({ results: ['A'] });
const workerB = async () => ({ results: ['B'] });提示:只要有并行写入,就必须明确 Reducer 的合并规则。
7. Send API 详解
Send API 是 LangGraphJS 中实现动态并行处理的核心机制:
Send API 基础
import { StateGraph, Annotation, Send, START } from '@langchain/langgraph';
interface SendState {
input: string;
results: Record<string, any>;
summary: string;
}
// 创建 Send 对象
const createSend = (targetNode: string, args: Record<string, any>) => {
return new Send(targetNode, args);
};
// Send API 在超级步骤中的作用
const sendNode = (state: SendState) => {
const tasks = ['taskA', 'taskB', 'taskC'];
// 动态创建并行任务
return tasks.map((taskName) =>
createSend(taskName, { taskData: state.input }),
);
};动态任务创建
Send API 核心功能:
- 动态任务创建:在运行时创建并行任务
- 独立状态管理:每个并行任务可以有自己的状态
- 灵活的数据流:支持复杂的数据传递和聚合模式
- 错误隔离:单个任务的失败不会影响其他并行任务
8. 实战:并行检索(3 路)+ 汇总
这个例子展示了一个典型模式:
- fan-out:同时请求 3 个数据源
- fan-in:把结果汇总成一个 summary
import { Annotation, StateGraph, START, END } from '@langchain/langgraph';
const State = Annotation.Root({
query: Annotation<string>(),
hits: Annotation<Record<string, string[]>>({
reducer: (a, b) => ({ ...a, ...b }),
default: () => ({}),
}),
summary: Annotation<string>({
default: () => '',
}),
});
const searchA = async (state: typeof State.State) => ({
hits: { a: [`A:${state.query}`] },
});
const searchB = async (state: typeof State.State) => ({
hits: { b: [`B:${state.query}`] },
});
const searchC = async (state: typeof State.State) => ({
hits: { c: [`C:${state.query}`] },
});
const summarize = async (state: typeof State.State) => {
const all = Object.values(state.hits).flat();
return { summary: `Total hits: ${all.length}\n` + all.join('\n') };
};
export const app = new StateGraph(State)
.addNode('searchA', searchA)
.addNode('searchB', searchB)
.addNode('searchC', searchC)
.addNode('summarize', summarize)
// fan-out
.addEdge(START, 'searchA')
.addEdge(START, 'searchB')
.addEdge(START, 'searchC')
// fan-in
.addEdge('searchA', 'summarize')
.addEdge('searchB', 'summarize')
.addEdge('searchC', 'summarize')
.addEdge('summarize', END)
.compile();代码解析:
- 三个搜索节点并行写入
hits的不同 key,所以用字典合并 reducer({...a, ...b})。 summarize节点会等三路都跑完才执行。
9. Map-Reduce 进阶示例
当 Reduce 需要结构化结果时,建议给结果字段配 reducer:
const State = Annotation.Root({
items: Annotation<string[]>(),
results: Annotation<Array<{ id: number; score: number }>>({
reducer: (a, b) => a.concat(b),
default: () => [],
}),
});
const mapNode = (state: typeof State.State) =>
state.items.map((item, index) => new Send('score', { id: index, item }));
const scoreNode = async (state: { id: number; item: string }) => ({
results: [{ id: state.id, score: state.item.length }],
});说明:结构化结果让 Reduce 更容易做排序/过滤/聚合。
10. 多代理协作系统
在复杂的 AI 应用中,不同的代理可以并行处理任务的不同方面:
import { StateGraph, Annotation, Send } from '@langchain/langgraph';
interface MultiAgentState {
query: string;
results: Record<string, string>;
finalResponse: string;
}
// 任务分发器 - 根据查询类型选择代理
const taskDispatcher = (state: MultiAgentState) => {
const query = state.query.toLowerCase();
const agents = ['research', 'analysis', 'creative'];
// 智能选择相关代理
const selectedAgents = agents.filter((agent) => {
if (agent === 'research')
return query.includes('研究') || query.includes('查找');
if (agent === 'analysis')
return query.includes('分析') || query.includes('比较');
if (agent === 'creative')
return query.includes('创建') || query.includes('设计');
return true; // 默认包含
});
// 为选中的代理创建并行任务
return selectedAgents.map(
(agentType) => new Send('processAgent', { agentType, query: state.query }),
);
};
// 代理处理器
const processAgent = async (state: { agentType: string; query: string }) => {
await new Promise((resolve) => setTimeout(resolve, 300));
const responses = {
research: `🔍 研究结果:关于"${state.query}"的背景信息`,
analysis: `📊 分析结果:"${state.query}"的深度评估`,
creative: `💡 创意建议:关于"${state.query}"的创新方案`,
};
const result =
responses[state.agentType] || `🤖 ${state.agentType}代理的处理结果`;
return { results: { [state.agentType]: result } };
};
// 结果综合器
const synthesizeResults = (state: MultiAgentState) => {
if (Object.keys(state.results).length === 0) return { finalResponse: '' };
const finalResponse = [
'🤝 多代理协作结果',
'',
...Object.entries(state.results).map(
([agent, result]) => `【${agent}代理】${result}`,
),
'',
'🎯 综合建议:基于多个代理的专业分析,为您提供全面解决方案。',
].join('\n');
return { finalResponse };
};11. 错误隔离与降级
并行任务中,某个分支失败不应拖垮整个流程:
const safeScore = async (state: { id: number; item: string }) => {
try {
return { results: [{ id: state.id, score: state.item.length }] };
} catch {
return { results: [{ id: state.id, score: 0 }] };
}
};注意:降级结果要有可识别标记,避免"错误数据"影响最终汇总。
12. 性能优化与最佳实践
并发控制策略
虽然并行处理可以提升性能,但过多的并发可能导致:
| 问题 | 影响 | 建议策略 |
|---|---|---|
| API 限流 | 请求被拒绝 | 根据系统资源和 API 限制合理设置并发数量 |
| 内存占用过高 | 应用变慢或崩溃 | 监控内存使用,必要时减少并发数 |
| 系统资源竞争 | 性能下降 | 使用连接池,避免频繁创建连接 |
| 错误传播 | 难以定位问题 | 为每个并行任务添加错误处理 |
建议并发数:
- 本地开发环境:2-4 个并行任务
- 云端部署环境:5-10 个并行任务(取决于实例配置)
- 批量处理:10-50 个项目每批
状态管理优化
// 高效状态管理策略
const StateAnnotation = Annotation.Root({
// 核心数据
results: Annotation<string[]>({
reducer: (state, update) => state.concat(update),
default: () => [],
}),
// 使用 Map 提升查找效率
resultCache: Annotation<Map<string, string>>({
reducer: (state, update) => {
const newMap = new Map(state);
update.forEach((value, key) => newMap.set(key, value));
return newMap;
},
default: () => new Map(),
}),
});13. 常见坑与解决方案
并行写同一字段会发生什么?
如果并行节点都返回 { results: [...] },而 results 不是 concat reducer,你很可能只得到"最后一个写入者"的结果。
解决方案:
- 写不同字段(例如
hits.a/hits.b) - 或者给该字段配置数组 concat reducer
常见问题汇总
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 并行任务不执行 | 超级步骤配置错误 | 检查 Send API 调用,确保节点名正确 |
| 状态不一致 | Reducer 使用不当 | 确保每个并行任务返回的状态格式一致 |
| 内存泄漏 | 未正确清理状态 | 检查 Reducer 的实现,避免累积过多状态 |
| 任务丢失 | 错误处理不完善 | 为每个并行任务添加错误处理和回退逻辑 |
| 性能下降 | 并发数过高 | 根据系统资源调整并发数量 |
14. 配置并行度与限流
并行并不等于"无限并发"。在真实系统里你需要考虑:
- 外部 API 限流:并行请求太多会触发 rate limit
- 资源占用:并发过高会放大内存和 CPU 压力
- 排队策略:有些任务应该串行以保持顺序
实用建议:把"可并行"的部分做成子图或 Send 分支,同时控制 recursionLimit 与外部请求节奏。
15. 流式并行处理
LangGraphJS 还支持流式并行处理,让你能够实时观察并行任务的执行进度:
import { StateGraph, Annotation, Send, START } from '@langchain/langgraph';
interface StreamingState {
input: string;
results: Record<string, any>;
completed: number;
finalOutput: string;
}
// 任务分发器
const distributeTask = (state: StreamingState) => {
return {
results: {},
completed: 0,
tasks: ['process', 'analyze', 'validate'],
};
};
// 数据处理任务(模拟流式)
const processData = async (state: { index: number; taskType: string }) => {
const steps = ['初始化', '处理数据', '生成结果'];
for (let i = 0; i < steps.length; i++) {
await new Promise((resolve) => setTimeout(resolve, 100));
console.log(`📡 流式进度 ${i + 1}/${steps.length}: ${steps[i]}`);
}
return {
results: { [state.taskType]: `✅ ${state.taskType}完成` },
completed: 1,
};
};
// 结果聚合器
const aggregateResults = (state: StreamingState) => {
if (state.completed < 3) return { finalOutput: '' };
const finalOutput = [
'🌟 流式并行处理完成',
'',
...Object.entries(state.results).map(([task, result]) => result),
'',
`🎯 总计完成 ${state.completed} 个任务`,
].join('\n');
return { finalOutput };
};流式处理工作流
16. 监控和调试
执行时间监控
// 执行时间监控示例
const monitoredNode = async (state: any) => {
const startTime = Date.now();
try {
const result = await processTask(state.task);
const duration = Date.now() - startTime;
console.log(`任务完成,耗时: ${duration}ms`);
if (duration > 5000) {
console.warn(`⚠️ 慢任务: ${duration}ms`);
}
return {
result,
executionTime: duration,
timestamp: new Date().toISOString(),
};
} catch (error) {
console.error('任务执行失败:', error);
throw error;
}
};并发度监控
// 并发度监控
interface ConcurrencyMetrics {
activeTasks: number;
queuedTasks: number;
completedTasks: number;
failedTasks: number;
}
const metrics: ConcurrencyMetrics = {
activeTasks: 0,
queuedTasks: 0,
completedTasks: 0,
failedTasks: 0,
};
// 更新指标
const updateMetrics = (event: string) => {
switch (event) {
case 'taskStarted':
metrics.activeTasks++;
metrics.queuedTasks--;
break;
case 'taskCompleted':
metrics.completedTasks++;
metrics.activeTasks--;
break;
case 'taskFailed':
metrics.failedTasks++;
metrics.activeTasks--;
break;
}
console.log('当前并发状态:', {
active: metrics.activeTasks,
queued: metrics.queuedTasks,
completed: metrics.completedTasks,
failed: metrics.failedTasks,
});
};💡 练习题
-
场景题:你需要分析 10 篇新闻稿的情感。设计一个 Map-Reduce 图:
- Map: 让 LLM 给每篇稿子打分。
- Reduce: 计算平均分。
- 请写出状态 Schema 定义(注意 Reducer 的使用)。
点击查看答案
让
scores使用concatreducer,Map 节点返回{ scores: [score] }。 -
操作题:将一个并行检索例子改造成"结构化输出 + 去重合并"。
点击查看答案
让每个分支输出
{ source, docs },Reduce 节点按source合并并去重。 -
思考题:为什么 Reducer 是并行的关键?
点击查看答案
并行节点会同时写入状态,Reducer 决定"冲突时如何合并"。没有合适 Reducer 会导致数据丢失。
-
操作题:为并行分支加入错误隔离逻辑,并确保 Reduce 阶段可识别失败分支。
点击查看答案
在分支返回中加
status字段(ok/error),Reduce 根据 status 过滤或降级处理。 -
思考题:并行度过高可能带来哪些问题?
点击查看答案
常见问题包括 API 限流、资源争抢、响应抖动,需结合限流与队列策略控制。
✅ 总结
本章要点:
- Fan-out/Fan-in:通过图结构实现静态并行。
- Send API:实现动态并行(Map-Reduce)。
- 并行处理时务必配置好状态的 Reducer,防止数据丢失。
下一步:恭喜!你已经掌握了 LangGraph 的核心架构。接下来我们将介绍一些实用功能,如调试和持久化。
登录以继续阅读
解锁完整文档、代码示例及更多高级功能。