概述

本文基于网上多篇深度技术文章和源码分析,深入探讨LangGraph的高级应用模式、企业级实践和技术实现细节。通过实际案例和源码剖析,展示LangGraph在复杂场景中的强大能力。

1. 深度研究系统架构

1.1 完整的研究工作流实现

基于实际的深度研究系统,展示LangGraph在复杂多阶段任务中的应用:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
import operator
from datetime import datetime

class OverallState(TypedDict):
    """研究系统的整体状态模式
    
    这个状态设计展示了LangGraph在复杂工作流中的状态管理能力:
    - 使用Annotated类型定义累积行为
    - 支持多轮迭代的状态追踪  
    - 集成配置参数和运行时状态
    """
    messages: Annotated[list, add_messages]                # 对话消息累积
    search_query: Annotated[list, operator.add]           # 搜索查询累积
    web_research_result: Annotated[list, operator.add]    # 研究结果累积
    sources_gathered: Annotated[list, operator.add]       # 来源信息累积
    initial_search_query_count: int                       # 初始查询数量
    max_research_loops: int                               # 最大研究循环次数
    research_loop_count: int                              # 当前循环次数
    reasoning_model: str                                  # 推理模型名称

class SearchQueryList(BaseModel):
    """搜索查询列表模型:确保查询生成的结构化输出"""
    query: List[str] = Field(description="优化的搜索查询列表,每个查询关注不同角度")

class Reflection(BaseModel):
    """反思分析结果模型:支持知识缺口分析和迭代决策"""
    is_sufficient: bool = Field(description="当前信息是否足够回答问题")
    knowledge_gap: List[str] = Field(description="识别的知识缺口列表")
    follow_up_queries: List[str] = Field(description="针对知识缺口的后续查询建议")
    confidence_score: float = Field(description="答案置信度评分", ge=0.0, le=1.0)

async def run_agent_workflow_async(
    user_input: str,
    debug: bool = False,
    max_plan_iterations: int = 3,
    max_step_num: int = 5,
    enable_background_investigation: bool = True,
) -> Dict[str, Any]:
    """异步运行代理工作流,处理用户输入
    
    这是一个生产级的研究工作流实现,展示了LangGraph在复杂
    多阶段任务中的应用能力,包括:
    - 智能查询生成
    - 并行信息收集
    - 反思式质量控制
    - 迭代优化机制
    
    Args:
        user_input: 用户的查询或请求
        debug: 是否启用调试级别日志
        max_plan_iterations: 最大计划迭代次数
        max_step_num: 计划中的最大步骤数
        enable_background_investigation: 是否在规划前进行网络搜索
        
    Returns:
        Dict[str, Any]: 工作流完成后的最终状态,包含研究结果和来源
    """
    
    # 初始化研究状态
    initial_state = OverallState(
        messages=[HumanMessage(content=user_input)],
        search_query=[],
        web_research_result=[],
        sources_gathered=[],
        initial_search_query_count=max_step_num,
        max_research_loops=max_plan_iterations,
        research_loop_count=0,
        reasoning_model="gemini-2.0-flash-exp",
    )
    
    # 构建研究工作流图
    research_graph = build_research_workflow()
    
    # 配置运行环境
    config = RunnableConfig(
        configurable={
            "thread_id": f"research_{int(time.time())}",
            "user_id": "anonymous", 
            "query_generator_model": "gemini-2.0-flash-exp",
            "enable_tracing": debug,
        },
        callbacks=[
            LangfuseCallbackHandler() if debug else None,
            ConsoleCallbackHandler() if debug else None,
        ]
    )
    
    # 异步执行工作流
    final_state = None
    execution_steps = []
    
    async for state_update in research_graph.astream(
        initial_state,
        config=config,
        stream_mode="updates"
    ):
        execution_steps.append({
            "timestamp": time.time(),
            "update": state_update,
        })
        
        if debug:
            for node_name, node_state in state_update.items():
                print(f"📍 节点 '{node_name}' 执行完成")
                
                # 显示关键状态变化
                if "messages" in node_state and node_state["messages"]:
                    latest_message = node_state["messages"][-1]
                    preview = latest_message.content[:200] + "..." if len(latest_message.content) > 200 else latest_message.content
                    print(f"💬 输出预览: {preview}")
                
                if "search_query" in node_state:
                    print(f"🔍 新增查询: {node_state['search_query']}")
                
                if "sources_gathered" in node_state:
                    print(f"📚 收集来源: {len(node_state['sources_gathered'])} 个")
        
        final_state = state_update
    
    # 添加执行统计信息
    if final_state:
        final_state["execution_stats"] = {
            "total_steps": len(execution_steps),
            "total_duration": execution_steps[-1]["timestamp"] - execution_steps[0]["timestamp"] if execution_steps else 0,
            "queries_executed": len(final_state.get("search_query", [])),
            "sources_collected": len(final_state.get("sources_gathered", [])),
        }
    
    return final_state

