LangChain-99-最佳实践与案例

框架使用示例

示例1: 简单问答链

场景: 构建一个基础的问答系统

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 1. 定义提示模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的{domain}专家。简洁准确地回答问题。"),
    ("human", "{question}")
])

# 2. 创建模型
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 3. 创建输出解析器
parser = StrOutputParser()

# 4. 组合成链
chain = prompt | model | parser

# 5. 使用
response = chain.invoke({
    "domain": "人工智能",
    "question": "什么是Transformer?"
})

print(response)
# 输出: Transformer是一种基于自注意力机制的深度学习架构...

关键点:

  • 使用|操作符链式组合组件
  • 模板变量通过字典传递
  • temperature=0 确保稳定输出

示例2: 结构化输出提取

场景: 从文本中提取结构化信息

from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 1. 定义输出结构
class Person(BaseModel):
    """从文本提取的人物信息"""
    name: str = Field(description="人物姓名")
    age: int = Field(description="年龄")
    occupation: str = Field(description="职业")
    skills: list[str] = Field(description="技能列表")

# 2. 创建结构化输出模型
model = ChatOpenAI(model="gpt-4o")
structured_llm = model.with_structured_output(Person)

# 3. 定义提示
prompt = ChatPromptTemplate.from_template(
    "从以下文本中提取人物信息:\n\n{text}"
)

# 4. 组合链
extraction_chain = prompt | structured_llm

# 5. 使用
text = """
张三是一位30岁的软件工程师,精通Python、Java和Go语言,
同时也擅长系统架构设计和团队管理。
"""

person = extraction_chain.invoke({"text": text})

print(f"姓名: {person.name}")
print(f"年龄: {person.age}")
print(f"职业: {person.occupation}")
print(f"技能: {', '.join(person.skills)}")

关键点:

  • with_structured_output()强制输出特定格式
  • Pydantic模型自动验证数据
  • Field描述帮助模型理解字段含义

示例3: 多步推理链

场景: 复杂问题需要多步推理

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o")

# 步骤1: 分解问题
decompose_prompt = ChatPromptTemplate.from_template(
    "将以下复杂问题分解为3-5个子问题:\n{question}"
)

# 步骤2: 回答每个子问题
answer_prompt = ChatPromptTemplate.from_template(
    "回答以下子问题:\n{sub_question}\n\n基于上下文: {context}"
)

# 步骤3: 综合答案
synthesize_prompt = ChatPromptTemplate.from_template(
    """基于以下子问题的答案,综合回答原始问题。

原始问题: {original_question}

子问题答案:
{sub_answers}

请给出完整、连贯的最终答案。"""
)

# 构建多步链
chain = (
    {"question": RunnablePassthrough()}
    | RunnablePassthrough.assign(
        sub_questions=decompose_prompt | model | StrOutputParser()
    )
    # (此处省略子问题回答循环逻辑)
    | synthesize_prompt
    | model
    | StrOutputParser()
)

# 使用
question = "如何在大规模分布式系统中保证数据一致性?"
answer = chain.invoke(question)

关键点:

  • RunnablePassthrough.assign()添加中间结果
  • 多步骤链式推理提高复杂问题处理能力
  • 保留原始输入供后续步骤使用

示例4: 带工具的Agent

场景: Agent自主选择工具完成任务

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# 1. 定义工具
@tool
def search_database(query: str) -> str:
    """在知识库中搜索相关信息

    Args:
        query: 搜索查询关键词
    """
    # 实际实现查询逻辑
    results = database.search(query)
    return f"找到{len(results)}条相关记录: {results[:3]}"

@tool
def calculator(expression: str) -> float:
    """计算数学表达式

    Args:
        expression: 数学表达式,如"2+2*3"
    """
    return eval(expression)

@tool
def get_current_time() -> str:
    """获取当前时间"""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# 2. 创建Agent
tools = [search_database, calculator, get_current_time]
model = ChatOpenAI(model="gpt-4o")

agent = create_react_agent(model, tools)

# 3. 使用Agent
result = agent.invoke({
    "messages": [("user", "帮我查询用户'张三'的信息,并计算他的订单总金额")]
})

print(result["messages"][-1].content)

关键点:

  • @tool装饰器自动生成工具schema
  • 详细的docstring帮助模型理解工具用途
  • Agent自动决策工具调用顺序

