概述

1. 深度研究系统架构

1.1 完整的研究工作流实现

基于实际的深度研究系统,展示LangGraph在复杂多阶段任务中的应用:

import asyncio
import operator
import os
import time
import uuid
from collections import defaultdict
from datetime import datetime
from typing import Annotated, Any, Dict, List, Optional, TypedDict

from pydantic import BaseModel, Field

class _ResearchProgress(TypedDict, total=False):
    """State keys that nodes produce while the graph runs.

    They are optional (``total=False``) because the initial state built by the
    caller does not contain them. They are declared here so that the values
    the nodes return under these keys are part of the graph state schema and
    can be read back by later nodes and by the routing functions.
    """
    query_list: List[str]           # produced by generate_query, consumed by the search fan-out
    is_sufficient: bool             # reflection verdict read by should_continue_research
    knowledge_gap: List[str]        # gaps identified by reflection
    follow_up_queries: List[str]    # follow-up queries suggested by reflection
    confidence_score: float         # reflection confidence, reused by finalize_answer


class OverallState(_ResearchProgress):
    """Overall state schema of the research workflow.

    - ``Annotated`` reducers make the list-valued keys accumulate across
      node updates instead of being overwritten.
    - The loop counters track progress over several research iterations.
    - Configuration values (query count, loop limit, model name) travel
      alongside the runtime state.
    """
    messages: Annotated[list, add_messages]                # conversation messages (accumulated)
    search_query: Annotated[list, operator.add]           # executed search queries (accumulated)
    web_research_result: Annotated[list, operator.add]    # research result texts (accumulated)
    sources_gathered: Annotated[list, operator.add]       # source records (accumulated)
    initial_search_query_count: int                       # number of queries to generate per round
    max_research_loops: int                               # upper bound on research loops
    research_loop_count: int                              # loops completed so far
    reasoning_model: str                                  # name of the reasoning model

# Structured-output model for query generation: a list of search queries,
# each expected to cover a different angle of the topic.
# NOTE: the class docstring and the Field description are deliberately left
# untranslated. Pydantic exposes both in the model's JSON schema, and this
# model is passed to `with_structured_output`, so changing that text would
# presumably change what the LLM is shown — confirm before editing.
class SearchQueryList(BaseModel):
    """搜索查询列表模型:确保查询生成的结构化输出"""
    query: List[str] = Field(description="优化的搜索查询列表,每个查询关注不同角度")

# Structured-output model for the reflection step: whether the gathered
# information is sufficient, which knowledge gaps remain, suggested follow-up
# queries, and a confidence score constrained to the range [0.0, 1.0].
# NOTE: the class docstring and the Field descriptions are deliberately left
# untranslated. Pydantic exposes them in the model's JSON schema, and this
# model is passed to `with_structured_output`, so changing that text would
# presumably change what the LLM is shown — confirm before editing.
class Reflection(BaseModel):
    """反思分析结果模型:支持知识缺口分析和迭代决策"""
    is_sufficient: bool = Field(description="当前信息是否足够回答问题")
    knowledge_gap: List[str] = Field(description="识别的知识缺口列表")
    follow_up_queries: List[str] = Field(description="针对知识缺口的后续查询建议")
    confidence_score: float = Field(description="答案置信度评分", ge=0.0, le=1.0)

def _print_state_update(state_update: Dict[str, Any]) -> None:
    """Print a short debug summary for one ``updates`` chunk of the stream."""
    for node_name, node_state in state_update.items():
        print(f"📍 节点 '{node_name}' 执行完成")

        # Only dict-shaped updates can be inspected key by key.
        if not isinstance(node_state, dict):
            continue

        # Show the key state changes produced by this node.
        if node_state.get("messages"):
            latest_message = node_state["messages"][-1]
            content = latest_message.content
            preview = content[:200] + "..." if len(content) > 200 else content
            print(f"💬 输出预览: {preview}")

        if "search_query" in node_state:
            print(f"🔍 新增查询: {node_state['search_query']}")

        if "sources_gathered" in node_state:
            print(f"📚 收集来源: {len(node_state['sources_gathered'])} 个")


