LangChain-99-最佳实践与案例
框架使用示例
示例1: 简单问答链
场景: 构建一个基础的问答系统
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 1. 定义提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的{domain}专家。简洁准确地回答问题。"),
("human", "{question}")
])
# 2. 创建模型
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 3. 创建输出解析器
parser = StrOutputParser()
# 4. 组合成链
chain = prompt | model | parser
# 5. 使用
response = chain.invoke({
"domain": "人工智能",
"question": "什么是Transformer?"
})
print(response)
# 输出: Transformer是一种基于自注意力机制的深度学习架构...
关键点:
- 使用
|操作符链式组合组件 - 模板变量通过字典传递
- temperature=0 确保稳定输出
示例2: 结构化输出提取
场景: 从文本中提取结构化信息
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 1. 定义输出结构
class Person(BaseModel):
"""从文本提取的人物信息"""
name: str = Field(description="人物姓名")
age: int = Field(description="年龄")
occupation: str = Field(description="职业")
skills: list[str] = Field(description="技能列表")
# 2. 创建结构化输出模型
model = ChatOpenAI(model="gpt-4o")
structured_llm = model.with_structured_output(Person)
# 3. 定义提示
prompt = ChatPromptTemplate.from_template(
"从以下文本中提取人物信息:\n\n{text}"
)
# 4. 组合链
extraction_chain = prompt | structured_llm
# 5. 使用
text = """
张三是一位30岁的软件工程师,精通Python、Java和Go语言,
同时也擅长系统架构设计和团队管理。
"""
person = extraction_chain.invoke({"text": text})
print(f"姓名: {person.name}")
print(f"年龄: {person.age}")
print(f"职业: {person.occupation}")
print(f"技能: {', '.join(person.skills)}")
关键点:
with_structured_output()强制输出特定格式- Pydantic模型自动验证数据
- Field描述帮助模型理解字段含义
示例3: 多步推理链
场景: 复杂问题需要多步推理
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
# 步骤1: 分解问题
decompose_prompt = ChatPromptTemplate.from_template(
"将以下复杂问题分解为3-5个子问题:\n{question}"
)
# 步骤2: 回答每个子问题
answer_prompt = ChatPromptTemplate.from_template(
"回答以下子问题:\n{sub_question}\n\n基于上下文: {context}"
)
# 步骤3: 综合答案
synthesize_prompt = ChatPromptTemplate.from_template(
"""基于以下子问题的答案,综合回答原始问题。
原始问题: {original_question}
子问题答案:
{sub_answers}
请给出完整、连贯的最终答案。"""
)
# 构建多步链
chain = (
{"question": RunnablePassthrough()}
| RunnablePassthrough.assign(
sub_questions=decompose_prompt | model | StrOutputParser()
)
# (此处省略子问题回答循环逻辑)
| synthesize_prompt
| model
| StrOutputParser()
)
# 使用
question = "如何在大规模分布式系统中保证数据一致性?"
answer = chain.invoke(question)
关键点:
RunnablePassthrough.assign()添加中间结果- 多步骤链式推理提高复杂问题处理能力
- 保留原始输入供后续步骤使用
示例4: 带工具的Agent
场景: Agent自主选择工具完成任务
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# 1. 定义工具
@tool
def search_database(query: str) -> str:
"""在知识库中搜索相关信息
Args:
query: 搜索查询关键词
"""
# 实际实现查询逻辑
results = database.search(query)
return f"找到{len(results)}条相关记录: {results[:3]}"
@tool
def calculator(expression: str) -> float:
"""计算数学表达式
Args:
expression: 数学表达式,如"2+2*3"
"""
return eval(expression)
@tool
def get_current_time() -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 2. 创建Agent
tools = [search_database, calculator, get_current_time]
model = ChatOpenAI(model="gpt-4o")
agent = create_react_agent(model, tools)
# 3. 使用Agent
result = agent.invoke({
"messages": [("user", "帮我查询用户'张三'的信息,并计算他的订单总金额")]
})
print(result["messages"][-1].content)
关键点:
@tool装饰器自动生成工具schema- 详细的docstring帮助模型理解工具用途
- Agent自动决策工具调用顺序
示例5: 流式聊天应用
场景: Web应用实时流式输出
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import json
app = FastAPI()
model = ChatOpenAI(model="gpt-4o", streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好的AI助手"),
("human", "{input}")
])
chain = prompt | model
@app.post("/chat/stream")
async def chat_stream(message: str):
async def event_generator():
try:
# 流式生成响应
async for chunk in chain.astream({"input": message}):
# SSE格式
data = json.dumps({"content": chunk.content})
yield f"data: {data}\n\n"
# 结束标记
yield "data: [DONE]\n\n"
except Exception as e:
error_data = json.dumps({"error": str(e)})
yield f"data: {error_data}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
# 客户端使用EventSource接收流式响应
# const eventSource = new EventSource('/chat/stream?message=你好');
# eventSource.onmessage = (event) => {
# const data = JSON.parse(event.data);
# if (data.content) {
# console.log(data.content);
# }
# };
关键点:
astream()异步流式输出- SSE格式适合单向实时推送
- 异常处理确保连接正常关闭
实战经验
经验1: 提示工程最佳实践
清晰的角色定义:
# 不推荐: 模糊的角色
prompt = ChatPromptTemplate.from_template("回答问题: {question}")
# 推荐: 明确的角色和任务
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的技术文档撰写专家。
你的任务:
1. 用清晰、准确的语言回答技术问题
2. 提供代码示例时,确保代码可运行
3. 解释技术概念时,从基础到高级逐步深入
4. 避免使用行话,确保初学者也能理解
回答格式:
- 先给出简短总结(1-2句)
- 然后详细解释
- 最后提供实际示例"""),
("human", "{question}")
])
Few-Shot示例:
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个情感分类专家"),
("human", "这部电影太棒了!"),
("ai", "positive"),
("human", "浪费时间,不推荐"),
("ai", "negative"),
("human", "还行吧,有些地方不错"),
("ai", "neutral"),
("human", "{text}")
])
关键点:
- 明确任务边界
- 提供输出格式示例
- Few-Shot提高准确率
经验2: 错误处理策略
多层容错:
from langchain_core.runnables import RunnableWithFallbacks
from openai import APIError, RateLimitError
# 第1层: 模型级重试
primary_model = ChatOpenAI(model="gpt-4o").with_retry(
retry_if_exception_type=(APIError, RateLimitError),
stop_after_attempt=3,
wait_exponential_jitter=True
)
# 第2层: Fallback到备用模型
robust_model = primary_model.with_fallbacks([
ChatOpenAI(model="gpt-4o-mini"), # 更便宜的备用
ChatOpenAI(model="gpt-3.5-turbo"), # 最后的备用
])
# 第3层: 应用层兜底
def safe_invoke(chain, input_data, default_response="抱歉,服务暂时不可用"):
try:
return chain.invoke(input_data)
except Exception as e:
logger.error(f"Chain failed: {e}")
return default_response
超时控制:
import asyncio
async def invoke_with_timeout(chain, input_data, timeout=30):
try:
return await asyncio.wait_for(
chain.ainvoke(input_data),
timeout=timeout
)
except asyncio.TimeoutError:
logger.warning("Request timed out")
return "请求超时,请稍后重试"
关键点:
- 分层错误处理
- 合理的超时设置
- 优雅降级而非完全失败
经验3: 性能优化
缓存策略:
from langchain_core.caches import InMemoryCache, RedisCache
from langchain_core.globals import set_llm_cache
import redis
# 开发环境: 内存缓存
set_llm_cache(InMemoryCache())
# 生产环境: Redis缓存(跨实例共享)
redis_client = redis.Redis(
host="redis-server",
port=6379,
db=0,
decode_responses=True
)
set_llm_cache(RedisCache(redis_=redis_client))
# 使用
model = ChatOpenAI(model="gpt-4o")
model.invoke("什么是AI?") # 第一次调用API
model.invoke("什么是AI?") # 从缓存返回,0延迟
批处理优化:
# 不推荐: 循环调用
results = []
for input_data in inputs:
result = chain.invoke(input_data) # 串行,慢
results.append(result)
# 推荐: 批处理
results = chain.batch(
inputs,
config={"max_concurrency": 10} # 并发控制
)
异步并发:
import asyncio
# 高并发处理
async def process_batch_async(inputs):
tasks = [chain.ainvoke(input) for input in inputs]
results = await asyncio.gather(*tasks)
return results
# 在FastAPI中使用
@app.post("/batch")
async def batch_endpoint(inputs: list[str]):
results = await process_batch_async(inputs)
return {"results": results}
关键点:
- 生产环境使用Redis缓存
- 批处理优于循环
- 异步处理提高吞吐量
经验4: 监控与可观测性
LangSmith集成:
import os
# 启用追踪
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production"
# 添加元数据
result = chain.invoke(
input_data,
config={
"tags": ["production", "customer-service"],
"metadata": {
"user_id": "user_123",
"session_id": "session_456",
"request_id": "req_789"
}
}
)
# 在LangSmith UI中可以:
# - 查看完整调用链
# - 过滤特定用户/会话
# - 分析性能瓶颈
# - 调试错误情况
自定义Callback监控:
from langchain_core.callbacks import BaseCallbackHandler
class MetricsCallback(BaseCallbackHandler):
def on_llm_start(self, serialized, prompts, **kwargs):
# 记录请求开始
metrics.increment("llm.requests")
def on_llm_end(self, response, **kwargs):
# 记录Token使用
if response.llm_output:
tokens = response.llm_output.get("token_usage", {})
metrics.gauge("llm.input_tokens", tokens.get("prompt_tokens", 0))
metrics.gauge("llm.output_tokens", tokens.get("completion_tokens", 0))
def on_llm_error(self, error, **kwargs):
# 记录错误
metrics.increment("llm.errors")
logger.error(f"LLM error: {error}")
# 使用
chain.invoke(input, config={"callbacks": [MetricsCallback()]})
关键点:
- LangSmith追踪所有运行
- 添加有意义的tags和metadata
- 自定义callback接入监控系统
经验5: 成本优化
模型选择策略:
from langchain_core.runnables import RunnableBranch
def get_complexity(input_data):
"""评估任务复杂度"""
word_count = len(input_data.get("text", "").split())
if word_count < 50:
return "simple"
elif word_count < 200:
return "medium"
else:
return "complex"
# 根据复杂度选择模型
model_router = RunnableBranch(
(
lambda x: get_complexity(x) == "simple",
ChatOpenAI(model="gpt-4o-mini") # 便宜
),
(
lambda x: get_complexity(x) == "medium",
ChatOpenAI(model="gpt-4o") # 中等
),
ChatOpenAI(model="o1-preview") # 复杂任务用最强模型
)
smart_chain = model_router | parser
Token使用监控:
class CostTracker:
PRICING = {
"gpt-4o": {"input": 0.03, "output": 0.06}, # per 1K tokens
"gpt-4o-mini": {"input": 0.0015, "output": 0.003},
}
def __init__(self):
self.total_cost = 0
def track_usage(self, model_name, usage_metadata):
pricing = self.PRICING.get(model_name, {"input": 0, "output": 0})
cost = (
usage_metadata["input_tokens"] * pricing["input"] / 1000 +
usage_metadata["output_tokens"] * pricing["output"] / 1000
)
self.total_cost += cost
return cost
tracker = CostTracker()
result = model.invoke(input)
if result.usage_metadata:
cost = tracker.track_usage("gpt-4o", result.usage_metadata)
print(f"本次调用花费: ${cost:.4f}")
print(f"累计花费: ${tracker.total_cost:.4f}")
关键点:
- 简单任务用便宜模型
- 监控Token使用和成本
- 设置预算告警
具体案例
案例1: 客户服务聊天机器人
需求: 构建智能客服系统,回答常见问题并转人工
实现:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
# 1. 知识库检索工具
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
vectorstore = Chroma(
collection_name="faq",
embedding_function=OpenAIEmbeddings()
)
def search_faq(query: str) -> str:
"""搜索常见问题知识库"""
docs = vectorstore.similarity_search(query, k=3)
return "\n\n".join([doc.page_content for doc in docs])
# 2. 构建客服链
system_prompt = """你是一个专业的客户服务AI助手。
知识库内容:
{faq_results}
指导原则:
1. 优先使用知识库中的信息回答
2. 如果知识库没有相关信息,说明"我不确定,让我为您转接人工客服"
3. 保持礼貌和专业
4. 如果用户明确要求人工客服,立即说"好的,正在为您转接人工客服"
"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}")
])
model = ChatOpenAI(model="gpt-4o")
chain = (
{
"faq_results": lambda x: search_faq(x["input"]),
"input": lambda x: x["input"],
"chat_history": lambda x: x["chat_history"]
}
| prompt
| model
)
# 3. 使用示例
chat_history = []
def chat(user_input: str):
response = chain.invoke({
"input": user_input,
"chat_history": chat_history
})
# 更新历史
chat_history.append(HumanMessage(user_input))
chat_history.append(response)
# 检测是否需要转人工
if "转接人工" in response.content:
return response.content, True # 需要转人工
return response.content, False
# 对话示例
response, need_human = chat("如何退货?")
print(response)
response, need_human = chat("我要投诉")
if need_human:
print("正在转接人工客服...")
关键技术:
- 向量检索增强回答准确性
- MessagesPlaceholder保留对话历史
- 转人工触发条件判断
案例2: 文档问答系统
需求: 基于公司内部文档回答问题
实现:
from langchain_community.document_loaders import DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
# 1. 加载和分割文档
loader = DirectoryLoader("./company_docs", glob="**/*.md")
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
splits = text_splitter.split_documents(docs)
# 2. 创建向量存储
vectorstore = Chroma.from_documents(
documents=splits,
embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# 3. 构建RAG链
template = """基于以下上下文回答问题。如果上下文中没有相关信息,说"我在文档中没有找到相关信息"。
上下文:
{context}
问题: {question}
回答:"""
prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI(model="gpt-4o")
def format_docs(docs):
return "\n\n".join([
f"[来源: {doc.metadata['source']}]\n{doc.page_content}"
for doc in docs
])
rag_chain = (
{
"context": lambda x: format_docs(retriever.invoke(x["question"])),
"question": lambda x: x["question"]
}
| prompt
| model
)
# 4. 使用
response = rag_chain.invoke({
"question": "公司的休假政策是什么?"
})
print(response.content)
关键技术:
- RAG(检索增强生成)架构
- 文档分割保持语义完整性
- 引用来源提高可信度
案例3: 数据分析Agent
需求: Agent自动分析数据并生成报告
实现:
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
import pandas as pd
# 1. 定义数据分析工具
@tool
def load_data(file_path: str) -> str:
"""加载CSV数据文件
Args:
file_path: CSV文件路径
"""
df = pd.read_csv(file_path)
return f"成功加载数据,共{len(df)}行{len(df.columns)}列\n列名: {list(df.columns)}"
@tool
def describe_data(file_path: str) -> str:
"""获取数据统计摘要
Args:
file_path: CSV文件路径
"""
df = pd.read_csv(file_path)
return df.describe().to_string()
@tool
def calculate_correlation(file_path: str, column1: str, column2: str) -> str:
"""计算两列之间的相关性
Args:
file_path: CSV文件路径
column1: 第一列名称
column2: 第二列名称
"""
df = pd.read_csv(file_path)
corr = df[column1].corr(df[column2])
return f"{column1}和{column2}的相关系数: {corr:.4f}"
@tool
def plot_histogram(file_path: str, column: str) -> str:
"""绘制列的直方图
Args:
file_path: CSV文件路径
column: 列名称
"""
df = pd.read_csv(file_path)
df[column].hist()
plt.savefig(f"{column}_histogram.png")
return f"已生成{column}的直方图: {column}_histogram.png"
# 2. 创建分析Agent
tools = [load_data, describe_data, calculate_correlation, plot_histogram]
model = ChatOpenAI(model="gpt-4o")
analyst_agent = create_react_agent(model, tools)
# 3. 使用Agent
result = analyst_agent.invoke({
"messages": [(
"user",
"""分析sales_data.csv文件:
1. 加载数据并查看基本信息
2. 计算销售额和利润的相关性
3. 生成销售额的分布直方图
4. 给出分析结论"""
)]
})
print(result["messages"][-1].content)
关键技术:
- 工具自动选择和组合
- 多步骤数据分析流程
- Agent自主决策执行顺序
案例4: 内容审核系统
需求: 自动审核用户生成内容
实现:
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 1. 定义审核结果结构
class ModerationResult(BaseModel):
"""内容审核结果"""
is_appropriate: bool = Field(description="内容是否合适")
categories: list[str] = Field(description="违规类别列表")
severity: int = Field(description="违规严重程度1-5,1最轻5最重")
reason: str = Field(description="判断理由")
suggested_action: str = Field(description="建议采取的行动")
# 2. 构建审核链
moderation_prompt = ChatPromptTemplate.from_template("""作为内容审核专家,评估以下内容是否合适。
审核维度:
- 暴力内容
- 仇恨言论
- 色情内容
- 垃圾广告
- 个人信息泄露
内容:
{content}
请严格评估并给出详细结果。""")
model = ChatOpenAI(model="gpt-4o")
moderation_chain = (
moderation_prompt
| model.with_structured_output(ModerationResult)
)
# 3. 使用
content = "用户发布的内容..."
result = moderation_chain.invoke({"content": content})
if not result.is_appropriate:
print(f"内容违规!")
print(f"类别: {', '.join(result.categories)}")
print(f"严重程度: {result.severity}/5")
print(f"理由: {result.reason}")
print(f"建议: {result.suggested_action}")
# 根据严重程度采取行动
if result.severity >= 4:
# 直接删除
delete_content(content)
elif result.severity >= 2:
# 需要人工复审
queue_for_human_review(content, result)
关键技术:
- 结构化输出确保结果可程序化处理
- 多维度审核提高准确率
- 分级处理降低人工成本
案例5: 多语言翻译系统
需求: 高质量翻译并保持术语一致性
实现:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough
# 1. 术语表管理
glossary = {
"AI": "人工智能",
"machine learning": "机器学习",
"neural network": "神经网络",
}
def apply_glossary(text: str, glossary: dict) -> str:
"""应用术语表"""
for en, zh in glossary.items():
text = text.replace(en, zh)
return text
# 2. 翻译链
translation_prompt = ChatPromptTemplate.from_template("""你是一个专业的技术翻译专家。
术语表(必须使用):
{glossary_text}
原文({source_lang}):
{text}
要求:
1. 翻译为{target_lang}
2. 保持技术术语的一致性
3. 保留原文格式(Markdown等)
4. 确保译文流畅自然
译文:""")
model = ChatOpenAI(model="gpt-4o")
translation_chain = (
{
"text": lambda x: x["text"],
"source_lang": lambda x: x.get("source_lang", "English"),
"target_lang": lambda x: x.get("target_lang", "Chinese"),
"glossary_text": lambda x: "\n".join([
f"{k}: {v}" for k, v in glossary.items()
])
}
| translation_prompt
| model
)
# 3. 使用
english_text = """
# Introduction to AI
AI (Artificial Intelligence) and machine learning are transforming industries.
Neural networks are the core of modern AI systems.
"""
result = translation_chain.invoke({
"text": english_text,
"source_lang": "English",
"target_lang": "Chinese"
})
print(result.content)
关键技术:
- 术语表注入提示词
- 保持格式的翻译
- 上下文一致性
总结
本文档提供了LangChain框架的全面实践指南,涵盖:
- 5个基础使用示例: 从简单问答到复杂Agent
- 5个实战经验: 提示工程、错误处理、性能优化、监控、成本控制
- 5个完整案例: 客服机器人、文档问答、数据分析、内容审核、翻译系统
关键要点:
- 使用LCEL(
|)构建链式工作流 - 结构化输出确保数据可靠性
- 多层容错保证系统稳定性
- 缓存和批处理提高性能
- LangSmith追踪调试和优化
- 根据任务复杂度选择模型降低成本
进阶学习:
- LangGraph: 构建复杂的有状态Agent
- LangSmith: 生产级监控和评估
- LangServe: 快速部署REST API
- 各Partner包文档: 特定集成的最佳实践