示例5: 流式聊天应用

场景: Web应用实时流式输出

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import json

app = FastAPI()

model = ChatOpenAI(model="gpt-4o", streaming=True)
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个友好的AI助手"),
    ("human", "{input}")
])

chain = prompt | model

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def event_generator():
        try:
            # 流式生成响应
            async for chunk in chain.astream({"input": message}):
                # SSE格式
                data = json.dumps({"content": chunk.content})
                yield f"data: {data}\n\n"

            # 结束标记
            yield "data: [DONE]\n\n"
        except Exception as e:
            error_data = json.dumps({"error": str(e)})
            yield f"data: {error_data}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

# 客户端使用EventSource接收流式响应
# const eventSource = new EventSource('/chat/stream?message=你好');
# eventSource.onmessage = (event) => {
#     const data = JSON.parse(event.data);
#     if (data.content) {
#         console.log(data.content);
#     }
# };

关键点:

  • astream()异步流式输出
  • SSE格式适合单向实时推送
  • 异常处理确保连接正常关闭

实战经验

经验1: 提示工程最佳实践

清晰的角色定义:

# 不推荐: 模糊的角色
prompt = ChatPromptTemplate.from_template("回答问题: {question}")

# 推荐: 明确的角色和任务
prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个专业的技术文档撰写专家。

你的任务:
1. 用清晰、准确的语言回答技术问题
2. 提供代码示例时,确保代码可运行
3. 解释技术概念时,从基础到高级逐步深入
4. 避免使用行话,确保初学者也能理解