async def run_agent_workflow_async(
    user_input: str,
    debug: bool = False,
    max_plan_iterations: int = 3,
    max_step_num: int = 5,
    enable_background_investigation: bool = True,
) -> Dict[str, Any]:
    """Run the research workflow for one user request and return its final state.

    The graph is streamed in two modes at once: ``updates`` chunks are used
    for step counting and debug output, ``values`` chunks carry the full
    accumulated state. The last ``values`` chunk is what gets returned, so
    the statistics below are computed from the whole state rather than from
    the last node's partial update.

    Args:
        user_input: The user's query or request.
        debug: Enable tracing callbacks and per-node console output.
        max_plan_iterations: Stored in the state as ``max_research_loops``.
        max_step_num: Stored in the state as ``initial_search_query_count``.
        enable_background_investigation: Accepted for interface
            compatibility; this function does not read it.

    Returns:
        A copy of the final graph state with an added ``execution_stats``
        entry, or ``None`` if the stream produced no state at all.
    """
    # Initial research state.
    initial_state = OverallState(
        messages=[HumanMessage(content=user_input)],
        search_query=[],
        web_research_result=[],
        sources_gathered=[],
        initial_search_query_count=max_step_num,
        max_research_loops=max_plan_iterations,
        research_loop_count=0,
        reasoning_model="gemini-2.0-flash-exp",
    )

    # Build the research workflow graph.
    research_graph = build_research_workflow()

    # Only real handler objects go into the list; placing ``None`` entries in
    # it (as happened when debug was off) hands invalid callbacks downstream.
    callbacks = [LangfuseCallbackHandler(), ConsoleCallbackHandler()] if debug else []

    config = RunnableConfig(
        configurable={
            # The random suffix keeps two runs started within the same second
            # from sharing one checkpoint thread.
            "thread_id": f"research_{int(time.time())}_{uuid.uuid4().hex[:8]}",
            "user_id": "anonymous",
            "query_generator_model": "gemini-2.0-flash-exp",
            "enable_tracing": debug,
        },
        callbacks=callbacks,
    )

    final_state = None
    execution_steps = []

    # With a list of stream modes each item is a (mode, chunk) tuple.
    async for mode, chunk in research_graph.astream(
        initial_state,
        config=config,
        stream_mode=["updates", "values"],
    ):
        if mode == "values":
            final_state = chunk
            continue

        execution_steps.append({
            "timestamp": time.time(),
            "update": chunk,
        })

        if debug:
            _print_state_update(chunk)

    # Attach execution statistics to a copy so the streamed object is untouched.
    if final_state:
        final_state = dict(final_state)
        if execution_steps:
            total_duration = (
                execution_steps[-1]["timestamp"] - execution_steps[0]["timestamp"]
            )
        else:
            total_duration = 0
        final_state["execution_stats"] = {
            "total_steps": len(execution_steps),
            "total_duration": total_duration,
            "queries_executed": len(final_state.get("search_query", [])),
            "sources_collected": len(final_state.get("sources_gathered", [])),
        }

    return final_state

