概述
1. 深度研究系统架构
1.1 完整的研究工作流实现
基于实际的深度研究系统,展示LangGraph在复杂多阶段任务中的应用:
import asyncio
import operator
import os
import time
from collections import defaultdict
from datetime import datetime
from typing import Annotated, Any, Dict, List, Optional, TypedDict

from pydantic import BaseModel, Field
class OverallState(TypedDict):
    """Overall state schema for the research system.

    Demonstrates LangGraph state management for complex workflows:
    - Annotated types declare how node outputs are merged into the state
    - supports tracking across multiple research iterations
    - mixes configuration parameters with runtime counters
    """

    # NOTE(review): `add_messages` is the LangGraph message reducer;
    # it is not imported in this excerpt — confirm the import elsewhere.
    messages: Annotated[list, add_messages]             # conversation messages (reducer-merged)
    search_query: Annotated[list, operator.add]         # executed search queries (list-concatenated)
    web_research_result: Annotated[list, operator.add]  # accumulated research result texts
    sources_gathered: Annotated[list, operator.add]     # accumulated source metadata records
    initial_search_query_count: int                     # number of queries to generate per loop
    max_research_loops: int                             # upper bound on research iterations
    research_loop_count: int                            # current iteration counter
    reasoning_model: str                                # model used for reflection / final answer
class SearchQueryList(BaseModel):
    """Structured output model guaranteeing a well-formed query list."""

    # Optimized search queries, each covering a different angle of the topic.
    query: List[str] = Field(description="优化的搜索查询列表,每个查询关注不同角度")
class Reflection(BaseModel):
    """Reflection result model supporting gap analysis and loop decisions."""

    # Whether the gathered information suffices to answer the question.
    is_sufficient: bool = Field(description="当前信息是否足够回答问题")
    # Identified knowledge gaps still to be researched.
    knowledge_gap: List[str] = Field(description="识别的知识缺口列表")
    # Follow-up queries targeting those gaps.
    follow_up_queries: List[str] = Field(description="针对知识缺口的后续查询建议")
    # Confidence in the current answer, constrained to [0.0, 1.0].
    confidence_score: float = Field(description="答案置信度评分", ge=0.0, le=1.0)
async def run_agent_workflow_async(
    user_input: str,
    debug: bool = False,
    max_plan_iterations: int = 3,
    max_step_num: int = 5,
    enable_background_investigation: bool = True,
) -> Dict[str, Any]:
    """Run the research agent workflow asynchronously for one user request.

    A production-style research workflow showing LangGraph on complex
    multi-stage tasks: intelligent query generation, parallel information
    gathering, reflective quality control, and iterative refinement.

    Args:
        user_input: The user's query or request.
        debug: Enable debug-level logging and tracing callbacks.
        max_plan_iterations: Maximum number of research-loop iterations.
        max_step_num: Maximum number of steps (initial queries) in a plan.
        enable_background_investigation: Whether to web-search before planning.
            NOTE(review): currently unused in this body — confirm wiring.

    Returns:
        Dict[str, Any]: the final streamed state update, augmented with an
        "execution_stats" summary of the run.
    """
    # Seed the research state with the user's message.
    initial_state = OverallState(
        messages=[HumanMessage(content=user_input)],
        search_query=[],
        web_research_result=[],
        sources_gathered=[],
        initial_search_query_count=max_step_num,
        max_research_loops=max_plan_iterations,
        research_loop_count=0,
        reasoning_model="gemini-2.0-flash-exp",
    )

    # Build the research workflow graph.
    research_graph = build_research_workflow()

    # FIX: the original built `[Handler() if debug else None, ...]`, which
    # passed literal None entries in `callbacks` whenever debug was off.
    # Only instantiate and register handlers that are actually enabled.
    callbacks = []
    if debug:
        callbacks.append(LangfuseCallbackHandler())
        callbacks.append(ConsoleCallbackHandler())

    # Configure the runtime environment.
    config = RunnableConfig(
        configurable={
            "thread_id": f"research_{int(time.time())}",
            "user_id": "anonymous",
            "query_generator_model": "gemini-2.0-flash-exp",
            "enable_tracing": debug,
        },
        callbacks=callbacks,
    )

    # Stream node-by-node state updates from the graph.
    final_state = None
    execution_steps = []
    async for state_update in research_graph.astream(
        initial_state,
        config=config,
        stream_mode="updates",
    ):
        execution_steps.append({
            "timestamp": time.time(),
            "update": state_update,
        })
        if debug:
            for node_name, node_state in state_update.items():
                print(f"📍 节点 '{node_name}' 执行完成")
                # Preview the newest message, truncated to 200 characters.
                if "messages" in node_state and node_state["messages"]:
                    latest_message = node_state["messages"][-1]
                    preview = latest_message.content[:200] + "..." if len(latest_message.content) > 200 else latest_message.content
                    print(f"💬 输出预览: {preview}")
                if "search_query" in node_state:
                    print(f"🔍 新增查询: {node_state['search_query']}")
                if "sources_gathered" in node_state:
                    print(f"📚 收集来源: {len(node_state['sources_gathered'])} 个")
        final_state = state_update

    # Attach aggregate execution statistics to the final state.
    if final_state:
        final_state["execution_stats"] = {
            "total_steps": len(execution_steps),
            "total_duration": execution_steps[-1]["timestamp"] - execution_steps[0]["timestamp"] if execution_steps else 0,
            "queries_executed": len(final_state.get("search_query", [])),
            "sources_collected": len(final_state.get("sources_gathered", [])),
        }
    return final_state