回答格式:
- 先给出简短总结(1-2句)
- 然后详细解释
- 最后提供实际示例"""),
    ("human", "{question}")
])

Few-Shot示例:

prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个情感分类专家"),
    ("human", "这部电影太棒了!"),
    ("ai", "positive"),
    ("human", "浪费时间,不推荐"),
    ("ai", "negative"),
    ("human", "还行吧,有些地方不错"),
    ("ai", "neutral"),
    ("human", "{text}")
])

关键点:

  • 明确任务边界
  • 提供输出格式示例
  • Few-Shot提高准确率

经验2: 错误处理策略

多层容错:

from langchain_core.runnables import RunnableWithFallbacks
from openai import APIError, RateLimitError

# 第1层: 模型级重试
primary_model = ChatOpenAI(model="gpt-4o").with_retry(
    retry_if_exception_type=(APIError, RateLimitError),
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

# 第2层: Fallback到备用模型
robust_model = primary_model.with_fallbacks([
    ChatOpenAI(model="gpt-4o-mini"),  # 更便宜的备用
    ChatOpenAI(model="gpt-3.5-turbo"),  # 最后的备用
])

# 第3层: 应用层兜底
def safe_invoke(chain, input_data, default_response="抱歉,服务暂时不可用"):
    try:
        return chain.invoke(input_data)
    except Exception as e:
        logger.error(f"Chain failed: {e}")
        return default_response

超时控制:

import asyncio

async def invoke_with_timeout(chain, input_data, timeout=30):
    try:
        return await asyncio.wait_for(
            chain.ainvoke(input_data),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        logger.warning("Request timed out")
        return "请求超时,请稍后重试"

关键点:

  • 分层错误处理
  • 合理的超时设置
  • 优雅降级而非完全失败

经验3: 性能优化

缓存策略:

from langchain_core.caches import InMemoryCache, RedisCache
from langchain_core.globals import set_llm_cache
import redis

# 开发环境: 内存缓存
set_llm_cache(InMemoryCache())

# 生产环境: Redis缓存(跨实例共享)
redis_client = redis.Redis(
    host="redis-server",
    port=6379,
    db=0,
    decode_responses=True
)
set_llm_cache(RedisCache(redis_=redis_client))

# 使用
model = ChatOpenAI(model="gpt-4o")
model.invoke("什么是AI?")  # 第一次调用API
model.invoke("什么是AI?")  # 从缓存返回,0延迟

批处理优化:

# 不推荐: 循环调用
results = []
for input_data in inputs:
    result = chain.invoke(input_data)  # 串行,慢
    results.append(result)

# 推荐: 批处理
results = chain.batch(
    inputs,
    config={"max_concurrency": 10}  # 并发控制
)

异步并发:

import asyncio

# 高并发处理
async def process_batch_async(inputs):
    tasks = [chain.ainvoke(input) for input in inputs]
    results = await asyncio.gather(*tasks)
    return results

# 在FastAPI中使用
@app.post("/batch")
async def batch_endpoint(inputs: list[str]):
    results = await process_batch_async(inputs)
    return {"results": results}

关键点:

  • 生产环境使用Redis缓存
  • 批处理优于循环
  • 异步处理提高吞吐量

经验4: 监控与可观测性

LangSmith集成:

import os

# 启用追踪
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production"

# 添加元数据
result = chain.invoke(
    input_data,
    config={
        "tags": ["production", "customer-service"],
        "metadata": {
            "user_id": "user_123",
            "session_id": "session_456",
            "request_id": "req_789"
        }
    }
)

# 在LangSmith UI中可以:
# - 查看完整调用链
# - 过滤特定用户/会话
# - 分析性能瓶颈
# - 调试错误情况

自定义Callback监控:

from langchain_core.callbacks import BaseCallbackHandler

class MetricsCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        # 记录请求开始
        metrics.increment("llm.requests")

    def on_llm_end(self, response, **kwargs):
        # 记录Token使用
        if response.llm_output:
            tokens = response.llm_output.get("token_usage", {})
            metrics.gauge("llm.input_tokens", tokens.get("prompt_tokens", 0))
            metrics.gauge("llm.output_tokens", tokens.get("completion_tokens", 0))

    def on_llm_error(self, error, **kwargs):
        # 记录错误
        metrics.increment("llm.errors")
        logger.error(f"LLM error: {error}")

# 使用
chain.invoke(input, config={"callbacks": [MetricsCallback()]})

关键点:

  • LangSmith追踪所有运行
  • 添加有意义的tags和metadata
  • 自定义callback接入监控系统

经验5: 成本优化

模型选择策略:

from langchain_core.runnables import RunnableBranch

def get_complexity(input_data):
    """评估任务复杂度"""
    word_count = len(input_data.get("text", "").split())
    if word_count < 50:
        return "simple"
    elif word_count < 200:
        return "medium"
    else:
        return "complex"

# 根据复杂度选择模型
model_router = RunnableBranch(
    (
        lambda x: get_complexity(x) == "simple",
        ChatOpenAI(model="gpt-4o-mini")  # 便宜
    ),
    (
        lambda x: get_complexity(x) == "medium",
        ChatOpenAI(model="gpt-4o")  # 中等
    ),
    ChatOpenAI(model="o1-preview")  # 复杂任务用最强模型
)

smart_chain = model_router | parser

Token使用监控:

class CostTracker:
    PRICING = {
        "gpt-4o": {"input": 0.03, "output": 0.06},  # per 1K tokens
        "gpt-4o-mini": {"input": 0.0015, "output": 0.003},
    }

    def __init__(self):
        self.total_cost = 0

    def track_usage(self, model_name, usage_metadata):
        pricing = self.PRICING.get(model_name, {"input": 0, "output": 0})
        cost = (
            usage_metadata["input_tokens"] * pricing["input"] / 1000 +
            usage_metadata["output_tokens"] * pricing["output"] / 1000
        )
        self.total_cost += cost
        return cost

tracker = CostTracker()

result = model.invoke(input)
if result.usage_metadata:
    cost = tracker.track_usage("gpt-4o", result.usage_metadata)
    print(f"本次调用花费: ${cost:.4f}")
    print(f"累计花费: ${tracker.total_cost:.4f}")

关键点:

  • 简单任务用便宜模型
  • 监控Token使用和成本
  • 设置预算告警

具体案例

案例1: 客户服务聊天机器人

需求: 构建智能客服系统,回答常见问题并转人工

实现:

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

# 1. 知识库检索工具
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

vectorstore = Chroma(
    collection_name="faq",
    embedding_function=OpenAIEmbeddings()
)

def search_faq(query: str) -> str:
    """搜索常见问题知识库"""
    docs = vectorstore.similarity_search(query, k=3)
    return "\n\n".join([doc.page_content for doc in docs])

# 2. 构建客服链
system_prompt = """你是一个专业的客户服务AI助手。