def build_research_workflow() -> CompiledStateGraph:
    """Build and compile the deep-research workflow graph.

    Flow:
    1. generate_query: turn the user question into several search queries.
    2. continue_to_web_research: routing function that fans out one
       ``web_research`` task per query via ``Send``.
    3. web_research: run one search and collect its text and sources.
    4. reflection: judge whether the gathered information is sufficient.
    5. should_continue_research: loop back to generate_query or finish.
    6. finalize_answer: synthesise the final answer.

    ``continue_to_web_research`` is attached as a conditional edge rather
    than as a node: it returns ``Send`` objects, which are routing
    instructions and not a state update.
    """
    graph = StateGraph(OverallState)

    # Query generation node.
    def generate_query(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """Break the research topic down into several search queries."""
        configurable = config.get("configurable", {})
        query_model = configurable.get("query_generator_model", "gemini-2.0-flash-exp")

        # Model used for query generation.
        llm = ChatGoogleGenerativeAI(
            model=query_model,
            temperature=1.0,  # high temperature for more diverse queries
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        structured_llm = llm.with_structured_output(SearchQueryList)

        # Prompt context: topic, date and the most recent earlier queries.
        research_topic = get_research_topic(state["messages"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        previous_queries = state.get("search_query", [])

        query_instructions = f"""当前日期: {current_date}

研究主题: {research_topic}

之前已执行的查询(避免重复):
{chr(10).join(f"- {q}" for q in previous_queries[-5:]) if previous_queries else "无"}

请生成 {state["initial_search_query_count"]} 个新的优化搜索查询,要求:

1. **多角度覆盖**:从不同角度和层面分析主题
2. **时效性考虑**:包含最新信息和趋势分析
3. **深度挖掘**:不仅获取基础信息,还要深入技术细节
4. **专业术语**:使用领域专业术语提高搜索精度
5. **避免重复**:确保与之前查询不重复

每个查询应该是独立且具体的,能够获得有价值的信息片段。"""

        result = structured_llm.invoke(query_instructions)

        return {
            "query_list": result.query,
            "query_generation_completed": True,
            "query_generated_at": time.time(),
        }

    # Fan-out routing function (used as a conditional edge, see below).
    def continue_to_web_research(state: OverallState) -> List[Send]:
        """Create one ``web_research`` task per generated query.

        Each ``Send`` carries its own payload, so every query is handled by
        a separate ``web_research`` invocation.
        """
        query_list = state.get("query_list", [])

        return [
            Send("web_research", {
                "search_query": search_query,
                "id": int(idx),
                "total_queries": len(query_list),
                "research_context": {
                    "main_topic": get_research_topic(state["messages"]),
                    "current_loop": state.get("research_loop_count", 0),
                }
            })
            for idx, search_query in enumerate(query_list)
        ]

    # Web search node; receives the payload built by continue_to_web_research.
    def web_research(state: Dict[str, Any], config: RunnableConfig) -> OverallState:
        """Run one search query and return its text, query and sources.

        The returned keys all use list reducers in ``OverallState``, so the
        results of the parallel tasks are concatenated.
        """
        search_query = state["search_query"]
        search_id = state["id"]
        research_context = state.get("research_context", {})

        # Model configuration.
        configurable = config.get("configurable", {})
        model_name = configurable.get("query_generator_model", "gemini-2.0-flash-exp")

        # Search prompt.
        search_prompt = f"""
使用Google Search API搜索以下查询并提供详细分析:

查询: {search_query}
主题背景: {research_context.get("main_topic", "未知")}

搜索要求:
1. 使用多个相关关键词组合进行搜索
2. 优先选择权威可信的信息源
3. 提取关键事实、数据和观点
4. 分析信息的时效性和相关性
5. 总结核心发现和洞察

请提供结构化的研究结果,包含引用来源。
"""

        # NOTE(review): `genai` is not imported in the visible code; confirm
        # that `generate_content` of the SDK in use accepts `tools` and
        # `config` in this form.
        genai_client = genai.GenerativeModel(model_name)
        response = genai_client.generate_content(
            search_prompt,
            tools=[{"google_search": {}}],
            config={"temperature": 0}  # temperature 0 for consistent results
        )

        # Turn grounding chunks into citation records and mark them in the text.
        resolved_urls = resolve_urls(
            response.candidates[0].grounding_metadata.grounding_chunks,
            search_id
        )
        citations = get_citations(response, resolved_urls)
        modified_text = insert_citation_markers(response.text, citations)

        # Structured source records.
        sources_gathered = [
            {
                "url": url_info["url"],
                "title": url_info.get("title", "未知标题"),
                "short_url": url_info["short_url"],
                "value": url_info["value"],
                "search_id": search_id,
                "search_query": search_query,
                "collected_at": time.time(),
                "relevance_score": _calculate_relevance_score(
                    url_info, search_query, research_context
                ),
            }
            for url_info in resolved_urls
        ]

        return {
            "sources_gathered": sources_gathered,
            "search_query": [search_query],
            "web_research_result": [modified_text],
        }

    # Reflection node.
    def reflection(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """Assess whether the research so far is sufficient.

        Asks the reasoning model for a structured ``Reflection`` and returns
        its fields together with the incremented loop counter.
        """
        reasoning_model = state.get("reasoning_model", "gemini-2.0-flash-exp")
        research_topic = get_research_topic(state["messages"])
        summaries = "\n\n---\n\n".join(state["web_research_result"])
        current_date = datetime.now().strftime("%Y-%m-%d")

        # Reflection prompt.
        reflection_instructions = f"""作为研究分析专家,请对以下研究结果进行深度反思分析:

当前日期: {current_date}
研究主题: {research_topic}
已完成搜索次数: {len(state["search_query"])}
当前研究循环: {state.get("research_loop_count", 0)}

研究结果摘要:
{summaries}

请进行反思分析:

1. **信息完整性评估**:
   - 是否覆盖了主题的核心方面?
   - 是否存在明显的信息空白?
   - 不同来源的信息是否一致?

2. **知识缺口识别**:
   - 哪些重要问题尚未得到充分回答?
   - 需要哪些类型的补充信息?
   - 是否需要更专业或更新的信息?

3. **信息质量评估**:
   - 来源的权威性和可信度如何?
   - 信息的时效性是否满足要求?
   - 是否存在相互矛盾的信息?

4. **后续行动建议**:
   - 如果信息不充分,建议具体的后续查询
   - 优先级排序和搜索策略建议

请提供结构化的分析结果。"""

        llm = ChatGoogleGenerativeAI(
            model=reasoning_model,
            temperature=0.3,  # balance between creativity and consistency
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )

        result = llm.with_structured_output(Reflection).invoke(reflection_instructions)

        return {
            "is_sufficient": result.is_sufficient,
            "knowledge_gap": result.knowledge_gap,
            "follow_up_queries": result.follow_up_queries,
            "confidence_score": result.confidence_score,
            "research_loop_count": state["research_loop_count"] + 1,
            "reflection_completed_at": time.time(),
        }

    # Final answer node.
    def finalize_answer(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """Synthesise the final answer from all research results.

        Short citation markers that appear in the generated answer are
        replaced by the full markdown reference, and only the sources that
        were actually cited are reported.
        """
        reasoning_model = state.get("reasoning_model", "gemini-2.0-flash-exp")
        research_topic = get_research_topic(state["messages"])
        summaries = "\n---\n\n".join(state["web_research_result"])
        current_date = datetime.now().strftime("%Y-%m-%d")

        # Answer prompt.
        answer_instructions = f"""作为研究分析师,请基于以下研究结果生成全面、准确的答案:

当前日期: {current_date}
研究主题: {research_topic}
研究循环次数: {state.get("research_loop_count", 0)}
信息来源数量: {len(state.get("sources_gathered", []))}

研究结果详情:
{summaries}

答案生成要求:

1. **结构化组织**:
   - 使用清晰的标题和子标题
   - 逻辑性强的信息组织
   - 重点突出关键发现

2. **客观性和平衡性**:
   - 呈现多种观点和角度
   - 避免偏见和主观判断
   - 承认不确定性和争议

3. **引用和来源**:
   - 明确标注信息来源
   - 使用内联引用格式
   - 提供完整的参考文献列表

4. **实用性**:
   - 直接回答用户的核心问题
   - 提供可操作的建议和结论
   - 突出关键要点和影响

请生成专业、全面的研究报告。"""

        llm = ChatGoogleGenerativeAI(
            model=reasoning_model,
            temperature=0,  # deterministic answer generation
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )

        result = llm.invoke(answer_instructions)

        # Replace short markers with full references and remember cited sources.
        unique_sources = []
        answer_content = result.content

        for source in state["sources_gathered"]:
            if source["short_url"] in answer_content:
                answer_content = answer_content.replace(
                    source["short_url"],
                    source["value"]
                )
                unique_sources.append(source)

        # Simple quality indicators for the answer.
        quality_metrics = {
            "sources_cited": len(unique_sources),
            "content_length": len(answer_content),
            "research_depth": state.get("research_loop_count", 0),
            "confidence_score": state.get("confidence_score", 0.8),
        }

        # NOTE(review): `sources_gathered` is declared with an `operator.add`
        # reducer in OverallState, so this list is presumably appended to the
        # existing sources rather than replacing them — confirm intent.
        return {
            "messages": [AIMessage(content=answer_content)],
            "sources_gathered": unique_sources,
            "research_completed": True,
            "quality_metrics": quality_metrics,
            "final_answer_generated_at": time.time(),
        }

    # Nodes. continue_to_web_research is intentionally not a node.
    graph.add_node("generate_query", generate_query)
    graph.add_node("web_research", web_research)
    graph.add_node("reflection", reflection)
    graph.add_node("finalize_answer", finalize_answer)

    # Edges.
    graph.set_entry_point("generate_query")
    # Fan out: the routing function returns the Send objects for web_research.
    graph.add_conditional_edges(
        "generate_query",
        continue_to_web_research,
        ["web_research"],
    )
    graph.add_edge("web_research", "reflection")

    # Conditional routing based on the reflection result.
    def should_continue_research(state: OverallState) -> str:
        """Return the name of the next node after reflection."""
        is_sufficient = state.get("is_sufficient", False)
        research_count = state.get("research_loop_count", 0)
        max_loops = state.get("max_research_loops", 3)
        confidence = state.get("confidence_score", 0)

        if is_sufficient and confidence > 0.7:
            return "finalize_answer"
        elif research_count >= max_loops:
            # Loop budget exhausted: finish with what has been gathered.
            return "finalize_answer"
        elif len(state.get("knowledge_gap", [])) == 0:
            # No knowledge gap was identified.
            return "finalize_answer"
        else:
            # Keep researching.
            return "generate_query"

    graph.add_conditional_edges(
        "reflection",
        should_continue_research,
        {
            "generate_query": "generate_query",
            "finalize_answer": "finalize_answer",
        }
    )

    graph.set_finish_point("finalize_answer")

    return graph.compile(
        checkpointer=PostgresCheckpointSaver.from_conn_string(
            os.getenv("DATABASE_URL", "postgresql://localhost/langgraph")
        ),
        debug=True,
        name="DeepResearchWorkflow",
    )

# 辅助工具函数
def get_research_topic(messages: List[BaseMessage]) -> str:
    """Return the text of the most recent human message as the research topic.

    The stripped content is returned unchanged when it is at most 200
    characters long; longer content is cut to its first 100 characters
    followed by ``"..."``. A fixed placeholder is returned when there are
    no messages or none of them is a ``HumanMessage``.
    """
    placeholder = "未知研究主题"
    if not messages:
        return placeholder

    # Walk backwards so the latest human message wins.
    latest_human = next(
        (entry for entry in reversed(messages) if isinstance(entry, HumanMessage)),
        None,
    )
    if latest_human is None:
        return placeholder

    topic_text = latest_human.content.strip()
    if len(topic_text) <= 200:
        return topic_text
    return topic_text[:100] + "..."

def resolve_urls(grounding_chunks, search_id: int) -> List[Dict[str, str]]:
    """Turn grounding chunks into citation records.

    Chunks without a truthy ``web`` attribute are skipped. For every other
    chunk the record holds the URL, the title (a numbered placeholder when
    the ``web`` object has no ``title`` attribute), a short marker of the
    form ``[<search_id>-<chunk index>]``, a markdown link and the index of
    the chunk in the input.
    """
    records = []

    for position, chunk in enumerate(grounding_chunks):
        web_info = getattr(chunk, "web", None)
        if not web_info:
            continue

        link = web_info.uri
        label = getattr(web_info, "title", f"来源 {position + 1}")

        records.append(
            dict(
                url=link,
                title=label,
                short_url=f"[{search_id}-{position}]",
                value=f"[{label}]({link})",
                chunk_index=position,
            )
        )

    return records

def get_citations(response, resolved_urls: List[Dict]) -> List[Dict]:
    """Build one citation record per resolved URL.

    Each record copies ``url``, ``title`` and ``short_url`` from the input
    and carries an empty ``referenced_text`` placeholder. ``response`` is
    accepted but not read.
    """
    return [
        {
            "url": entry["url"],
            "title": entry["title"],
            "short_url": entry["short_url"],
            "referenced_text": "",  # placeholder for the quoted text span
        }
        for entry in resolved_urls
    ]

def insert_citation_markers(text: str, citations: List[Dict]) -> str:
    """Replace every cited URL found in *text* with its short marker.

    Args:
        text: The text to process.
        citations: Records with at least ``url`` and ``short_url`` keys.

    Returns:
        The text with each occurrence of a citation URL replaced by the
        citation's ``short_url``.

    URLs are processed longest first, so a URL that is a prefix of another
    one (``https://a.com`` vs ``https://a.com/page``) cannot corrupt the
    longer URL. Citations with an empty URL are ignored, because replacing
    the empty string would insert the marker between every character.
    """
    ordered = sorted(
        (citation for citation in citations if citation["url"]),
        key=lambda citation: len(citation["url"]),
        reverse=True,
    )

    modified_text = text
    for citation in ordered:
        modified_text = modified_text.replace(citation["url"], citation["short_url"])

    return modified_text

def _calculate_relevance_score(
    url_info: Dict, 
    search_query: str, 
    research_context: Dict
) -> float:
    """计算来源的相关性评分"""
    score = 0.5  # 基础分数
    
    # 基于标题相关性
    title = url_info.get("title", "").lower()
    query_terms = search_query.lower().split()
    
    title_matches = sum(1 for term in query_terms if term in title)
    score += (title_matches / len(query_terms)) * 0.3
    
    # 基于URL权威性
    url = url_info.get("url", "")
    if any(domain in url for domain in [".edu", ".gov", ".org"]):
        score += 0.2
    
    # 基于内容长度(更长的内容通常更详细)
    content_length = len(url_info.get("value", ""))
    if content_length > 200:
        score += 0.1
    
    return min(score, 1.0)

2. 多智能体协作模式

2.1 分层多智能体系统

基于实际应用案例,展示专业的多智能体协作架构:

class MultiAgentResearchSystem:
    """Multi-agent research system with a coordinator and specialist agents.

    NOTE(review): several helpers called below (``_assess_complexity_level``,
    ``_decompose_into_subtasks``, ``_integrate_specialist_results``,
    ``_perform_data_analysis``, ``_generate_code_solution``,
    ``_perform_quality_assurance``) are not defined in this class as shown
    and must be provided elsewhere.
    """

    def __init__(self):
        self.coordinator = None
        self.specialists = {}
        self.coordination_graph = None

    def build_hierarchical_research_team(self) -> CompiledStateGraph:
        """Build and compile the coordinator/specialist graph.

        The coordinator decomposes the request into a task queue, the router
        sends the current task to the matching specialist, and every
        specialist hands control back to the coordinator. The run ends when
        the coordinator leaves no current task.
        """

        class TeamState(TypedDict):
            messages: Annotated[list, add_messages]
            # A task is a dict (it is read with ``.get("type")`` below).
            current_task: Optional[Dict[str, Any]]
            task_queue: List[Dict[str, Any]]
            specialist_results: Dict[str, Any]
            coordination_history: List[Dict[str, Any]]
            research_plan: Optional[Dict[str, Any]]
            research_completed: bool

        graph = StateGraph(TeamState)

        # Coordinator: task decomposition and team coordination.
        def coordinator_agent(state: TeamState) -> Dict[str, Any]:
            """Decompose the request, advance the queue, integrate results."""

            messages = state["messages"]
            current_task = state.get("current_task")

            if not current_task:
                # Initial task decomposition.
                user_request = messages[-1].content if messages else ""

                # Analyse what the request needs.
                task_analysis = self._analyze_task_requirements(user_request)

                # Create the research plan.
                research_plan = self._create_research_plan(task_analysis)

                # Break the plan down into subtasks.
                subtasks = self._decompose_into_subtasks(research_plan)

                return {
                    "research_plan": research_plan,
                    "task_queue": subtasks,
                    "current_task": subtasks[0] if subtasks else None,
                    "coordination_history": [{
                        "action": "task_decomposition",
                        "plan": research_plan,
                        "subtasks_count": len(subtasks),
                        "timestamp": time.time(),
                    }],
                }
            else:
                # A specialist has returned.
                specialist_results = state.get("specialist_results", {})
                task_queue = state.get("task_queue", [])

                if specialist_results and task_queue:
                    # Record completion of the task at the head of the queue.
                    completed_task = task_queue[0]
                    remaining_tasks = task_queue[1:]

                    coordination_entry = {
                        "action": "task_completion",
                        "completed_task": completed_task,
                        "specialist": completed_task.get("assigned_specialist"),
                        "result_summary": specialist_results.get("summary", ""),
                        "timestamp": time.time(),
                    }

                    if remaining_tasks:
                        # More tasks are waiting.
                        return {
                            "current_task": remaining_tasks[0],
                            "task_queue": remaining_tasks,
                            "coordination_history": state.get("coordination_history", []) + [coordination_entry],
                        }
                    else:
                        # All tasks done: integrate the final result.
                        # NOTE(review): `specialist_results` has no reducer in
                        # TeamState, so this presumably holds only the most
                        # recent specialist's output — confirm intent.
                        final_result = self._integrate_specialist_results(
                            state["specialist_results"],
                            state["research_plan"]
                        )

                        return {
                            "messages": [AIMessage(content=final_result)],
                            # Clear the task pointers; otherwise the router
                            # would send the finished task to its specialist
                            # again and the graph would never reach END.
                            "current_task": None,
                            "task_queue": [],
                            "coordination_history": state.get("coordination_history", []) + [coordination_entry],
                            "research_completed": True,
                        }

                return {"current_task": None}  # fallback: stop routing

        # Data analysis specialist.
        def data_analyst_agent(state: TeamState) -> Dict[str, Any]:
            """Handle tasks of type ``data_analysis``."""
            # ``or {}`` covers a key that is present but set to None.
            current_task = state.get("current_task") or {}

            if current_task.get("type") != "data_analysis":
                return {}  # not a data analysis task

            analysis_result = self._perform_data_analysis(current_task)

            return {
                "specialist_results": {
                    "type": "data_analysis",
                    "summary": analysis_result["summary"],
                    "details": analysis_result["details"],
                    "visualizations": analysis_result.get("charts", []),
                    "confidence": analysis_result.get("confidence", 0.8),
                }
            }

        # Code generation specialist.
        def code_generator_agent(state: TeamState) -> Dict[str, Any]:
            """Handle tasks of type ``code_generation``."""
            current_task = state.get("current_task") or {}

            if current_task.get("type") != "code_generation":
                return {}

            code_result = self._generate_code_solution(current_task)

            return {
                "specialist_results": {
                    "type": "code_generation",
                    "summary": code_result["summary"],
                    "code": code_result["code"],
                    "tests": code_result.get("tests", []),
                    "documentation": code_result.get("docs", ""),
                }
            }

        # Quality assurance specialist.
        def qa_specialist_agent(state: TeamState) -> Dict[str, Any]:
            """Handle tasks of type ``quality_assurance``."""
            current_task = state.get("current_task") or {}

            if current_task.get("type") != "quality_assurance":
                return {}

            qa_result = self._perform_quality_assurance(
                current_task,
                state.get("specialist_results", {})
            )

            return {
                "specialist_results": {
                    "type": "quality_assurance",
                    "summary": qa_result["summary"],
                    "issues_found": qa_result["issues"],
                    "recommendations": qa_result["recommendations"],
                    "quality_score": qa_result["score"],
                }
            }

        # Register all agent nodes.
        graph.add_node("coordinator", coordinator_agent)
        graph.add_node("data_analyst", data_analyst_agent)
        graph.add_node("code_generator", code_generator_agent)
        graph.add_node("qa_specialist", qa_specialist_agent)

        # Collaboration flow.
        graph.set_entry_point("coordinator")

        # Route the current task to the matching specialist.
        def route_to_specialist(state: TeamState) -> str:
            """Return the specialist node for the current task, or END."""
            # ``or {}`` makes a cleared (None) task route to END instead of
            # raising on ``None.get``.
            current_task = state.get("current_task") or {}
            task_type = current_task.get("type", "unknown")

            routing_map = {
                "data_analysis": "data_analyst",
                "code_generation": "code_generator",
                "quality_assurance": "qa_specialist",
            }

            return routing_map.get(task_type, END)

        graph.add_conditional_edges(
            "coordinator",
            route_to_specialist,
            {
                "data_analyst": "data_analyst",
                "code_generator": "code_generator",
                "qa_specialist": "qa_specialist",
                END: END,
            }
        )

        # Every specialist reports back to the coordinator.
        for specialist in ["data_analyst", "code_generator", "qa_specialist"]:
            graph.add_edge(specialist, "coordinator")

        return graph.compile(
            checkpointer=PostgresCheckpointSaver.from_conn_string(
                os.getenv("DATABASE_URL")
            ),
            name="MultiAgentResearchTeam",
        )

    def _analyze_task_requirements(self, user_request: str) -> Dict[str, Any]:
        """Derive task features from keywords in the request text.

        Returns a dict with the boolean flags ``contains_data``,
        ``requires_code`` and ``needs_qa`` (case-insensitive keyword
        matches) and ``complexity_level`` from ``_assess_complexity_level``.
        """
        lowered = user_request.lower()

        def mentions(*keywords: str) -> bool:
            return any(keyword in lowered for keyword in keywords)

        return {
            "contains_data": mentions("data", "statistics", "numbers", "chart"),
            "requires_code": mentions("code", "programming", "implementation", "algorithm"),
            "needs_qa": mentions("test", "verify", "validate", "check"),
            "complexity_level": self._assess_complexity_level(user_request),
        }

    def _create_research_plan(self, task_analysis: Dict[str, Any]) -> Dict[str, Any]:
        """Create a research plan from the task features.

        Each enabled feature adds a phase, its specialist and an estimated
        duration in minutes (30 for data analysis, 45 for code generation,
        20 for quality assurance).
        """
        plan = {
            "phases": [],
            "estimated_duration": 0,
            "required_specialists": [],
        }

        # (feature flag, phase name, specialist node, minutes)
        phase_table = (
            ("contains_data", "data_analysis", "data_analyst", 30),
            ("requires_code", "code_generation", "code_generator", 45),
            ("needs_qa", "quality_assurance", "qa_specialist", 20),
        )

        for flag, phase, specialist, minutes in phase_table:
            if task_analysis[flag]:
                plan["phases"].append(phase)
                plan["required_specialists"].append(specialist)
                plan["estimated_duration"] += minutes

        return plan

3. 企业级模式和最佳实践

3.1 故障恢复和容错机制

class ResilientWorkflowManager:
    """Workflow manager that retries a compiled graph with recovery strategies.

    NOTE(review): ``_setup_recovery_strategies``, ``_configure_node_skip``,
    ``_configure_fallback_model`` and ``_reduce_task_complexity`` are called
    but not defined in this class as shown; they must be provided elsewhere.
    """

    def __init__(self, graph: CompiledStateGraph):
        self.graph = graph
        # Failure count per exception class name, across all executions.
        self.failure_tracker = defaultdict(int)
        self.recovery_strategies = self._setup_recovery_strategies()

    async def execute_with_resilience(
        self,
        input_data: Dict[str, Any],
        config: RunnableConfig,
        max_retries: int = 3,
    ) -> Dict[str, Any]:
        """Run the graph, retrying with recovery strategies on failure.

        Args:
            input_data: Input for the first attempt.
            config: Run configuration; later attempts may use a modified copy.
            max_retries: Maximum number of attempts.

        Returns:
            The last item produced by the graph stream.

        Raises:
            WorkflowExecutionError: When every attempt failed. The last
                underlying exception is attached as its cause.
        """
        retry_count = 0
        last_checkpoint = None
        last_error: Optional[BaseException] = None

        while retry_count < max_retries:
            try:
                # Resume from the last seen checkpoint, if any.
                if last_checkpoint:
                    config = {
                        **config,
                        "configurable": {
                            **config.get("configurable", {}),
                            "checkpoint_id": last_checkpoint,
                        }
                    }
                    input_data = None  # no new input when resuming

                # Run the workflow.
                final_state = None
                async for state in self.graph.astream(input_data, config):
                    final_state = state
                    last_checkpoint = state.get("checkpoint_id")

                return final_state

            except Exception as e:
                retry_count += 1
                last_error = e
                error_type = type(e).__name__
                self.failure_tracker[error_type] += 1

                logger.warning(f"工作流执行失败 (尝试 {retry_count}/{max_retries}): {e}")

                # Apply the recovery strategy registered for this error type.
                recovery_action = self.recovery_strategies.get(error_type, "retry")

                if recovery_action == "skip_node":
                    config = self._configure_node_skip(config, e)
                elif recovery_action == "fallback_model":
                    config = self._configure_fallback_model(config, e)
                elif recovery_action == "reduce_complexity":
                    input_data = self._reduce_task_complexity(input_data, e)

                # Exponential backoff, but only when another attempt follows;
                # waiting after the final failure would just delay the error.
                if retry_count < max_retries:
                    await asyncio.sleep(2 ** retry_count)

        # Every attempt failed; keep the last error as the cause.
        raise WorkflowExecutionError(
            f"工作流执行失败,已重试 {max_retries} 次",
            failure_history=dict(self.failure_tracker)
        ) from last_error

4. 总结

通过整合本章的内容,我们可以看到LangGraph在实际应用中展现出的强大能力:

4.1 技术优势

  • 配置驱动架构:通过langgraph.json实现声明式的图管理
  • 智能状态路由:Command和Send机制支持复杂的控制流
  • 反思式迭代:内置的质量控制和自我改进能力
  • 企业级特性:完整的监控、安全、扩缩容支持

4.2 应用场景

  • 深度研究系统:多轮迭代的信息收集和分析
  • 智能客服平台:多智能体协作的客户服务
  • 代码生成工具:反思式的代码生成和优化
  • 法律文档分析:专业领域的结构化信息提取

通过深入理解这些高级模式和最佳实践,开发者能够充分发挥LangGraph的潜力,构建真正具有生产价值的智能体应用系统。