概述
LangGraph是一个专为构建多智能体应用而设计的Python框架,基于图计算模型实现复杂AI工作流的编排和执行。本手册将从基础安装到高级应用全面介绍LangGraph的使用方法。
1. 快速开始
1.1 安装配置
基础安装
# 安装核心包
pip install langgraph
# 安装检查点支持
pip install langgraph-checkpoint-postgres # PostgreSQL支持
pip install langgraph-checkpoint-sqlite # SQLite支持
# 安装预构建组件
pip install langgraph-prebuilt
# 安装CLI工具
pip install langgraph-cli
开发环境配置
# 创建项目目录
mkdir my-langgraph-app
cd my-langgraph-app
# 初始化项目
langgraph init
# 安装开发依赖
pip install -r requirements-dev.txt
环境变量配置
# .env文件
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
DATABASE_URL=postgresql://user:password@localhost/langgraph
LANGSMITH_API_KEY=your_langsmith_api_key
LANGSMITH_TRACING=true
1.2 项目结构
my-langgraph-app/
├── langgraph.json # 项目配置文件
├── .env # 环境变量
├── requirements.txt # 依赖包
├── src/
│ ├── __init__.py
│ ├── graph.py # 主图定义
│ ├── nodes/ # 节点实现
│ │ ├── __init__.py
│ │ ├── agent.py
│ │ └── tools.py
│ ├── state.py # 状态定义
│ └── utils.py # 工具函数
├── tests/ # 测试文件
└── docs/ # 文档
1.3 配置文件详解
langgraph.json配置
{
"graphs": {
"main_workflow": "./src/graph.py:workflow",
"chat_agent": "./src/agents/chat.py:agent_graph"
},
"python_version": "3.11",
"env": "./.env",
"dependencies": ["."],
"dockerfile_lines": [
"RUN apt-get update && apt-get install -y git",
"RUN pip install --no-cache-dir torch"
],
"deployment": {
"platform": "langgraph-cloud",
"scaling": {
"min_instances": 1,
"max_instances": 10,
"auto_scaling_metric": "cpu_utilization",
"target_utilization": 70
},
"monitoring": {
"enable_tracing": true,
"log_level": "INFO",
"metrics_collection": ["latency", "throughput", "error_rate"]
}
}
}
2. 核心概念
2.1 状态管理
状态定义
from typing import TypedDict, Annotated, List
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
"""智能体状态定义
状态是图中所有节点共享的数据结构,支持:
- 类型安全:基于TypedDict的强类型定义
- 累积操作:通过Annotated定义数据累积方式
- 版本控制:自动跟踪状态变化历史
"""
# 消息历史(自动累积)
messages: Annotated[List[BaseMessage], add_messages]
# 当前任务
current_task: str
# 执行结果
result: str
# 工具调用历史
tool_calls: List[dict]
# 错误信息
error: str | None
# 迭代计数
iteration_count: int
# 状态操作示例
def update_state_example():
"""状态更新示例"""
from langgraph.graph import StateGraph
def process_node(state: AgentState) -> AgentState:
"""处理节点:展示状态更新模式"""
return {
"current_task": "processing_data",
"iteration_count": state.get("iteration_count", 0) + 1,
"result": f"Processed at iteration {state.get('iteration_count', 0) + 1}"
}
# 创建图
graph = StateGraph(AgentState)
graph.add_node("process", process_node)
graph.set_entry_point("process")
graph.set_finish_point("process")
return graph.compile()
2.2 图构建
基础图构建
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
# 定义工具
@tool
def search_web(query: str) -> str:
"""搜索网络信息"""
return f"搜索结果:{query}"
@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果:{result}"
except:
return "计算错误"
def build_basic_graph():
"""构建基础图示例"""
def agent_node(state: AgentState) -> AgentState:
"""智能体节点:决策和工具调用"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
tools = [search_web, calculate]
llm_with_tools = llm.bind_tools(tools)
# 调用LLM
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state: AgentState) -> str:
"""条件判断:是否继续执行"""
last_message = state["messages"][-1]
# 检查是否有工具调用
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
return "tools"
else:
return END
# 创建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode([search_web, calculate]))
# 设置入口
graph.set_entry_point("agent")
# 添加条件边
graph.add_conditional_edges(
"agent",
should_continue,
{
"tools": "tools",
END: END
}
)
# 工具执行后返回智能体
graph.add_edge("tools", "agent")
return graph.compile()
高级图模式
from langgraph.types import Command, Send
from typing import List
def build_advanced_graph():
"""构建高级图:支持并行执行和动态路由"""
def coordinator_node(state: AgentState) -> Command[str]:
"""协调器节点:智能路由决策"""
task = state.get("current_task", "")
if "search" in task.lower():
return Command(
goto="search_specialist",
update={"current_task": f"Routing to search: {task}"}
)
elif "calculate" in task.lower():
return Command(
goto="math_specialist",
update={"current_task": f"Routing to math: {task}"}
)
else:
return Command(
goto="general_agent",
update={"current_task": f"General processing: {task}"}
)
def parallel_dispatcher(state: AgentState) -> List[Send]:
"""并行分发器:将任务分发到多个工作节点"""
tasks = state.get("subtasks", [])
return [
Send("worker", {"task": task, "worker_id": i})
for i, task in enumerate(tasks)
]
def search_specialist(state: AgentState) -> AgentState:
"""搜索专家节点"""
task = state["current_task"]
result = f"搜索专家处理:{task}"
return {"result": result, "current_task": "search_completed"}
def math_specialist(state: AgentState) -> AgentState:
"""数学专家节点"""
task = state["current_task"]
result = f"数学专家处理:{task}"
return {"result": result, "current_task": "math_completed"}
def worker_node(state: dict) -> AgentState:
"""工作节点:处理并行任务"""
task = state["task"]
worker_id = state["worker_id"]
return {
"result": f"Worker {worker_id} completed: {task}",
"worker_id": worker_id
}
# 构建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("coordinator", coordinator_node)
graph.add_node("dispatcher", parallel_dispatcher)
graph.add_node("search_specialist", search_specialist)
graph.add_node("math_specialist", math_specialist)
graph.add_node("worker", worker_node)
# 设置路由
graph.set_entry_point("coordinator")
# 条件路由
graph.add_conditional_edges(
"coordinator",
lambda state: state["current_task"].split(":")[0].split()[-1],
{
"search": "search_specialist",
"math": "math_specialist",
"processing": "dispatcher"
}
)
# 并行执行
graph.add_edge("dispatcher", "worker")
return graph.compile()
2.3 检查点系统
检查点配置
from langgraph.checkpoint.postgres import PostgresCheckpointSaver
from langgraph.checkpoint.sqlite import SqliteCheckpointSaver
from langgraph.checkpoint.memory import InMemorySaver
def setup_checkpointing():
"""设置检查点系统"""
# 生产环境:PostgreSQL
def create_postgres_checkpointer():
return PostgresCheckpointSaver.from_conn_string(
"postgresql://user:password@localhost/langgraph"
)
# 开发环境:SQLite
def create_sqlite_checkpointer():
return SqliteCheckpointSaver.from_conn_string("checkpoints.db")
# 测试环境:内存
def create_memory_checkpointer():
return InMemorySaver()
# 根据环境选择
import os
env = os.getenv("ENVIRONMENT", "development")
if env == "production":
return create_postgres_checkpointer()
elif env == "development":
return create_sqlite_checkpointer()
else:
return create_memory_checkpointer()
def use_checkpointing_example():
"""检查点使用示例"""
# 创建带检查点的图
checkpointer = setup_checkpointing()
graph = build_basic_graph()
app = graph.compile(checkpointer=checkpointer)
# 配置线程ID
config = {"configurable": {"thread_id": "conversation-1"}}
# 执行并自动保存检查点
result = app.invoke(
{"messages": [("user", "计算 2+2")]},
config=config
)
# 从检查点恢复执行
continued_result = app.invoke(
{"messages": [("user", "再计算 3+3")]},
config=config # 相同thread_id会从上次状态继续
)
# 查看执行历史
history = app.get_state_history(config)
for state in history:
print(f"Step {state.metadata['step']}: {state.values}")
return result, continued_result
3. API深入分析
3.1 StateGraph核心API
创建和配置
class StateGraph:
"""状态图核心类
StateGraph是LangGraph的核心API,负责:
- 图结构定义和管理
- 节点和边的添加
- 执行流程控制
- 状态传播机制
"""
def __init__(self, state_schema: Type[TypedDict]):
"""初始化状态图
Args:
state_schema: 状态模式定义,必须是TypedDict子类
"""
self.state_schema = state_schema
self.nodes = {}
self.edges = {}
self.conditional_edges = {}
self.entry_point = None
self.finish_point = None
def add_node(self, name: str, action: Callable) -> "StateGraph":
"""添加节点
Args:
name: 节点名称,必须唯一
action: 节点执行函数,签名为 (state) -> state_update
Returns:
StateGraph: 支持链式调用
"""
if name in self.nodes:
raise ValueError(f"Node '{name}' already exists")
self.nodes[name] = PregelNode(
name=name,
action=action,
input_channels=self._get_input_channels(action),
output_channels=self._get_output_channels(action)
)
return self
def add_edge(self, start: str, end: str) -> "StateGraph":
"""添加无条件边
Args:
start: 起始节点名称
end: 结束节点名称或END常量
Returns:
StateGraph: 支持链式调用
"""
if start not in self.nodes:
raise ValueError(f"Start node '{start}' not found")
if end != END and end not in self.nodes:
raise ValueError(f"End node '{end}' not found")
if start not in self.edges:
self.edges[start] = []
self.edges[start].append(end)
return self
def add_conditional_edges(
self,
start: str,
condition: Callable,
condition_map: Dict[str, str]
) -> "StateGraph":
"""添加条件边
Args:
start: 起始节点名称
condition: 条件函数,签名为 (state) -> str
condition_map: 条件值到目标节点的映射
Returns:
StateGraph: 支持链式调用
"""
if start not in self.nodes:
raise ValueError(f"Start node '{start}' not found")
# 验证条件映射中的目标节点
for condition_value, target in condition_map.items():
if target != END and target not in self.nodes:
raise ValueError(f"Target node '{target}' not found")
self.conditional_edges[start] = ConditionalEdge(
condition=condition,
condition_map=condition_map
)
return self
def set_entry_point(self, name: str) -> "StateGraph":
"""设置入口点
Args:
name: 入口节点名称
Returns:
StateGraph: 支持链式调用
"""
if name not in self.nodes:
raise ValueError(f"Entry node '{name}' not found")
self.entry_point = name
return self
def set_finish_point(self, name: str) -> "StateGraph":
"""设置结束点
Args:
name: 结束节点名称
Returns:
StateGraph: 支持链式调用
"""
if name not in self.nodes:
raise ValueError(f"Finish node '{name}' not found")
self.finish_point = name
return self
def compile(
self,
checkpointer: BaseCheckpointSaver = None,
interrupt_before: List[str] = None,
interrupt_after: List[str] = None,
debug: bool = False
) -> CompiledStateGraph:
"""编译图为可执行对象
Args:
checkpointer: 检查点保存器
interrupt_before: 在这些节点前中断
interrupt_after: 在这些节点后中断
debug: 是否启用调试模式
Returns:
CompiledStateGraph: 编译后的可执行图
"""
# 验证图结构
self._validate_graph()
# 创建通道系统
channels = self._create_channels()
# 创建Pregel执行器
pregel = Pregel(
nodes=self.nodes,
channels=channels,
input_channels=self._get_input_channels(),
output_channels=self._get_output_channels(),
stream_channels=self._get_stream_channels(),
checkpointer=checkpointer,
interrupt_before=interrupt_before or [],
interrupt_after=interrupt_after or [],
debug=debug
)
return CompiledStateGraph(pregel)
执行API
class CompiledStateGraph:
"""编译后的状态图
提供多种执行模式:
- invoke: 同步执行
- ainvoke: 异步执行
- stream: 流式执行
- astream: 异步流式执行
"""
def __init__(self, pregel: Pregel):
self.pregel = pregel
def invoke(
self,
input: Dict[str, Any],
config: RunnableConfig = None,
**kwargs
) -> Dict[str, Any]:
"""同步执行图
Args:
input: 输入状态
config: 运行配置,包含thread_id等
**kwargs: 额外参数
Returns:
Dict[str, Any]: 最终状态
Example:
>>> graph = build_basic_graph()
>>> result = graph.invoke(
... {"messages": [("user", "Hello")]},
... {"configurable": {"thread_id": "thread-1"}}
... )
"""
config = config or {}
# 执行图并返回最终状态
final_state = None
for state in self.pregel.stream(input, config, **kwargs):
final_state = state
return final_state
async def ainvoke(
self,
input: Dict[str, Any],
config: RunnableConfig = None,
**kwargs
) -> Dict[str, Any]:
"""异步执行图
Args:
input: 输入状态
config: 运行配置
**kwargs: 额外参数
Returns:
Dict[str, Any]: 最终状态
"""
config = config or {}
final_state = None
async for state in self.pregel.astream(input, config, **kwargs):
final_state = state
return final_state
def stream(
self,
input: Dict[str, Any],
config: RunnableConfig = None,
*,
stream_mode: str = "values",
**kwargs
) -> Iterator[Dict[str, Any]]:
"""流式执行图
Args:
input: 输入状态
config: 运行配置
stream_mode: 流模式 ("values", "updates", "debug")
**kwargs: 额外参数
Yields:
Dict[str, Any]: 中间状态或更新
Example:
>>> for state in graph.stream(input, config):
... print(f"Current state: {state}")
"""
yield from self.pregel.stream(
input,
config,
stream_mode=stream_mode,
**kwargs
)
async def astream(
self,
input: Dict[str, Any],
config: RunnableConfig = None,
*,
stream_mode: str = "values",
**kwargs
) -> AsyncIterator[Dict[str, Any]]:
"""异步流式执行图
Args:
input: 输入状态
config: 运行配置
stream_mode: 流模式
**kwargs: 额外参数
Yields:
Dict[str, Any]: 中间状态或更新
"""
async for state in self.pregel.astream(
input,
config,
stream_mode=stream_mode,
**kwargs
):
yield state
def get_state(self, config: RunnableConfig) -> StateSnapshot:
"""获取当前状态快照
Args:
config: 运行配置,必须包含thread_id
Returns:
StateSnapshot: 状态快照
"""
return self.pregel.get_state(config)
def get_state_history(
self,
config: RunnableConfig,
*,
filter: Dict[str, Any] = None,
before: RunnableConfig = None,
limit: int = None
) -> Iterator[StateSnapshot]:
"""获取状态历史
Args:
config: 运行配置
filter: 过滤条件
before: 获取此配置之前的状态
limit: 限制返回数量
Yields:
StateSnapshot: 历史状态快照
"""
yield from self.pregel.get_state_history(
config,
filter=filter,
before=before,
limit=limit
)
def update_state(
self,
config: RunnableConfig,
values: Dict[str, Any],
as_node: str = None
) -> RunnableConfig:
"""更新状态
Args:
config: 运行配置
values: 要更新的状态值
as_node: 以哪个节点的身份更新
Returns:
RunnableConfig: 更新后的配置
"""
return self.pregel.update_state(config, values, as_node)
3.2 Pregel执行引擎API
class Pregel:
"""Pregel执行引擎
基于Google Pregel模型的图计算引擎,特点:
- 超步执行模型
- 并行节点处理
- 消息传递机制
- 状态一致性保证
"""
def __init__(
self,
nodes: Dict[str, PregelNode],
channels: Dict[str, BaseChannel],
input_channels: List[str],
output_channels: List[str],
stream_channels: List[str] = None,
checkpointer: BaseCheckpointSaver = None,
interrupt_before: List[str] = None,
interrupt_after: List[str] = None,
debug: bool = False
):
"""初始化Pregel执行引擎
Args:
nodes: 节点映射
channels: 通道映射
input_channels: 输入通道列表
output_channels: 输出通道列表
stream_channels: 流式输出通道
checkpointer: 检查点保存器
interrupt_before: 中断前节点列表
interrupt_after: 中断后节点列表
debug: 调试模式
"""
self.nodes = nodes
self.channels = channels
self.input_channels = input_channels
self.output_channels = output_channels
self.stream_channels = stream_channels or []
self.checkpointer = checkpointer
self.interrupt_before = set(interrupt_before or [])
self.interrupt_after = set(interrupt_after or [])
self.debug = debug
def stream(
self,
input: Dict[str, Any],
config: RunnableConfig = None,
*,
stream_mode: str = "values",
**kwargs
) -> Iterator[Dict[str, Any]]:
"""流式执行核心方法
Args:
input: 输入数据
config: 运行配置
stream_mode: 流模式
**kwargs: 额外参数
Yields:
Dict[str, Any]: 执行结果
"""
config = config or {}
# 准备初始状态
checkpoint = self._prepare_initial_checkpoint(input, config)
# 执行超步循环
step = 0
while True:
# 1. 计划阶段:确定活跃节点
tasks = self._plan_step(checkpoint, config)
if not tasks:
break # 没有更多任务,执行完成
# 2. 检查中断点
if self._should_interrupt_before(tasks):
yield self._create_interrupt_state(checkpoint, tasks, "before")
break
# 3. 执行阶段:并行执行节点
step_results = self._execute_step(tasks, checkpoint, config)
# 4. 更新阶段:应用状态更新
checkpoint = self._update_checkpoint(
checkpoint,
step_results,
config
)
# 5. 保存检查点
if self.checkpointer:
self.checkpointer.put(
config,
checkpoint,
self._create_metadata(step, "loop"),
self._get_channel_versions(checkpoint)
)
# 6. 检查中断点
if self._should_interrupt_after(tasks):
yield self._create_interrupt_state(checkpoint, tasks, "after")
break
# 7. 输出中间结果
if stream_mode == "values":
yield self._extract_values(checkpoint)
elif stream_mode == "updates":
yield self._extract_updates(step_results)
elif stream_mode == "debug":
yield self._create_debug_info(checkpoint, step_results)
step += 1
# 输出最终结果
if stream_mode == "values":
yield self._extract_values(checkpoint)
def _plan_step(
self,
checkpoint: Checkpoint,
config: RunnableConfig
) -> List[PregelTask]:
"""计划执行步骤
Args:
checkpoint: 当前检查点
config: 运行配置
Returns:
List[PregelTask]: 待执行任务列表
"""
tasks = []
# 检查每个节点是否需要执行
for node_name, node in self.nodes.items():
if self._should_execute_node(node, checkpoint):
task = PregelTask(
name=node_name,
node=node,
input_state=self._prepare_node_input(node, checkpoint),
config=config
)
tasks.append(task)
return tasks
def _execute_step(
self,
tasks: List[PregelTask],
checkpoint: Checkpoint,
config: RunnableConfig
) -> Dict[str, Any]:
"""执行步骤中的所有任务
Args:
tasks: 任务列表
checkpoint: 当前检查点
config: 运行配置
Returns:
Dict[str, Any]: 执行结果
"""
import concurrent.futures
results = {}
# 并行执行任务
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交所有任务
future_to_task = {
executor.submit(self._execute_task, task): task
for task in tasks
}
# 收集结果
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
results[task.name] = result
except Exception as e:
if self.debug:
print(f"Task {task.name} failed: {e}")
results[task.name] = PregelTaskError(task.name, e)
return results
def _execute_task(self, task: PregelTask) -> Any:
"""执行单个任务
Args:
task: 要执行的任务
Returns:
Any: 任务执行结果
"""
try:
# 调用节点函数
if asyncio.iscoroutinefunction(task.node.action):
# 异步节点
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
task.node.action(task.input_state)
)
finally:
loop.close()
else:
# 同步节点
result = task.node.action(task.input_state)
return result
except Exception as e:
return PregelTaskError(task.name, e)
def _update_checkpoint(
self,
checkpoint: Checkpoint,
step_results: Dict[str, Any],
config: RunnableConfig
) -> Checkpoint:
"""更新检查点状态
Args:
checkpoint: 当前检查点
step_results: 步骤执行结果
config: 运行配置
Returns:
Checkpoint: 更新后的检查点
"""
# 复制当前检查点
new_checkpoint = copy_checkpoint(checkpoint)
# 应用每个节点的更新
for node_name, result in step_results.items():
if isinstance(result, PregelTaskError):
continue # 跳过错误结果
# 更新通道值
self._apply_node_updates(new_checkpoint, node_name, result)
# 更新时间戳和版本
new_checkpoint["ts"] = datetime.now(timezone.utc).isoformat()
new_checkpoint["id"] = str(uuid6())
return new_checkpoint
3.3 检查点API详解
class BaseCheckpointSaver:
"""检查点保存器基类API
提供统一的检查点管理接口:
- 状态持久化
- 历史查询
- 版本管理
- 恢复机制
"""
def put(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions
) -> RunnableConfig:
"""保存检查点
Args:
config: 运行配置
checkpoint: 检查点数据
metadata: 元数据
new_versions: 新版本信息
Returns:
RunnableConfig: 更新后的配置
"""
raise NotImplementedError
def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
"""获取检查点元组
Args:
config: 运行配置
Returns:
CheckpointTuple | None: 检查点元组或None
"""
raise NotImplementedError
def list(
self,
config: RunnableConfig | None,
*,
filter: Dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None
) -> Iterator[CheckpointTuple]:
"""列出检查点
Args:
config: 基础配置
filter: 过滤条件
before: 获取此配置之前的检查点
limit: 限制数量
Yields:
CheckpointTuple: 检查点元组
"""
raise NotImplementedError
# PostgreSQL实现示例
class PostgresCheckpointSaver(BaseCheckpointSaver):
"""PostgreSQL检查点保存器实现"""
def put(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions
) -> RunnableConfig:
"""保存检查点到PostgreSQL"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
checkpoint_id = checkpoint["id"]
# 序列化数据
checkpoint_data = self.serde.dumps(checkpoint)
metadata_data = self.serde.dumps(metadata)
with self._cursor() as cur:
# 插入或更新检查点
cur.execute("""
INSERT INTO checkpoints
(thread_id, checkpoint_ns, checkpoint_id, checkpoint, metadata)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id)
DO UPDATE SET
checkpoint = EXCLUDED.checkpoint,
metadata = EXCLUDED.metadata
""", (
thread_id,
checkpoint_ns,
checkpoint_id,
checkpoint_data,
metadata_data
))
return {
**config,
"configurable": {
**config["configurable"],
"checkpoint_id": checkpoint_id
}
}
4. 实战案例
4.1 智能客服系统
def build_customer_service_agent():
"""构建智能客服系统"""
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
# 定义工具
@tool
def query_order_status(order_id: str) -> str:
"""查询订单状态"""
# 模拟数据库查询
orders = {
"12345": "已发货,预计明天到达",
"67890": "正在处理中",
"11111": "已完成"
}
return orders.get(order_id, "订单不存在")
@tool
def create_refund_request(order_id: str, reason: str) -> str:
"""创建退款申请"""
return f"已为订单 {order_id} 创建退款申请,原因:{reason}"
@tool
def escalate_to_human(issue: str) -> str:
"""升级到人工客服"""
return f"已将问题升级到人工客服:{issue}"
class CustomerServiceState(TypedDict):
messages: Annotated[List[BaseMessage], add_messages]
customer_info: dict
issue_category: str
resolution_status: str
escalation_needed: bool
def classify_issue(state: CustomerServiceState) -> CustomerServiceState:
"""问题分类节点"""
llm = ChatOpenAI(model="gpt-4")
classification_prompt = """
根据客户消息,将问题分类为以下类别之一:
- order_inquiry: 订单查询
- refund_request: 退款申请
- technical_support: 技术支持
- complaint: 投诉
- other: 其他
客户消息:{message}
只返回分类结果。
"""
last_message = state["messages"][-1].content
response = llm.invoke(classification_prompt.format(message=last_message))
return {
"issue_category": response.content.strip(),
"resolution_status": "classified"
}
def handle_order_inquiry(state: CustomerServiceState) -> CustomerServiceState:
"""处理订单查询"""
llm = ChatOpenAI(model="gpt-4")
tools = [query_order_status]
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def handle_refund_request(state: CustomerServiceState) -> CustomerServiceState:
"""处理退款申请"""
llm = ChatOpenAI(model="gpt-4")
tools = [create_refund_request]
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def check_escalation_needed(state: CustomerServiceState) -> str:
"""检查是否需要升级"""
issue_category = state.get("issue_category", "")
# 复杂问题自动升级
if issue_category in ["complaint", "technical_support"]:
return "escalate"
# 检查客户满意度
last_message = state["messages"][-1].content
if any(word in last_message.lower() for word in ["不满意", "投诉", "经理"]):
return "escalate"
return "resolve"
def escalate_issue(state: CustomerServiceState) -> CustomerServiceState:
"""升级问题"""
issue = state.get("issue_category", "未分类问题")
result = escalate_to_human(issue)
return {
"messages": [AIMessage(content=result)],
"escalation_needed": True,
"resolution_status": "escalated"
}
def finalize_resolution(state: CustomerServiceState) -> CustomerServiceState:
"""完成问题解决"""
return {
"resolution_status": "resolved",
"messages": [AIMessage(content="问题已解决,还有其他需要帮助的吗?")]
}
# 构建图
graph = StateGraph(CustomerServiceState)
# 添加节点
graph.add_node("classify", classify_issue)
graph.add_node("handle_order", handle_order_inquiry)
graph.add_node("handle_refund", handle_refund_request)
graph.add_node("escalate", escalate_issue)
graph.add_node("finalize", finalize_resolution)
graph.add_node("tools", ToolNode([query_order_status, create_refund_request]))
# 设置流程
graph.set_entry_point("classify")
# 根据问题类型路由
graph.add_conditional_edges(
"classify",
lambda state: state["issue_category"],
{
"order_inquiry": "handle_order",
"refund_request": "handle_refund",
"complaint": "escalate",
"technical_support": "escalate",
"other": "escalate"
}
)
# 工具调用处理
def route_after_llm(state: CustomerServiceState) -> str:
last_message = state["messages"][-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
return "tools"
else:
return "check_escalation"
graph.add_conditional_edges("handle_order", route_after_llm)
graph.add_conditional_edges("handle_refund", route_after_llm)
# 工具执行后检查升级
graph.add_edge("tools", "check_escalation")
# 升级检查
graph.add_conditional_edges(
"check_escalation",
check_escalation_needed,
{
"escalate": "escalate",
"resolve": "finalize"
}
)
return graph.compile()
4.2 代码生成与审查系统
def build_code_review_system():
"""构建代码生成与审查系统"""
class CodeReviewState(TypedDict):
messages: Annotated[List[BaseMessage], add_messages]
code_request: str
generated_code: str
review_feedback: List[str]
test_results: dict
approval_status: str
iteration_count: int
def code_generator(state: CodeReviewState) -> CodeReviewState:
"""代码生成节点"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", temperature=0.1)
# 构建代码生成提示
request = state.get("code_request", "")
feedback = state.get("review_feedback", [])
if feedback:
prompt = f"""
根据以下需求和反馈,改进代码:
需求:{request}
之前的反馈:
{chr(10).join(f"- {fb}" for fb in feedback)}
请生成改进后的Python代码,包含适当的注释和错误处理。
"""
else:
prompt = f"""
根据以下需求生成Python代码:
需求:{request}
要求:
1. 代码结构清晰,有适当注释
2. 包含错误处理
3. 遵循PEP 8规范
4. 包含基本的单元测试
"""
response = llm.invoke(prompt)
return {
"generated_code": response.content,
"messages": [response],
"iteration_count": state.get("iteration_count", 0) + 1
}
def code_reviewer(state: CodeReviewState) -> CodeReviewState:
"""代码审查节点"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", temperature=0.2)
code = state.get("generated_code", "")
review_prompt = f"""
请审查以下Python代码,从以下方面给出反馈:
1. 代码质量和可读性
2. 错误处理
3. 性能优化
4. 安全性
5. 测试覆盖度
代码:
```python
{code}
```
请提供具体的改进建议,如果代码质量良好,请说明"APPROVED"。
"""
response = llm.invoke(review_prompt)
review_content = response.content
# 解析审查结果
if "APPROVED" in review_content.upper():
approval_status = "approved"
feedback = []
else:
approval_status = "needs_improvement"
# 提取反馈点
feedback = [
line.strip()
for line in review_content.split('\n')
if line.strip() and not line.startswith('#')
]
return {
"review_feedback": feedback,
"approval_status": approval_status,
"messages": [response]
}
def code_tester(state: CodeReviewState) -> CodeReviewState:
"""代码测试节点"""
code = state.get("generated_code", "")
# 简单的代码测试(实际应用中会更复杂)
test_results = {
"syntax_check": True,
"basic_tests": True,
"coverage": 85.0
}
try:
# 语法检查
compile(code, '<string>', 'exec')
test_results["syntax_check"] = True
except SyntaxError as e:
test_results["syntax_check"] = False
test_results["syntax_error"] = str(e)
return {
"test_results": test_results,
"messages": [AIMessage(content=f"测试结果:{test_results}")]
}
def should_continue_iteration(state: CodeReviewState) -> str:
"""判断是否继续迭代"""
approval_status = state.get("approval_status", "")
iteration_count = state.get("iteration_count", 0)
test_results = state.get("test_results", {})
# 检查是否通过审查和测试
if (approval_status == "approved" and
test_results.get("syntax_check", False)):
return "finalize"
# 检查迭代次数限制
if iteration_count >= 3:
return "finalize" # 超过最大迭代次数
return "generate" # 继续迭代
def finalize_code(state: CodeReviewState) -> CodeReviewState:
"""完成代码生成"""
code = state.get("generated_code", "")
approval_status = state.get("approval_status", "")
if approval_status == "approved":
message = "代码已通过审查,可以使用。"
else:
message = "代码已达到最大迭代次数,请手动审查。"
return {
"messages": [AIMessage(content=f"{message}\n\n最终代码:\n```python\n{code}\n```")]
}
# 构建图
graph = StateGraph(CodeReviewState)
# 添加节点
graph.add_node("generate", code_generator)
graph.add_node("review", code_reviewer)
graph.add_node("test", code_tester)
graph.add_node("finalize", finalize_code)
# 设置流程
graph.set_entry_point("generate")
graph.add_edge("generate", "review")
graph.add_edge("review", "test")
# 条件路由
graph.add_conditional_edges(
"test",
should_continue_iteration,
{
"generate": "generate",
"finalize": "finalize"
}
)
return graph.compile()
5. 最佳实践
5.1 性能优化
并行执行优化
def optimize_parallel_execution():
"""并行执行优化示例"""
from langgraph.types import Send
def parallel_processor(state: dict) -> List[Send]:
"""并行处理器:将任务分发到多个工作节点"""
tasks = state.get("tasks", [])
# 根据任务类型分组
task_groups = defaultdict(list)
for task in tasks:
task_type = task.get("type", "default")
task_groups[task_type].append(task)
sends = []
for task_type, group_tasks in task_groups.items():
# 为每个任务组创建Send
for i, task in enumerate(group_tasks):
sends.append(Send(
f"worker_{task_type}",
{
"task": task,
"task_id": f"{task_type}_{i}",
"group_size": len(group_tasks)
}
))
return sends
def cpu_intensive_worker(state: dict) -> dict:
"""CPU密集型工作节点"""
import time
task = state["task"]
# 模拟CPU密集型任务
start_time = time.time()
result = sum(i * i for i in range(task.get("iterations", 1000)))
duration = time.time() - start_time
return {
"result": result,
"duration": duration,
"task_id": state["task_id"]
}
def io_intensive_worker(state: dict) -> dict:
"""IO密集型工作节点"""
import asyncio
import aiohttp
async def fetch_data():
# 模拟异步IO操作
await asyncio.sleep(0.1)
return {"data": "fetched"}
# 在同步上下文中运行异步操作
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(fetch_data())
finally:
loop.close()
return {
"result": result,
"task_id": state["task_id"]
}
内存优化
def optimize_memory_usage():
"""内存使用优化"""
class MemoryOptimizedState(TypedDict):
# 使用生成器减少内存占用
data_stream: Iterator[dict]
# 只保留必要的历史信息
recent_messages: Annotated[List[BaseMessage], add_messages]
# 使用弱引用避免循环引用
cache_refs: dict
def memory_efficient_processor(state: MemoryOptimizedState) -> MemoryOptimizedState:
"""内存高效的处理节点"""
import gc
import weakref
# 处理数据流,避免一次性加载所有数据
processed_count = 0
for data_chunk in state.get("data_stream", []):
# 处理单个数据块
process_chunk(data_chunk)
processed_count += 1
# 定期触发垃圾回收
if processed_count % 100 == 0:
gc.collect()
# 清理临时变量
del processed_count
return {
"processing_completed": True,
"cache_refs": {} # 清空缓存引用
}
def process_chunk(chunk: dict):
"""处理单个数据块"""
# 实际的数据处理逻辑
pass
5.2 错误处理
健壮的错误处理
def implement_robust_error_handling():
"""实现健壮的错误处理"""
from enum import Enum
class ErrorSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class ErrorHandlingState(TypedDict):
messages: Annotated[List[BaseMessage], add_messages]
errors: List[dict]
retry_count: int
fallback_used: bool
error_severity: str
def resilient_node(state: ErrorHandlingState) -> ErrorHandlingState:
"""具有弹性的节点实现"""
max_retries = 3
retry_count = state.get("retry_count", 0)
try:
# 主要处理逻辑
result = risky_operation(state)
# 重置错误状态
return {
"result": result,
"retry_count": 0,
"fallback_used": False
}
except ConnectionError as e:
# 网络错误 - 可重试
if retry_count < max_retries:
return {
"retry_count": retry_count + 1,
"errors": state.get("errors", []) + [{
"type": "ConnectionError",
"message": str(e),
"severity": ErrorSeverity.MEDIUM.value,
"retry_attempt": retry_count + 1
}]
}
else:
# 超过重试次数,使用降级方案
return use_fallback_strategy(state, e)
except ValueError as e:
# 数据错误 - 不可重试
return {
"errors": state.get("errors", []) + [{
"type": "ValueError",
"message": str(e),
"severity": ErrorSeverity.HIGH.value,
"recoverable": False
}],
"error_severity": ErrorSeverity.HIGH.value
}
except Exception as e:
# 未知错误 - 记录并降级
return {
"errors": state.get("errors", []) + [{
"type": "UnknownError",
"message": str(e),
"severity": ErrorSeverity.CRITICAL.value,
"stack_trace": traceback.format_exc()
}],
"error_severity": ErrorSeverity.CRITICAL.value,
"fallback_used": True
}
def use_fallback_strategy(state: ErrorHandlingState, error: Exception) -> ErrorHandlingState:
"""使用降级策略"""
# 实现降级逻辑
fallback_result = "使用缓存数据或默认响应"
return {
"result": fallback_result,
"fallback_used": True,
"errors": state.get("errors", []) + [{
"type": "FallbackUsed",
"message": f"使用降级策略: {str(error)}",
"severity": ErrorSeverity.MEDIUM.value
}]
}
def risky_operation(state: ErrorHandlingState) -> str:
"""可能失败的操作"""
import random
if random.random() < 0.3: # 30%失败率
raise ConnectionError("网络连接失败")
return "操作成功"
5.3 监控和调试
全面的监控系统
def implement_comprehensive_monitoring():
"""实现全面的监控系统"""
import time
import logging
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class PerformanceMetrics:
node_name: str
execution_time: float
memory_usage: int
success: bool
error_message: str = None
class MonitoringState(TypedDict):
messages: Annotated[List[BaseMessage], add_messages]
metrics: List[PerformanceMetrics]
start_time: float
trace_id: str
def monitored_node(state: MonitoringState) -> MonitoringState:
"""带监控的节点"""
import psutil
import uuid
node_name = "monitored_node"
start_time = time.time()
start_memory = psutil.Process().memory_info().rss
# 生成追踪ID
trace_id = state.get("trace_id") or str(uuid.uuid4())
try:
# 节点逻辑
result = perform_business_logic(state)
# 记录成功指标
end_time = time.time()
end_memory = psutil.Process().memory_info().rss
metrics = PerformanceMetrics(
node_name=node_name,
execution_time=end_time - start_time,
memory_usage=end_memory - start_memory,
success=True
)
# 记录日志
logging.info(
f"Node {node_name} completed successfully",
extra={
"trace_id": trace_id,
"execution_time": metrics.execution_time,
"memory_delta": metrics.memory_usage
}
)
return {
"result": result,
"metrics": state.get("metrics", []) + [metrics],
"trace_id": trace_id
}
except Exception as e:
# 记录失败指标
end_time = time.time()
end_memory = psutil.Process().memory_info().rss
metrics = PerformanceMetrics(
node_name=node_name,
execution_time=end_time - start_time,
memory_usage=end_memory - start_memory,
success=False,
error_message=str(e)
)
# 记录错误日志
logging.error(
f"Node {node_name} failed",
extra={
"trace_id": trace_id,
"error": str(e),
"execution_time": metrics.execution_time
},
exc_info=True
)
return {
"metrics": state.get("metrics", []) + [metrics],
"trace_id": trace_id,
"error": str(e)
}
def performance_analyzer(state: MonitoringState) -> MonitoringState:
"""性能分析节点"""
metrics = state.get("metrics", [])
if not metrics:
return state
# 分析性能指标
total_time = sum(m.execution_time for m in metrics)
avg_time = total_time / len(metrics)
max_time = max(m.execution_time for m in metrics)
success_rate = sum(1 for m in metrics if m.success) / len(metrics)
analysis = {
"total_execution_time": total_time,
"average_execution_time": avg_time,
"max_execution_time": max_time,
"success_rate": success_rate,
"total_nodes": len(metrics)
}
# 性能告警
if avg_time > 5.0: # 平均执行时间超过5秒
logging.warning(
"High average execution time detected",
extra={"analysis": analysis, "trace_id": state.get("trace_id")}
)
if success_rate < 0.95: # 成功率低于95%
logging.warning(
"Low success rate detected",
extra={"analysis": analysis, "trace_id": state.get("trace_id")}
)
return {
"performance_analysis": analysis,
"messages": [AIMessage(content=f"性能分析完成:{analysis}")]
}
def perform_business_logic(state: MonitoringState) -> str:
"""业务逻辑示例"""
import random
import time
# 模拟不同的执行时间
time.sleep(random.uniform(0.1, 2.0))
# 模拟偶发错误
if random.random() < 0.1:
raise Exception("随机业务错误")
return "业务逻辑执行成功"
5.4 部署和扩展
生产环境部署配置
def production_deployment_config():
"""生产环境部署配置"""
# Docker配置
dockerfile_content = """
FROM python:3.11-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \\
git \\
curl \\
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 设置环境变量
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
CMD curl -f http://localhost:8000/health || exit 1
# 启动应用
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""
# Kubernetes部署配置
k8s_deployment = """
apiVersion: apps/v1
kind: Deployment
metadata:
name: langgraph-app
labels:
app: langgraph-app
spec:
replicas: 3
selector:
matchLabels:
app: langgraph-app
template:
metadata:
labels:
app: langgraph-app
spec:
containers:
- name: langgraph-app
image: langgraph-app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: app-secrets
key: database-url
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: app-secrets
key: openai-api-key
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: langgraph-service
spec:
selector:
app: langgraph-app
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
"""
# 监控配置
monitoring_config = """
# Prometheus配置
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'langgraph-app'
static_configs:
- targets: ['langgraph-service:80']
metrics_path: /metrics
scrape_interval: 10s
# Grafana仪表盘配置
dashboard_config = {
"dashboard": {
"title": "LangGraph应用监控",
"panels": [
{
"title": "请求率",
"type": "graph",
"targets": [
{
"expr": "rate(http_requests_total[5m])",
"legendFormat": "{{method}} {{status}}"
}
]
},
{
"title": "响应时间",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
}
]
},
{
"title": "错误率",
"type": "singlestat",
"targets": [
{
"expr": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m])",
"legendFormat": "Error Rate"
}
]
}
]
}
}
"""
return {
"dockerfile": dockerfile_content,
"k8s_deployment": k8s_deployment,
"monitoring": monitoring_config
}
6. 总结
LangGraph框架提供了强大而灵活的多智能体应用开发能力。通过本手册的学习,您应该能够:
- 掌握核心概念:理解状态管理、图构建、检查点系统等核心概念
- 熟练使用API:掌握StateGraph、Pregel等核心API的使用方法
- 构建复杂应用:能够设计和实现多智能体协作系统
- 优化性能:了解性能优化、错误处理、监控等最佳实践
- 生产部署:具备生产环境部署和运维的能力
6.1 学习路径建议
- 基础阶段:熟悉基本概念和简单图构建
- 进阶阶段:学习条件路由、并行执行、检查点系统
- 高级阶段:掌握多智能体协作、性能优化、监控调试
- 专家阶段:深入源码、自定义扩展、生产优化
6.2 常见问题解答
Q: 如何选择合适的检查点保存器? A: 开发测试使用InMemorySaver,生产环境推荐PostgresCheckpointSaver。
Q: 如何处理长时间运行的任务? A: 使用检查点系统保存中间状态,支持任务暂停和恢复。
Q: 如何优化大规模并发性能? A: 使用Send机制实现真正的并行执行,配合资源池管理。
Q: 如何实现人工介入? A: 使用interrupt_before/interrupt_after参数在指定节点暂停执行。
通过持续实践和深入学习,您将能够充分发挥LangGraph的强大能力,构建出色的多智能体应用系统。
tommie blog