知识库内容:
{faq_results}

指导原则:
1. 优先使用知识库中的信息回答
2. 如果知识库没有相关信息,说明"我不确定,让我为您转接人工客服"
3. 保持礼貌和专业
4. 如果用户明确要求人工客服,立即说"好的,正在为您转接人工客服"
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}")
])

model = ChatOpenAI(model="gpt-4o")

chain = (
    {
        "faq_results": lambda x: search_faq(x["input"]),
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"]
    }
    | prompt
    | model
)

# 3. 使用示例
chat_history = []

def chat(user_input: str):
    response = chain.invoke({
        "input": user_input,
        "chat_history": chat_history
    })

    # 更新历史
    chat_history.append(HumanMessage(user_input))
    chat_history.append(response)

    # 检测是否需要转人工
    if "转接人工" in response.content:
        return response.content, True  # 需要转人工

    return response.content, False

# 对话示例
response, need_human = chat("如何退货?")
print(response)

response, need_human = chat("我要投诉")
if need_human:
    print("正在转接人工客服...")

关键技术:

  • 向量检索增强回答准确性
  • MessagesPlaceholder保留对话历史
  • 转人工触发条件判断

案例2: 文档问答系统

需求: 基于公司内部文档回答问题

实现:

from langchain_community.document_loaders import DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

# 1. 加载和分割文档
loader = DirectoryLoader("./company_docs", glob="**/*.md")
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
splits = text_splitter.split_documents(docs)

# 2. 创建向量存储
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=OpenAIEmbeddings()
)

retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# 3. 构建RAG链
template = """基于以下上下文回答问题。如果上下文中没有相关信息,说"我在文档中没有找到相关信息"。

上下文:
{context}

问题: {question}

回答:"""

prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI(model="gpt-4o")

def format_docs(docs):
    return "\n\n".join([
        f"[来源: {doc.metadata['source']}]\n{doc.page_content}"
        for doc in docs
    ])

rag_chain = (
    {
        "context": lambda x: format_docs(retriever.invoke(x["question"])),
        "question": lambda x: x["question"]
    }
    | prompt
    | model
)

# 4. 使用
response = rag_chain.invoke({
    "question": "公司的休假政策是什么?"
})
print(response.content)

关键技术:

  • RAG(检索增强生成)架构
  • 文档分割保持语义完整性
  • 引用来源提高可信度

案例3: 数据分析Agent

需求: Agent自动分析数据并生成报告

实现:

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
import pandas as pd

# 1. 定义数据分析工具
@tool
def load_data(file_path: str) -> str:
    """加载CSV数据文件

    Args:
        file_path: CSV文件路径
    """
    df = pd.read_csv(file_path)
    return f"成功加载数据,共{len(df)}{len(df.columns)}\n列名: {list(df.columns)}"

@tool
def describe_data(file_path: str) -> str:
    """获取数据统计摘要

    Args:
        file_path: CSV文件路径
    """
    df = pd.read_csv(file_path)
    return df.describe().to_string()

@tool
def calculate_correlation(file_path: str, column1: str, column2: str) -> str:
    """计算两列之间的相关性

    Args:
        file_path: CSV文件路径
        column1: 第一列名称
        column2: 第二列名称
    """
    df = pd.read_csv(file_path)
    corr = df[column1].corr(df[column2])
    return f"{column1}{column2}的相关系数: {corr:.4f}"

@tool
def plot_histogram(file_path: str, column: str) -> str:
    """绘制列的直方图

    Args:
        file_path: CSV文件路径
        column: 列名称
    """
    df = pd.read_csv(file_path)
    df[column].hist()
    plt.savefig(f"{column}_histogram.png")
    return f"已生成{column}的直方图: {column}_histogram.png"

# 2. 创建分析Agent
tools = [load_data, describe_data, calculate_correlation, plot_histogram]
model = ChatOpenAI(model="gpt-4o")

analyst_agent = create_react_agent(model, tools)

# 3. 使用Agent
result = analyst_agent.invoke({
    "messages": [(
        "user",
        """分析sales_data.csv文件:
        1. 加载数据并查看基本信息
        2. 计算销售额和利润的相关性
        3. 生成销售额的分布直方图
        4. 给出分析结论"""
    )]
})