def build_research_workflow() -> CompiledStateGraph:
    """Build the deep-research workflow graph.

    The graph implements a complete research pipeline:
    1. Query generation: turn the user question into multiple search queries
    2. Parallel search: run several search tasks concurrently
    3. Result collection: aggregate all search output
    4. Reflection: assess whether the gathered information is sufficient
    5. Iteration: decide from the reflection whether more research is needed
    6. Synthesis: produce the final comprehensive answer
    """
    graph = StateGraph(OverallState)

    # ---- query generation node -------------------------------------------
    def generate_query(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """Decompose the research topic into several distinct search angles."""
        configurable = config.get("configurable", {})
        query_model = configurable.get("query_generator_model", "gemini-2.0-flash-exp")
        # High temperature on purpose: diversify the generated queries.
        llm = ChatGoogleGenerativeAI(
            model=query_model,
            temperature=1.0,
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        structured_llm = llm.with_structured_output(SearchQueryList)
        # Context-aware prompt: topic, date, and recent queries to avoid repeats.
        research_topic = get_research_topic(state["messages"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        previous_queries = state.get("search_query", [])
        query_instructions = f"""当前日期: {current_date}
研究主题: {research_topic}
之前已执行的查询(避免重复):
{chr(10).join(f"- {q}" for q in previous_queries[-5:]) if previous_queries else "无"}
请生成 {state["initial_search_query_count"]} 个新的优化搜索查询,要求:
1. **多角度覆盖**:从不同角度和层面分析主题
2. **时效性考虑**:包含最新信息和趋势分析
3. **深度挖掘**:不仅获取基础信息,还要深入技术细节
4. **专业术语**:使用领域专业术语提高搜索精度
5. **避免重复**:确保与之前查询不重复
每个查询应该是独立且具体的,能够获得有价值的信息片段。"""
        result = structured_llm.invoke(query_instructions)
        # NOTE(review): "query_list" and the two flags below are not declared
        # in OverallState; LangGraph drops undeclared keys — confirm intent.
        return {
            "query_list": result.query,
            "query_generation_completed": True,
            "query_generated_at": time.time(),
        }

    # ---- parallel search fan-out -----------------------------------------
    def continue_to_web_research(state: OverallState) -> List[Send]:
        """Fan out one Send per query for truly parallel web research.

        NOTE(review): a Send-returning function is normally attached via
        `add_conditional_edges`, not added as a regular node with plain
        edges (as done below) — verify against the LangGraph Send API.
        """
        query_list = state.get("query_list", [])
        return [
            Send("web_research", {
                "search_query": search_query,
                "id": int(idx),
                "total_queries": len(query_list),
                "research_context": {
                    "main_topic": get_research_topic(state["messages"]),
                    "current_loop": state.get("research_loop_count", 0),
                }
            })
            for idx, search_query in enumerate(query_list)
        ]

    # ---- web research execution node -------------------------------------
    def web_research(state: Dict[str, Any], config: RunnableConfig) -> OverallState:
        """Run a single search query against the web.

        Integrates the Google Search tool with a Gemini model to perform
        query execution, citation extraction/formatting, and short-link
        generation for the gathered sources.
        """
        search_query = state["search_query"]
        search_id = state["id"]
        research_context = state.get("research_context", {})
        # Model configuration comes from the runnable config.
        configurable = config.get("configurable", {})
        model_name = configurable.get("query_generator_model", "gemini-2.0-flash-exp")
        # Build the search prompt with topic context.
        search_prompt = f"""
使用Google Search API搜索以下查询并提供详细分析:
查询: {search_query}
主题背景: {research_context.get("main_topic", "未知")}
搜索要求:
1. 使用多个相关关键词组合进行搜索
2. 优先选择权威可信的信息源
3. 提取关键事实、数据和观点
4. 分析信息的时效性和相关性
5. 总结核心发现和洞察
请提供结构化的研究结果,包含引用来源。
"""
        # Execute the grounded search (temperature 0 for reproducible output).
        genai_client = genai.GenerativeModel(model_name)
        response = genai_client.generate_content(
            search_prompt,
            tools=[{"google_search": {}}],
            config={"temperature": 0}
        )
        # Resolve grounding chunks into citation records and rewrite the text
        # so raw URLs become short citation markers.
        resolved_urls = resolve_urls(
            response.candidates[0].grounding_metadata.grounding_chunks,
            search_id
        )
        citations = get_citations(response, resolved_urls)
        modified_text = insert_citation_markers(response.text, citations)
        # Build structured source records, scored for relevance.
        sources_gathered = [
            {
                "url": url_info["url"],
                "title": url_info.get("title", "未知标题"),
                "short_url": url_info["short_url"],
                "value": url_info["value"],
                "search_id": search_id,
                "search_query": search_query,
                "collected_at": time.time(),
                "relevance_score": _calculate_relevance_score(
                    url_info, search_query, research_context
                ),
            }
            for url_info in resolved_urls
        ]
        # Single-element lists: merged into OverallState via operator.add.
        return {
            "sources_gathered": sources_gathered,
            "search_query": [search_query],
            "web_research_result": [modified_text],
        }

    # ---- reflection node --------------------------------------------------
    def reflection(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """Assess information sufficiency and identify knowledge gaps.

        Core of the reflective-iteration mechanism: evaluates completeness
        of the research results, identifies gaps, proposes follow-up
        queries, and increments the loop counter for the routing decision.
        """
        reasoning_model = state.get("reasoning_model", "gemini-2.0-flash-exp")
        research_topic = get_research_topic(state["messages"])
        summaries = "\n\n---\n\n".join(state["web_research_result"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        # Reflection prompt over all accumulated summaries.
        reflection_instructions = f"""作为研究分析专家,请对以下研究结果进行深度反思分析:
当前日期: {current_date}
研究主题: {research_topic}
已完成搜索次数: {len(state["search_query"])}
当前研究循环: {state.get("research_loop_count", 0)}
研究结果摘要:
{summaries}
请进行反思分析:
1. **信息完整性评估**:
- 是否覆盖了主题的核心方面?
- 是否存在明显的信息空白?
- 不同来源的信息是否一致?
2. **知识缺口识别**:
- 哪些重要问题尚未得到充分回答?
- 需要哪些类型的补充信息?
- 是否需要更专业或更新的信息?
3. **信息质量评估**:
- 来源的权威性和可信度如何?
- 信息的时效性是否满足要求?
- 是否存在相互矛盾的信息?
4. **后续行动建议**:
- 如果信息不充分,建议具体的后续查询
- 优先级排序和搜索策略建议
请提供结构化的分析结果。"""
        # Moderate temperature: balance creativity with consistency.
        llm = ChatGoogleGenerativeAI(
            model=reasoning_model,
            temperature=0.3,
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        result = llm.with_structured_output(Reflection).invoke(reflection_instructions)
        # NOTE(review): most keys below are not declared in OverallState —
        # the conditional router reads them, so verify the schema covers them.
        return {
            "is_sufficient": result.is_sufficient,
            "knowledge_gap": result.knowledge_gap,
            "follow_up_queries": result.follow_up_queries,
            "confidence_score": result.confidence_score,
            "research_loop_count": state["research_loop_count"] + 1,
            "reflection_completed_at": time.time(),
        }

    # ---- final answer synthesis node --------------------------------------
    def finalize_answer(state: OverallState, config: RunnableConfig) -> Dict[str, Any]:
        """Synthesize all research results into the final answer.

        Integrates information from every source, generates a structured
        answer, rewrites short citation markers into full references, and
        computes simple quality metrics for the run.
        """
        reasoning_model = state.get("reasoning_model", "gemini-2.0-flash-exp")
        research_topic = get_research_topic(state["messages"])
        summaries = "\n---\n\n".join(state["web_research_result"])
        current_date = datetime.now().strftime("%Y-%m-%d")
        # Answer-synthesis prompt over the full research record.
        answer_instructions = f"""作为研究分析师,请基于以下研究结果生成全面、准确的答案:
当前日期: {current_date}
研究主题: {research_topic}
研究循环次数: {state.get("research_loop_count", 0)}
信息来源数量: {len(state.get("sources_gathered", []))}
研究结果详情:
{summaries}
答案生成要求:
1. **结构化组织**:
- 使用清晰的标题和子标题
- 逻辑性强的信息组织
- 重点突出关键发现
2. **客观性和平衡性**:
- 呈现多种观点和角度
- 避免偏见和主观判断
- 承认不确定性和争议
3. **引用和来源**:
- 明确标注信息来源
- 使用内联引用格式
- 提供完整的参考文献列表
4. **实用性**:
- 直接回答用户的核心问题
- 提供可操作的建议和结论
- 突出关键要点和影响
请生成专业、全面的研究报告。"""
        # Temperature 0: deterministic, consistent final answer.
        llm = ChatGoogleGenerativeAI(
            model=reasoning_model,
            temperature=0,
            max_retries=2,
            api_key=os.getenv("GEMINI_API_KEY"),
        )
        result = llm.invoke(answer_instructions)
        # Replace short citation markers with full references; keep only
        # sources that were actually cited in the answer.
        unique_sources = []
        answer_content = result.content
        for source in state["sources_gathered"]:
            if source["short_url"] in answer_content:
                answer_content = answer_content.replace(
                    source["short_url"],
                    source["value"]
                )
                unique_sources.append(source)
        # Lightweight quality metrics for downstream reporting.
        quality_metrics = {
            "sources_cited": len(unique_sources),
            "content_length": len(answer_content),
            "research_depth": state.get("research_loop_count", 0),
            "confidence_score": state.get("confidence_score", 0.8),
        }
        return {
            "messages": [AIMessage(content=answer_content)],
            "sources_gathered": unique_sources,
            "research_completed": True,
            "quality_metrics": quality_metrics,
            "final_answer_generated_at": time.time(),
        }

    # ---- graph wiring ------------------------------------------------------
    graph.add_node("generate_query", generate_query)
    graph.add_node("continue_to_web_research", continue_to_web_research)
    graph.add_node("web_research", web_research)
    graph.add_node("reflection", reflection)
    graph.add_node("finalize_answer", finalize_answer)
    # Linear path up to reflection; the fan-out happens via the Send objects
    # returned by continue_to_web_research (see NOTE on that function).
    graph.set_entry_point("generate_query")
    graph.add_edge("generate_query", "continue_to_web_research")
    graph.add_edge("continue_to_web_research", "web_research")
    graph.add_edge("web_research", "reflection")

    # Conditional routing after reflection: loop or finalize.
    def should_continue_research(state: OverallState) -> str:
        """Decide whether to run another research loop or finalize."""
        is_sufficient = state.get("is_sufficient", False)
        research_count = state.get("research_loop_count", 0)
        max_loops = state.get("max_research_loops", 3)
        confidence = state.get("confidence_score", 0)
        # Multiple stop conditions, checked in priority order.
        if is_sufficient and confidence > 0.7:
            return "finalize_answer"
        elif research_count >= max_loops:
            # Hard stop: maximum loop count reached.
            return "finalize_answer"
        elif len(state.get("knowledge_gap", [])) == 0:
            # No remaining knowledge gaps identified.
            return "finalize_answer"
        else:
            # Continue researching.
            return "generate_query"

    graph.add_conditional_edges(
        "reflection",
        should_continue_research,
        {
            "generate_query": "generate_query",
            "finalize_answer": "finalize_answer",
        }
    )
    graph.set_finish_point("finalize_answer")
    # Compile with a Postgres checkpointer so runs are resumable.
    return graph.compile(
        checkpointer=PostgresCheckpointSaver.from_conn_string(
            os.getenv("DATABASE_URL", "postgresql://localhost/langgraph")
        ),
        debug=True,  # NOTE(review): always-on graph debug — intended for prod?
        name="DeepResearchWorkflow",
    )
# 辅助工具函数
def get_research_topic(messages: List[BaseMessage]) -> str:
"""从消息历史中智能提取研究主题"""
if not messages:
return "未知研究主题"
# 查找最后一条人类消息
for message in reversed(messages):
if isinstance(message, HumanMessage):
content = message.content.strip()
# 简单的主题提取逻辑
if len(content) > 200:
# 长消息,提取前100个字符作为主题
return content[:100] + "..."
else:
return content
return "未知研究主题"
def resolve_urls(grounding_chunks, search_id: int) -> List[Dict[str, str]]:
    """Resolve grounding chunks into citation records.

    For every chunk carrying web metadata, emit a record with the source
    URL, a display title, a short citation marker unique within this
    search, and a pre-formatted markdown link.
    """
    records = []
    for position, chunk in enumerate(grounding_chunks):
        web = getattr(chunk, 'web', None)
        if not web:
            continue  # chunk has no usable web metadata
        link = web.uri
        heading = getattr(web, 'title', f"来源 {position + 1}")
        records.append({
            "url": link,
            "title": heading,
            # Marker format "[search-chunk]" keeps citations unique per search.
            "short_url": f"[{search_id}-{position}]",
            "value": f"[{heading}]({link})",
            "chunk_index": position,
        })
    return records
def get_citations(response, resolved_urls: List[Dict]) -> List[Dict]:
    """Project resolved URL records into citation entries.

    NOTE: `response` is currently unused; it is kept in the signature so
    callers (and a future snippet-extraction step) stay compatible.
    """
    return [
        {
            "url": info["url"],
            "title": info["title"],
            "short_url": info["short_url"],
            "referenced_text": "",  # placeholder for the quoted text snippet
        }
        for info in resolved_urls
    ]
def insert_citation_markers(text: str, citations: List[Dict]) -> str:
    """Replace raw source URLs in *text* with their short citation markers.

    Simple substitution strategy: each citation's URL, when present
    verbatim, is swapped for its marker. (A production system could use
    NLP to place citations more precisely.)
    """
    result = text
    for entry in citations:
        # str.replace is a no-op when the URL is absent, so no guard needed.
        result = result.replace(entry["url"], entry["short_url"])
    return result
def _calculate_relevance_score(
url_info: Dict,
search_query: str,
research_context: Dict
) -> float:
"""计算来源的相关性评分"""
score = 0.5 # 基础分数
# 基于标题相关性
title = url_info.get("title", "").lower()
query_terms = search_query.lower().split()
title_matches = sum(1 for term in query_terms if term in title)
score += (title_matches / len(query_terms)) * 0.3
# 基于URL权威性
url = url_info.get("url", "")
if any(domain in url for domain in [".edu", ".gov", ".org"]):
score += 0.2
# 基于内容长度(更长的内容通常更详细)
content_length = len(url_info.get("value", ""))
if content_length > 200:
score += 0.1
return min(score, 1.0)
2. 多智能体协作模式
2.1 分层多智能体系统
基于实际应用案例,展示专业的多智能体协作架构:
class MultiAgentResearchSystem:
    """Multi-agent research system: specialist division of labor plus coordination.

    NOTE(review): several helper methods referenced below
    (_assess_complexity_level, _decompose_into_subtasks,
    _integrate_specialist_results, _perform_data_analysis,
    _generate_code_solution, _perform_quality_assurance) are not defined
    in this excerpt — confirm they exist elsewhere.
    """

    def __init__(self):
        # Lazily-built coordination artifacts; populated by the builder below.
        self.coordinator = None
        self.specialists = {}
        self.coordination_graph = None

    def build_hierarchical_research_team(self) -> CompiledStateGraph:
        """Build the hierarchical research team graph (coordinator + specialists)."""

        class TeamState(TypedDict):
            # Shared team state: conversation, task pipeline, and results.
            messages: Annotated[list, add_messages]
            current_task: Optional[str]
            task_queue: List[Dict[str, Any]]
            specialist_results: Dict[str, Any]
            coordination_history: List[Dict[str, Any]]
            research_plan: Optional[Dict[str, Any]]

        graph = StateGraph(TeamState)

        # Coordinator agent: task decomposition, dispatch, and integration.
        def coordinator_agent(state: TeamState) -> Dict[str, Any]:
            """Coordinator: decompose tasks, assign them, and integrate results."""
            messages = state["messages"]
            current_task = state.get("current_task")
            if not current_task:
                # First pass: decompose the user request into a plan + subtasks.
                user_request = messages[-1].content if messages else ""
                # Analyze task complexity and specialist requirements.
                task_analysis = self._analyze_task_requirements(user_request)
                # Produce the research plan from the analysis.
                research_plan = self._create_research_plan(task_analysis)
                # Break the plan into dispatchable subtasks.
                subtasks = self._decompose_into_subtasks(research_plan)
                return {
                    "research_plan": research_plan,
                    "task_queue": subtasks,
                    "current_task": subtasks[0] if subtasks else None,
                    "coordination_history": [{
                        "action": "task_decomposition",
                        "plan": research_plan,
                        "subtasks_count": len(subtasks),
                        "timestamp": time.time(),
                    }],
                }
            else:
                # Subsequent passes: process a specialist's returned result.
                specialist_results = state.get("specialist_results", {})
                task_queue = state.get("task_queue", [])
                if specialist_results and task_queue:
                    # Record completion of the head-of-queue task.
                    completed_task = task_queue[0]
                    remaining_tasks = task_queue[1:]
                    coordination_entry = {
                        "action": "task_completion",
                        "completed_task": completed_task,
                        "specialist": completed_task.get("assigned_specialist"),
                        "result_summary": specialist_results.get("summary", ""),
                        "timestamp": time.time(),
                    }
                    if remaining_tasks:
                        # More tasks pending: advance the queue.
                        return {
                            "current_task": remaining_tasks[0],
                            "task_queue": remaining_tasks,
                            "coordination_history": state.get("coordination_history", []) + [coordination_entry],
                        }
                    else:
                        # All tasks done: integrate the final result.
                        final_result = self._integrate_specialist_results(
                            state["specialist_results"],
                            state["research_plan"]
                        )
                        return {
                            "messages": [AIMessage(content=final_result)],
                            "coordination_history": state.get("coordination_history", []) + [coordination_entry],
                            "research_completed": True,
                        }
            return {"current_task": None}  # fallback for unexpected states

        # Data-analysis specialist.
        def data_analyst_agent(state: TeamState) -> Dict[str, Any]:
            """Data analyst: handles data-analysis and statistics tasks."""
            current_task = state.get("current_task", {})
            if current_task.get("type") != "data_analysis":
                return {}  # not a data-analysis task; yield no update
            # Run the analysis and report structured results.
            analysis_result = self._perform_data_analysis(current_task)
            return {
                "specialist_results": {
                    "type": "data_analysis",
                    "summary": analysis_result["summary"],
                    "details": analysis_result["details"],
                    "visualizations": analysis_result.get("charts", []),
                    "confidence": analysis_result.get("confidence", 0.8),
                }
            }

        # Code-generation specialist.
        def code_generator_agent(state: TeamState) -> Dict[str, Any]:
            """Code generator: handles programming and implementation tasks."""
            current_task = state.get("current_task", {})
            if current_task.get("type") != "code_generation":
                return {}
            # Generate the code solution and report it.
            code_result = self._generate_code_solution(current_task)
            return {
                "specialist_results": {
                    "type": "code_generation",
                    "summary": code_result["summary"],
                    "code": code_result["code"],
                    "tests": code_result.get("tests", []),
                    "documentation": code_result.get("docs", ""),
                }
            }

        # Quality-assurance specialist.
        def qa_specialist_agent(state: TeamState) -> Dict[str, Any]:
            """QA specialist: validates result quality and accuracy."""
            current_task = state.get("current_task", {})
            if current_task.get("type") != "quality_assurance":
                return {}
            # Check prior specialist output for quality issues.
            qa_result = self._perform_quality_assurance(
                current_task,
                state.get("specialist_results", {})
            )
            return {
                "specialist_results": {
                    "type": "quality_assurance",
                    "summary": qa_result["summary"],
                    "issues_found": qa_result["issues"],
                    "recommendations": qa_result["recommendations"],
                    "quality_score": qa_result["score"],
                }
            }

        # Register every agent node.
        graph.add_node("coordinator", coordinator_agent)
        graph.add_node("data_analyst", data_analyst_agent)
        graph.add_node("code_generator", code_generator_agent)
        graph.add_node("qa_specialist", qa_specialist_agent)
        # Coordination flow: coordinator routes, specialists report back.
        graph.set_entry_point("coordinator")

        # Route the current task to the matching specialist by task type.
        def route_to_specialist(state: TeamState) -> str:
            """Route to the specialist matching the current task's type."""
            current_task = state.get("current_task", {})
            task_type = current_task.get("type", "unknown")
            routing_map = {
                "data_analysis": "data_analyst",
                "code_generation": "code_generator",
                "quality_assurance": "qa_specialist",
            }
            return routing_map.get(task_type, END)

        graph.add_conditional_edges(
            "coordinator",
            route_to_specialist,
            {
                "data_analyst": "data_analyst",
                "code_generator": "code_generator",
                "qa_specialist": "qa_specialist",
                END: END,
            }
        )
        # Every specialist returns control to the coordinator.
        for specialist in ["data_analyst", "code_generator", "qa_specialist"]:
            graph.add_edge(specialist, "coordinator")
        return graph.compile(
            checkpointer=PostgresCheckpointSaver.from_conn_string(
                os.getenv("DATABASE_URL")
            ),
            name="MultiAgentResearchTeam",
        )

    def _analyze_task_requirements(self, user_request: str) -> Dict[str, Any]:
        """Analyze task requirements and complexity via keyword heuristics."""
        task_features = {
            "contains_data": any(term in user_request.lower() for term in ["data", "statistics", "numbers", "chart"]),
            "requires_code": any(term in user_request.lower() for term in ["code", "programming", "implementation", "algorithm"]),
            "needs_qa": any(term in user_request.lower() for term in ["test", "verify", "validate", "check"]),
            "complexity_level": self._assess_complexity_level(user_request),
        }
        return task_features

    def _create_research_plan(self, task_analysis: Dict[str, Any]) -> Dict[str, Any]:
        """Create a research plan (phases, specialists, duration estimate)."""
        plan = {
            "phases": [],
            "estimated_duration": 0,
            "required_specialists": [],
        }
        # Each detected requirement adds a phase, a specialist, and an
        # estimated duration in minutes.
        if task_analysis["contains_data"]:
            plan["phases"].append("data_analysis")
            plan["required_specialists"].append("data_analyst")
            plan["estimated_duration"] += 30
        if task_analysis["requires_code"]:
            plan["phases"].append("code_generation")
            plan["required_specialists"].append("code_generator")
            plan["estimated_duration"] += 45
        if task_analysis["needs_qa"]:
            plan["phases"].append("quality_assurance")
            plan["required_specialists"].append("qa_specialist")
            plan["estimated_duration"] += 20
        return plan
3. 企业级模式和最佳实践
3.1 故障恢复和容错机制
class ResilientWorkflowManager:
    """Resilient workflow manager: enterprise-grade fault recovery.

    NOTE(review): `_setup_recovery_strategies`, `_configure_node_skip`,
    `_configure_fallback_model`, `_reduce_task_complexity`, `logger`, and
    `WorkflowExecutionError` are referenced but not defined in this
    excerpt — confirm they exist elsewhere.
    """

    def __init__(self, graph: CompiledStateGraph):
        self.graph = graph
        # Per-exception-type failure counters for diagnostics.
        self.failure_tracker = defaultdict(int)
        # Maps exception type name -> recovery action string.
        self.recovery_strategies = self._setup_recovery_strategies()

    async def execute_with_resilience(
        self,
        input_data: Dict[str, Any],
        config: RunnableConfig,
        max_retries: int = 3,
    ) -> Dict[str, Any]:
        """Execute the workflow with automatic recovery and bounded retries."""
        retry_count = 0
        last_checkpoint = None
        while retry_count < max_retries:
            try:
                # Resume from the last checkpoint when one was recorded.
                if last_checkpoint:
                    config = {
                        **config,
                        "configurable": {
                            **config.get("configurable", {}),
                            "checkpoint_id": last_checkpoint,
                        }
                    }
                    input_data = None  # checkpoint resume needs no fresh input
                # Stream the workflow, remembering the latest checkpoint.
                final_state = None
                async for state in self.graph.astream(input_data, config):
                    final_state = state
                    last_checkpoint = state.get("checkpoint_id")
                return final_state
            except Exception as e:
                retry_count += 1
                error_type = type(e).__name__
                self.failure_tracker[error_type] += 1
                logger.warning(f"工作流执行失败 (尝试 {retry_count}/{max_retries}): {e}")
                # Pick a recovery strategy keyed by the exception type;
                # default is a plain retry.
                recovery_action = self.recovery_strategies.get(error_type, "retry")
                if recovery_action == "skip_node":
                    # Skip the node that failed.
                    config = self._configure_node_skip(config, e)
                elif recovery_action == "fallback_model":
                    # Switch to a backup model.
                    config = self._configure_fallback_model(config, e)
                elif recovery_action == "reduce_complexity":
                    # Simplify the task before retrying.
                    input_data = self._reduce_task_complexity(input_data, e)
                # Exponential backoff between attempts.
                await asyncio.sleep(2 ** retry_count)
        # Every retry exhausted: surface the failure with its history.
        raise WorkflowExecutionError(
            f"工作流执行失败,已重试 {max_retries} 次",
            failure_history=dict(self.failure_tracker)
        )
4. 总结
通过整合本章的内容,我们看到LangGraph在实际应用中展现出的强大能力:
4.1 技术优势
- 配置驱动架构:通过langgraph.json实现声明式的图管理
- 智能状态路由:Command和Send机制支持复杂的控制流
- 反思式迭代:内置的质量控制和自我改进能力
- 企业级特性:完整的监控、安全、扩缩容支持
4.2 应用场景
- 深度研究系统:多轮迭代的信息收集和分析
- 智能客服平台:多智能体协作的客户服务
- 代码生成工具:反思式的代码生成和优化
- 法律文档分析:专业领域的结构化信息提取
通过深入理解这些高级模式和最佳实践,开发者能够充分发挥LangGraph的潜力,构建真正具有生产价值的智能体应用系统。