def build_research_workflow() -> CompiledStateGraph:
    """构建深度研究工作流图
    
    该图实现了一个完整的研究流程:
    1. 查询生成:将用户问题转换为多个搜索查询
    2. 并行搜索:同时执行多个搜索任务
    3. 结果收集:汇总所有搜索结果
    4. 反思分析:评估信息充分性
    5. 迭代优化:根据反思结果决定是否需要更多信息
    6. 答案综合:生成最终的综合答案
    """
    graph = StateGraph(OverallState)
    
    # 查询生成节点
    def generate_query(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """智能查询生成:将研究主题分解为多个搜索角度"""
        configurable = config.get("configurable", {})
        query_model = configurable.get("query_generator_model", "gemini-2.0-flash-exp")
        
        # 初始化查询生成模型
        llm = ChatGoogleGenerativeAI(
            model=query_model,
            temperature=1.0,  # 提高查询多样性
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        structured_llm = llm.with_structured_output(SearchQueryList)
        
        # 构建上下文感知的查询生成提示
        research_topic = get_research_topic(state["messages"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        previous_queries = state.get("search_query", [])
        
        query_instructions = f"""当前日期: {current_date}

研究主题: {research_topic}

之前已执行的查询(避免重复):
{chr(10).join(f"- {q}" for q in previous_queries[-5:]) if previous_queries else "无"}

请生成 {state["initial_search_query_count"]} 个新的优化搜索查询,要求:

1. **多角度覆盖**:从不同角度和层面分析主题
2. **时效性考虑**:包含最新信息和趋势分析
3. **深度挖掘**:不仅获取基础信息,还要深入技术细节
4. **专业术语**:使用领域专业术语提高搜索精度
5. **避免重复**:确保与之前查询不重复

每个查询应该是独立且具体的,能够获得有价值的信息片段。"""
        
        result = structured_llm.invoke(query_instructions)
        
        return {
            "query_list": result.query,
            "query_generation_completed": True,
            "query_generated_at": time.time(),
        }
    
    # 并行搜索分发节点
    def continue_to_web_research(state: OverallState) -> List[Send]:
        """启动并行网络搜索
        
        使用LangGraph的Send机制实现真正的并行搜索,
        每个查询都会启动一个独立的搜索任务
        """
        query_list = state.get("query_list", [])
        
        return [
            Send("web_research", {
                "search_query": search_query,
                "id": int(idx),
                "total_queries": len(query_list),
                "research_context": {
                    "main_topic": get_research_topic(state["messages"]),
                    "current_loop": state.get("research_loop_count", 0),
                }
            })
            for idx, search_query in enumerate(query_list)
        ]
    
    # 网络搜索执行节点
    def web_research(state: Dict[str, Any], config: RunnableConfig) -> OverallState:
        """执行单个搜索查询的网络研究
        
        集成Google Search API和Gemini模型,实现:
        - 智能搜索查询优化
        - 自动引用提取和格式化
        - URL优化和短链接生成
        """
        search_query = state["search_query"]
        search_id = state["id"]
        research_context = state.get("research_context", {})
        
        # 获取模型配置
        configurable = config.get("configurable", {})
        model_name = configurable.get("query_generator_model", "gemini-2.0-flash-exp")
        
        # 构建搜索上下文
        search_prompt = f"""
使用Google Search API搜索以下查询并提供详细分析:

查询: {search_query}
主题背景: {research_context.get("main_topic", "未知")}

搜索要求:
1. 使用多个相关关键词组合进行搜索
2. 优先选择权威可信的信息源
3. 提取关键事实、数据和观点
4. 分析信息的时效性和相关性
5. 总结核心发现和洞察

请提供结构化的研究结果,包含引用来源。
"""
        
        # 执行搜索
        genai_client = genai.GenerativeModel(model_name)
        response = genai_client.generate_content(
            search_prompt,
            tools=[{"google_search": {}}],
            config={"temperature": 0}  # 确保搜索结果的一致性
        )
        
        # 处理搜索结果和引用
        resolved_urls = resolve_urls(
            response.candidates[0].grounding_metadata.grounding_chunks, 
            search_id
        )
        citations = get_citations(response, resolved_urls)
        modified_text = insert_citation_markers(response.text, citations)
        
        # 构建结构化的来源信息
        sources_gathered = [
            {
                "url": url_info["url"],
                "title": url_info.get("title", "未知标题"),
                "short_url": url_info["short_url"],
                "value": url_info["value"],
                "search_id": search_id,
                "search_query": search_query,
                "collected_at": time.time(),
                "relevance_score": _calculate_relevance_score(
                    url_info, search_query, research_context
                ),
            }
            for url_info in resolved_urls
        ]
        
        return {
            "sources_gathered": sources_gathered,
            "search_query": [search_query],
            "web_research_result": [modified_text],
        }
    
    # 反思分析节点
    def reflection(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """反思分析:评估信息充分性并识别知识缺口
        
        这是LangGraph反思机制的核心实现,支持:
        - 自动评估研究结果的完整性
        - 识别知识缺口和信息不足的领域
        - 生成针对性的后续查询建议
        - 质量控制和迭代决策
        """
        reasoning_model = state.get("reasoning_model", "gemini-2.0-flash-exp")
        research_topic = get_research_topic(state["messages"])
        summaries = "\n\n---\n\n".join(state["web_research_result"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        
        # 构建深度反思提示
        reflection_instructions = f"""作为研究分析专家,请对以下研究结果进行深度反思分析:

当前日期: {current_date}
研究主题: {research_topic}
已完成搜索次数: {len(state["search_query"])}
当前研究循环: {state.get("research_loop_count", 0)}

研究结果摘要:
{summaries}

请基于以下维度进行反思分析:

1. **信息完整性评估**:
   - 是否覆盖了主题的核心方面?
   - 是否存在明显的信息空白?
   - 不同来源的信息是否一致?

2. **知识缺口识别**:
   - 哪些重要问题尚未得到充分回答?
   - 需要哪些类型的补充信息?
   - 是否需要更专业或更新的信息?

3. **信息质量评估**:
   - 来源的权威性和可信度如何?
   - 信息的时效性是否满足要求?
   - 是否存在相互矛盾的信息?

4. **后续行动建议**:
   - 如果信息不充分,建议具体的后续查询
   - 优先级排序和搜索策略建议

请提供结构化的分析结果。"""
        
        # 执行反思分析
        llm = ChatGoogleGenerativeAI(
            model=reasoning_model,
            temperature=0.3,  # 平衡创造性和一致性
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        
        result = llm.with_structured_output(Reflection).invoke(reflection_instructions)
        
        return {
            "is_sufficient": result.is_sufficient,
            "knowledge_gap": result.knowledge_gap,
            "follow_up_queries": result.follow_up_queries,
            "confidence_score": result.confidence_score,
            "research_loop_count": state["research_loop_count"] + 1,
            "reflection_completed_at": time.time(),
        }
    
    # 最终答案综合节点
    def finalize_answer(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """综合所有研究结果生成最终答案
        
        实现智能的信息综合和答案生成:
        - 整合多个来源的信息
        - 生成结构化的综合答案
        - 自动处理引用和来源标注
        - 质量评估和置信度计算
        """
        reasoning_model = state.get("reasoning_model", "gemini-2.0-flash-exp") 
        research_topic = get_research_topic(state["messages"])
        summaries = "\n---\n\n".join(state["web_research_result"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        
        # 构建综合答案生成提示
        answer_instructions = f"""作为研究分析师,请基于以下研究结果生成全面、准确的答案:

当前日期: {current_date}
研究主题: {research_topic}
研究循环次数: {state.get("research_loop_count", 0)}
信息来源数量: {len(state.get("sources_gathered", []))}

研究结果详情:
{summaries}

答案生成要求:

1. **结构化组织**:
   - 使用清晰的标题和子标题
   - 逻辑性强的信息组织
   - 重点突出关键发现

2. **客观性和平衡性**:
   - 呈现多种观点和角度
   - 避免偏见和主观判断
   - 承认不确定性和争议

3. **引用和来源**:
   - 明确标注信息来源
   - 使用内联引用格式
   - 提供完整的参考文献列表

4. **实用性**:
   - 直接回答用户的核心问题
   - 提供可操作的建议和结论
   - 突出关键要点和影响

请生成专业、全面的研究报告。"""
        
        # 生成最终答案
        llm = ChatGoogleGenerativeAI(
            model=reasoning_model,
            temperature=0,  # 确保答案的一致性和准确性
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        
        result = llm.invoke(answer_instructions)
        
        # 处理引用链接替换
        unique_sources = []
        answer_content = result.content
        
        for source in state["sources_gathered"]:
            if source["short_url"] in answer_content:
                # 将短链接替换为完整引用
                answer_content = answer_content.replace(
                    source["short_url"], 
                    source["value"]
                )
                unique_sources.append(source)
        
        # 计算答案质量指标
        quality_metrics = {
            "sources_cited": len(unique_sources),
            "content_length": len(answer_content),
            "research_depth": state.get("research_loop_count", 0),
            "confidence_score": state.get("confidence_score", 0.8),
        }
        
        return {
            "messages": [AIMessage(content=answer_content)],
            "sources_gathered": unique_sources,
            "research_completed": True,
            "quality_metrics": quality_metrics,
            "final_answer_generated_at": time.time(),
        }
    
    # 构建图结构和流程控制
    graph.add_node("generate_query", generate_query)
    graph.add_node("continue_to_web_research", continue_to_web_research)
    graph.add_node("web_research", web_research)
    graph.add_node("reflection", reflection)
    graph.add_node("finalize_answer", finalize_answer)
    
    # 设置流程路径
    graph.set_entry_point("generate_query")
    graph.add_edge("generate_query", "continue_to_web_research")
    graph.add_edge("continue_to_web_research", "web_research")
    graph.add_edge("web_research", "reflection")
    
    # 智能条件路由:基于反思结果决定下一步
    def should_continue_research(state: OverallState) -> str:
        """决定是否继续研究的智能条件函数"""
        is_sufficient = state.get("is_sufficient", False)
        research_count = state.get("research_loop_count", 0)
        max_loops = state.get("max_research_loops", 3)
        confidence = state.get("confidence_score", 0)
        
        # 多重条件判断
        if is_sufficient and confidence > 0.7:
            return "finalize_answer"
        elif research_count >= max_loops:
            # 达到最大循环次数,强制结束
            return "finalize_answer"
        elif len(state.get("knowledge_gap", [])) == 0:
            # 没有识别到知识缺口
            return "finalize_answer"
        else:
            # 继续研究
            return "generate_query"
    
    graph.add_conditional_edges(
        "reflection",
        should_continue_research,
        {
            "generate_query": "generate_query",
            "finalize_answer": "finalize_answer",
        }
    )
    
    graph.set_finish_point("finalize_answer")
    
    return graph.compile(
        checkpointer=PostgresCheckpointSaver.from_conn_string(
            os.getenv("DATABASE_URL", "postgresql://localhost/langgraph")
        ),
        debug=True,
        name="DeepResearchWorkflow",
    )

# 辅助工具函数
def get_research_topic(messages: List[BaseMessage]) -> str:
    """从消息历史中智能提取研究主题"""
    if not messages:
        return "未知研究主题"
    
    # 查找最后一条人类消息
    for message in reversed(messages):
        if isinstance(message, HumanMessage):
            content = message.content.strip()
            
            # 简单的主题提取逻辑
            if len(content) > 200:
                # 长消息,提取前100个字符作为主题
                return content[:100] + "..."
            else:
                return content
    
    return "未知研究主题"

def resolve_urls(grounding_chunks, search_id: int) -> List[Dict[str, str]]:
    """解析并优化URL引用
    
    处理Google Search API返回的引用信息:
    - 提取URL和标题信息
    - 生成短链接标记
    - 创建格式化的引用格式
    """
    resolved_urls = []
    
    for idx, chunk in enumerate(grounding_chunks):
        if hasattr(chunk, 'web') and chunk.web:
            url = chunk.web.uri
            title = getattr(chunk.web, 'title', f"来源 {idx + 1}")
            
            # 生成唯一的短链接标记
            short_url = f"[{search_id}-{idx}]"
            
            resolved_urls.append({
                "url": url,
                "title": title,
                "short_url": short_url,
                "value": f"[{title}]({url})",
                "chunk_index": idx,
            })
    
    return resolved_urls

def get_citations(response, resolved_urls: List[Dict]) -> List[Dict]:
    """提取和格式化引用信息"""
    citations = []
    
    for url_info in resolved_urls:
        citations.append({
            "url": url_info["url"],
            "title": url_info["title"],
            "short_url": url_info["short_url"],
            "referenced_text": "",  # 可以添加引用的具体文本片段
        })
    
    return citations

def insert_citation_markers(text: str, citations: List[Dict]) -> str:
    """在文本中智能插入引用标记"""
    modified_text = text
    
    # 简单的引用插入策略
    # 实际应用中可以使用更复杂的NLP技术来精确定位引用位置
    for citation in citations:
        original_url = citation["url"]
        short_marker = citation["short_url"]
        
        if original_url in modified_text:
            modified_text = modified_text.replace(original_url, short_marker)
    
    return modified_text

def _calculate_relevance_score(
    url_info: Dict, 
    search_query: str, 
    research_context: Dict
) -> float:
    """计算来源的相关性评分"""
    score = 0.5  # 基础分数
    
    # 基于标题相关性
    title = url_info.get("title", "").lower()
    query_terms = search_query.lower().split()
    
    title_matches = sum(1 for term in query_terms if term in title)
    score += (title_matches / len(query_terms)) * 0.3
    
    # 基于URL权威性
    url = url_info.get("url", "")
    if any(domain in url for domain in [".edu", ".gov", ".org"]):
        score += 0.2
    
    # 基于内容长度(更长的内容通常更详细)
    content_length = len(url_info.get("value", ""))
    if content_length > 200:
        score += 0.1
    
    return min(score, 1.0)

2. 多智能体协作模式

2.1 分层多智能体系统

基于实际应用案例,展示专业的多智能体协作架构:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class MultiAgentResearchSystem:
    """多智能体研究系统:实现专业分工和协作"""
    
    def __init__(self):
        self.coordinator = None
        self.specialists = {}
        self.coordination_graph = None
        
    def build_hierarchical_research_team(self) -> CompiledStateGraph:
        """构建分层研究团队"""
        
        class TeamState(TypedDict):
            messages: Annotated[list, add_messages]
            current_task: Optional[str]
            task_queue: List[Dict[str, Any]]
            specialist_results: Dict[str, Any]
            coordination_history: List[Dict[str, Any]]
            research_plan: Optional[Dict[str, Any]]
        
        graph = StateGraph(TeamState)
        
        # 协调者智能体:负责任务分解和团队协调
        def coordinator_agent(state: TeamState) -> Dict[str, Any]:
            """协调者智能体:任务分解、分配和结果整合"""
            
            messages = state["messages"]
            current_task = state.get("current_task")
            
            if not current_task:
                # 初始任务分解
                user_request = messages[-1].content if messages else ""
                
                # 分析任务复杂度和专业需求
                task_analysis = self._analyze_task_requirements(user_request)
                
                # 生成研究计划
                research_plan = self._create_research_plan(task_analysis)
                
                # 分解为子任务
                subtasks = self._decompose_into_subtasks(research_plan)
                
                return {
                    "research_plan": research_plan,
                    "task_queue": subtasks,
                    "current_task": subtasks[0] if subtasks else None,
                    "coordination_history": [{
                        "action": "task_decomposition",
                        "plan": research_plan,
                        "subtasks_count": len(subtasks),
                        "timestamp": time.time(),
                    }],
                }
            else:
                # 处理专家返回的结果
                specialist_results = state.get("specialist_results", {})
                task_queue = state.get("task_queue", [])
                
                if specialist_results and task_queue:
                    # 记录当前任务完成
                    completed_task = task_queue[0]
                    remaining_tasks = task_queue[1:]
                    
                    coordination_entry = {
                        "action": "task_completion",
                        "completed_task": completed_task,
                        "specialist": completed_task.get("assigned_specialist"),
                        "result_summary": specialist_results.get("summary", ""),
                        "timestamp": time.time(),
                    }
                    
                    if remaining_tasks:
                        # 还有待处理任务
                        return {
                            "current_task": remaining_tasks[0],
                            "task_queue": remaining_tasks,
                            "coordination_history": state.get("coordination_history", []) + [coordination_entry],
                        }
                    else:
                        # 所有任务完成,整合最终结果
                        final_result = self._integrate_specialist_results(
                            state["specialist_results"], 
                            state["research_plan"]
                        )
                        
                        return {
                            "messages": [AIMessage(content=final_result)],
                            "coordination_history": state.get("coordination_history", []) + [coordination_entry],
                            "research_completed": True,
                        }
                
                return {"current_task": None}  # 异常情况处理
        
        # 数据分析专家
        def data_analyst_agent(state: TeamState) -> Dict[str, Any]:
            """数据分析专家:处理数据分析和统计任务"""
            current_task = state.get("current_task", {})
            
            if current_task.get("type") != "data_analysis":
                return {}  # 不是数据分析任务
            
            # 执行数据分析
            analysis_result = self._perform_data_analysis(current_task)
            
            return {
                "specialist_results": {
                    "type": "data_analysis",
                    "summary": analysis_result["summary"],
                    "details": analysis_result["details"],
                    "visualizations": analysis_result.get("charts", []),
                    "confidence": analysis_result.get("confidence", 0.8),
                }
            }
        
        # 代码生成专家
        def code_generator_agent(state: TeamState) -> Dict[str, Any]:
            """代码生成专家:处理编程和技术实现任务"""
            current_task = state.get("current_task", {})
            
            if current_task.get("type") != "code_generation":
                return {}
            
            # 执行代码生成
            code_result = self._generate_code_solution(current_task)
            
            return {
                "specialist_results": {
                    "type": "code_generation",
                    "summary": code_result["summary"],
                    "code": code_result["code"],
                    "tests": code_result.get("tests", []),
                    "documentation": code_result.get("docs", ""),
                }
            }
        
        # 质量保证专家
        def qa_specialist_agent(state: TeamState) -> Dict[str, Any]:
            """质量保证专家:验证结果质量和准确性"""
            current_task = state.get("current_task", {})
            
            if current_task.get("type") != "quality_assurance":
                return {}
            
            # 执行质量检查
            qa_result = self._perform_quality_assurance(
                current_task, 
                state.get("specialist_results", {})
            )
            
            return {
                "specialist_results": {
                    "type": "quality_assurance", 
                    "summary": qa_result["summary"],
                    "issues_found": qa_result["issues"],
                    "recommendations": qa_result["recommendations"],
                    "quality_score": qa_result["score"],
                }
            }
        
        # 添加所有智能体节点
        graph.add_node("coordinator", coordinator_agent)
        graph.add_node("data_analyst", data_analyst_agent)
        graph.add_node("code_generator", code_generator_agent)
        graph.add_node("qa_specialist", qa_specialist_agent)
        
        # 设置协作流程
        graph.set_entry_point("coordinator")
        
        # 智能路由:根据任务类型分配给相应专家
        def route_to_specialist(state: TeamState) -> str:
            """智能路由到专业智能体"""
            current_task = state.get("current_task", {})
            task_type = current_task.get("type", "unknown")
            
            routing_map = {
                "data_analysis": "data_analyst",
                "code_generation": "code_generator", 
                "quality_assurance": "qa_specialist",
            }
            
            return routing_map.get(task_type, END)
        
        graph.add_conditional_edges(
            "coordinator",
            route_to_specialist,
            {
                "data_analyst": "data_analyst",
                "code_generator": "code_generator",
                "qa_specialist": "qa_specialist",
                END: END,
            }
        )
        
        # 专家完成后返回协调者
        for specialist in ["data_analyst", "code_generator", "qa_specialist"]:
            graph.add_edge(specialist, "coordinator")
        
        return graph.compile(
            checkpointer=PostgresCheckpointSaver.from_conn_string(
                os.getenv("DATABASE_URL")
            ),
            name="MultiAgentResearchTeam",
        )
    
    def _analyze_task_requirements(self, user_request: str) -> Dict[str, Any]:
        """分析任务需求和复杂度"""
        # 使用NLP技术分析任务特征
        task_features = {
            "contains_data": any(term in user_request.lower() for term in ["data", "statistics", "numbers", "chart"]),
            "requires_code": any(term in user_request.lower() for term in ["code", "programming", "implementation", "algorithm"]),
            "needs_qa": any(term in user_request.lower() for term in ["test", "verify", "validate", "check"]),
            "complexity_level": self._assess_complexity_level(user_request),
        }
        
        return task_features
    
    def _create_research_plan(self, task_analysis: Dict[str, Any]) -> Dict[str, Any]:
        """基于任务分析创建研究计划"""
        plan = {
            "phases": [],
            "estimated_duration": 0,
            "required_specialists": [],
        }
        
        if task_analysis["contains_data"]:
            plan["phases"].append("data_analysis")
            plan["required_specialists"].append("data_analyst")
            plan["estimated_duration"] += 30  # 分钟
        
        if task_analysis["requires_code"]:
            plan["phases"].append("code_generation")
            plan["required_specialists"].append("code_generator")
            plan["estimated_duration"] += 45
        
        if task_analysis["needs_qa"]:
            plan["phases"].append("quality_assurance")
            plan["required_specialists"].append("qa_specialist")
            plan["estimated_duration"] += 20
        
        return plan

3. 企业级模式和最佳实践

3.1 故障恢复和容错机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class ResilientWorkflowManager:
    """弹性工作流管理器:企业级故障恢复和容错"""
    
    def __init__(self, graph: CompiledStateGraph):
        self.graph = graph
        self.failure_tracker = defaultdict(int)
        self.recovery_strategies = self._setup_recovery_strategies()
        
    async def execute_with_resilience(
        self,
        input_data: Dict[str, Any],
        config: RunnableConfig,
        max_retries: int = 3,
    ) -> Dict[str, Any]:
        """弹性执行:支持自动故障恢复和重试"""
        
        retry_count = 0
        last_checkpoint = None
        
        while retry_count < max_retries:
            try:
                # 尝试从检查点恢复
                if last_checkpoint:
                    config = {
                        **config,
                        "configurable": {
                            **config.get("configurable", {}),
                            "checkpoint_id": last_checkpoint,
                        }
                    }
                    input_data = None  # 从检查点恢复时不需要新输入
                
                # 执行工作流
                final_state = None
                async for state in self.graph.astream(input_data, config):
                    final_state = state
                    last_checkpoint = state.get("checkpoint_id")
                
                return final_state
                
            except Exception as e:
                retry_count += 1
                error_type = type(e).__name__
                self.failure_tracker[error_type] += 1
                
                logger.warning(f"工作流执行失败 (尝试 {retry_count}/{max_retries}): {e}")
                
                # 应用恢复策略
                recovery_action = self.recovery_strategies.get(error_type, "retry")
                
                if recovery_action == "skip_node":
                    # 跳过失败的节点
                    config = self._configure_node_skip(config, e)
                elif recovery_action == "fallback_model":
                    # 切换到备用模型
                    config = self._configure_fallback_model(config, e)
                elif recovery_action == "reduce_complexity":
                    # 降低任务复杂度
                    input_data = self._reduce_task_complexity(input_data, e)
                
                # 指数退避
                await asyncio.sleep(2 ** retry_count)
        
        # 所有重试都失败
        raise WorkflowExecutionError(
            f"工作流执行失败,已重试 {max_retries} 次",
            failure_history=dict(self.failure_tracker)
        )

4. 总结

通过整合网上深度技术文章的内容,我们看到LangGraph在实际应用中展现出的强大能力:

4.1 技术优势

  • 配置驱动架构:通过langgraph.json实现声明式的图管理
  • 智能状态路由:Command和Send机制支持复杂的控制流
  • 反思式迭代:内置的质量控制和自我改进能力
  • 企业级特性:完整的监控、安全、扩缩容支持

4.2 应用场景

  • 深度研究系统:多轮迭代的信息收集和分析
  • 智能客服平台:多智能体协作的客户服务
  • 代码生成工具:反思式的代码生成和优化
  • 法律文档分析:专业领域的结构化信息提取

4.3 发展方向

  1. 更智能的路由算法:基于机器学习的动态路由优化
  2. 增强的可观测性:更精细的性能监控和调试工具
  3. 云原生优化:更好的容器化和微服务支持
  4. 行业标准化:推动多智能体应用的标准化发展

通过深入理解这些高级模式和最佳实践,开发者能够充分发挥LangGraph的潜力,构建真正具有生产价值的智能体应用系统。


创建时间: 2025年09月13日

本文由 tommie blog 原创发布