print(result["messages"][-1].content)

关键技术:

  • 工具自动选择和组合
  • 多步骤数据分析流程
  • Agent自主决策执行顺序

案例4: 内容审核系统

需求: 自动审核用户生成内容

实现:

from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 1. 定义审核结果结构
class ModerationResult(BaseModel):
    """内容审核结果"""
    is_appropriate: bool = Field(description="内容是否合适")
    categories: list[str] = Field(description="违规类别列表")
    severity: int = Field(description="违规严重程度1-5,1最轻5最重")
    reason: str = Field(description="判断理由")
    suggested_action: str = Field(description="建议采取的行动")

# 2. 构建审核链
moderation_prompt = ChatPromptTemplate.from_template("""作为内容审核专家,评估以下内容是否合适。

审核维度:
- 暴力内容
- 仇恨言论
- 色情内容
- 垃圾广告
- 个人信息泄露

内容:
{content}

请严格评估并给出详细结果。""")

model = ChatOpenAI(model="gpt-4o")
moderation_chain = (
    moderation_prompt
    | model.with_structured_output(ModerationResult)
)

# 3. 使用
content = "用户发布的内容..."
result = moderation_chain.invoke({"content": content})

if not result.is_appropriate:
    print(f"内容违规!")
    print(f"类别: {', '.join(result.categories)}")
    print(f"严重程度: {result.severity}/5")
    print(f"理由: {result.reason}")
    print(f"建议: {result.suggested_action}")

    # 根据严重程度采取行动
    if result.severity >= 4:
        # 直接删除
        delete_content(content)
    elif result.severity >= 2:
        # 需要人工复审
        queue_for_human_review(content, result)

关键技术:

  • 结构化输出确保结果可程序化处理
  • 多维度审核提高准确率
  • 分级处理降低人工成本

案例5: 多语言翻译系统

需求: 高质量翻译并保持术语一致性

实现:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough

# 1. 术语表管理
glossary = {
    "AI": "人工智能",
    "machine learning": "机器学习",
    "neural network": "神经网络",
}

def apply_glossary(text: str, glossary: dict) -> str:
    """应用术语表"""
    for en, zh in glossary.items():
        text = text.replace(en, zh)
    return text

# 2. 翻译链
translation_prompt = ChatPromptTemplate.from_template("""你是一个专业的技术翻译专家。

术语表(必须使用):
{glossary_text}

原文({source_lang}):
{text}

要求:
1. 翻译为{target_lang}
2. 保持技术术语的一致性
3. 保留原文格式(Markdown等)
4. 确保译文流畅自然

译文:""")

model = ChatOpenAI(model="gpt-4o")

translation_chain = (
    {
        "text": lambda x: x["text"],
        "source_lang": lambda x: x.get("source_lang", "English"),
        "target_lang": lambda x: x.get("target_lang", "Chinese"),
        "glossary_text": lambda x: "\n".join([
            f"{k}: {v}" for k, v in glossary.items()
        ])
    }
    | translation_prompt
    | model
)

# 3. 使用
english_text = """
# Introduction to AI

AI (Artificial Intelligence) and machine learning are transforming industries.
Neural networks are the core of modern AI systems.
"""

result = translation_chain.invoke({
    "text": english_text,
    "source_lang": "English",
    "target_lang": "Chinese"
})

print(result.content)

关键技术:

  • 术语表注入提示词
  • 保持格式的翻译
  • 上下文一致性

总结

本文档提供了LangChain框架的全面实践指南,涵盖:

  1. 5个基础使用示例: 从简单问答到复杂Agent
  2. 5个实战经验: 提示工程、错误处理、性能优化、监控、成本控制
  3. 5个完整案例: 客服机器人、文档问答、数据分析、内容审核、翻译系统

关键要点:

  • 使用LCEL(|)构建链式工作流
  • 结构化输出确保数据可靠性
  • 多层容错保证系统稳定性
  • 缓存和批处理提高性能
  • LangSmith追踪调试和优化
  • 根据任务复杂度选择模型降低成本

进阶学习:

  • LangGraph: 构建复杂的有状态Agent
  • LangSmith: 生产级监控和评估
  • LangServe: 快速部署REST API
  • 各Partner包文档: 特定集成的最佳实践