1. Core Abstraction Layer: The Unified Runnable Interface

1.1 Design Philosophy of the Runnable Interface

LangChain's core design principle is unified abstraction: every component implements the Runnable interface:

# langchain_core/runnables/base.py (simplified excerpt)
class Runnable(Generic[Input, Output], ABC):
    """LangChain's core abstraction: a runnable object."""

    @abstractmethod
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Synchronous call - the core execution method."""

    async def ainvoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """Asynchronous call - supports high concurrency.

        The default implementation runs invoke() in a thread executor;
        async-native components override this.
        """
        return await run_in_executor(config, self.invoke, input, config)

    def stream(self, input: Input, config: Optional[RunnableConfig] = None) -> Iterator[Output]:
        """Streaming call - real-time output; the default yields the invoke() result as a single chunk."""
        yield self.invoke(input, config)

    def batch(self, inputs: List[Input], config: Optional[RunnableConfig] = None) -> List[Output]:
        """Batch call - higher throughput (simplified; the real implementation runs inputs in parallel)."""
        return [self.invoke(input, config) for input in inputs]

Design advantages

  • Composability: components compose into chains with the | operator (see the sketch below)
  • Consistency: a single, uniform invocation interface lowers the learning curve
  • Extensibility: a new component only needs to implement Runnable to integrate seamlessly
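
A minimal sketch of that composability (the function names are illustrative; this assumes langchain_core is installed):

from langchain_core.runnables import RunnableLambda

to_upper = RunnableLambda(lambda s: s.upper())
add_suffix = RunnableLambda(lambda s: s + "!")

chain = to_upper | add_suffix        # composition produces a RunnableSequence
print(chain.invoke("hello"))         # HELLO!
print(chain.batch(["a", "b"]))       # ['A!', 'B!']
print(list(chain.stream("hi")))      # ['HI!'] - falls back to the default stream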

1.2 The RunnableConfig Configuration System

class RunnableConfig(TypedDict, total=False):
    """Runtime configuration - flows through the entire execution chain."""

    tags: List[str]                    # Tags for classification and filtering
    metadata: Dict[str, Any]           # Metadata carrying contextual information
    callbacks: Callbacks               # Callback handlers
    run_name: str                      # Run name, useful for debugging
    max_concurrency: Optional[int]     # Maximum number of parallel calls
    recursion_limit: int               # Recursion depth limit
    configurable: Dict[str, Any]       # Runtime-configurable parameters

Configuration propagation mechanism

import uuid

def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
    """Ensure the configuration is complete (simplified)."""
    empty_config: RunnableConfig = {}

    # Merge the caller-supplied values over the defaults
    config = {**empty_config, **(config or {})}

    # Generate a unique run ID if one was not supplied
    if "run_id" not in config:
        config["run_id"] = uuid.uuid4()

    return config
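
A small hedged example of passing such a config at call time; the tag and metadata values are made up for illustration:

from langchain_core.runnables import RunnableLambda

double = RunnableLambda(lambda x: x * 2)
result = double.invoke(
    3,
    config={
        "tags": ["demo"],
        "metadata": {"request_id": "abc-123"},   # illustrative value
        "run_name": "double-step",
        "max_concurrency": 4,
    },
)
print(result)  # 6 - the config feeds callbacks/tracing, not the wrapped function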

2. The Chain Module: Structured Call Sequences

2.1 The Chain Base Class

# langchain/chains/base.py
class Chain(RunnableSerializable[dict[str, Any], dict[str, Any]], ABC):
    """Abstract base class for chains - a structured sequence of component calls."""

    memory: Optional[BaseMemory] = None
    """Memory component - maintains state across calls."""

    callbacks: Callbacks = Field(default=None, exclude=True)
    """Callback handlers - observability hooks."""

    verbose: bool = Field(default_factory=_get_verbosity)
    """Verbose mode - debug output."""

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Input key list - defines the input contract."""

    @property
    @abstractmethod
    def output_keys(self) -> List[str]:
        """Output key list - defines the return format."""

    @abstractmethod
    def _call(self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] = None) -> Dict[str, Any]:
        """Core execution logic - subclasses must implement this."""

2.2 The Chain Execution Flow

def __call__(self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False, callbacks: Callbacks = None, *, include_run_info: bool = False) -> Dict[str, Any]:
    """Chain call entry point - the full execution flow (simplified)."""

    # 1. Input preprocessing and validation
    inputs = self.prep_inputs(inputs)

    # 2. Callback manager initialisation
    callback_manager = CallbackManager.configure(
        callbacks,
        self.callbacks,
        self.verbose,
        self.tags,
        self.metadata,
    )

    # 3. Pre-execution callback; detect whether _call accepts run_manager
    #    (in the full source the run name is taken from the RunnableConfig)
    new_arg_supported = inspect.signature(self._call).parameters.get("run_manager")
    run_manager = callback_manager.on_chain_start(
        dumpd(self),
        inputs,
    )

    try:
        # 4. Core execution logic
        outputs = (
            self._call(inputs, run_manager=run_manager)
            if new_arg_supported
            else self._call(inputs)
        )

        # 5. Post-execution callback
        run_manager.on_chain_end(outputs)

        # 6. Output post-processing
        final_outputs: Dict[str, Any] = self.prep_outputs(
            inputs, outputs, return_only_outputs
        )

    except Exception as e:
        # 7. Error handling
        run_manager.on_chain_error(e)
        raise e

    return final_outputs
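
To observe steps 3, 5 and 7 from the outside, a handler can subscribe to exactly those hooks. A hedged sketch with an illustrative handler name:

from typing import Any, Dict

from langchain_core.callbacks import BaseCallbackHandler


class ChainLogger(BaseCallbackHandler):
    """Prints the chain start/end/error callbacks fired by the flow above."""

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        print("chain started with:", inputs)

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        print("chain finished with:", outputs)

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        print("chain failed:", error)


# Usage: some_chain.invoke(inputs, config={"callbacks": [ChainLogger()]})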

2.3 LLMChain: The Most Basic Chain Implementation

class LLMChain(Chain):
    """LLM call chain - the most basic implementation."""

    prompt: BasePromptTemplate
    """Prompt template"""

    llm: BaseLanguageModel
    """Language model"""

    output_parser: BaseOutputParser = Field(default_factory=NoOpOutputParser)
    """Output parser"""

    def _call(self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] = None) -> Dict[str, Any]:
        # Delegate to generate(), which formats the prompt and calls the LLM
        response = self.generate([inputs], run_manager=run_manager)
        return self.create_outputs(response)[0]

    def generate(self, input_list: List[Dict[str, Any]], run_manager: Optional[CallbackManagerForChainRun] = None) -> LLMResult:
        """Batch generation - the core implementation."""

        # 1. Build the list of prompts
        prompts, stop = self.prep_prompts(input_list, run_manager=run_manager)

        # 2. Call the LLM
        response = self.llm.generate_prompt(
            prompts,
            stop,
            callbacks=run_manager.get_child() if run_manager else None,
            **self.llm_kwargs,
        )

        return response

    def prep_prompts(self, input_list: List[Dict[str, Any]], run_manager: Optional[CallbackManagerForChainRun] = None) -> Tuple[List[PromptValue], Optional[List[str]]]:
        """Prepare the list of prompts."""
        stop = None
        if "stop" in input_list[0]:
            stop = input_list[0]["stop"]

        prompts = []
        for inputs in input_list:
            selected_inputs = {k: inputs[k] for k in self.prompt.input_variables}
            prompt = self.prompt.format_prompt(**selected_inputs)

            # Log the formatted prompt
            if run_manager:
                run_manager.on_text(prompt.to_string(), verbose=self.verbose)

            prompts.append(prompt)

        return prompts, stop
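
A hedged usage sketch that exercises the prep_prompts → generate path end to end; FakeListLLM simply replays canned answers, so no real model is needed:

from langchain.chains import LLMChain
from langchain_core.language_models import FakeListLLM
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate.from_template("Translate to French: {word}")
llm = FakeListLLM(responses=["bonjour"])

chain = LLMChain(llm=llm, prompt=prompt)
print(chain.invoke({"word": "hello"}))
# -> {'word': 'hello', 'text': 'bonjour'}  (LLMChain's default output key is "text")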

2.4 SequentialChain: Sequential Execution Chain

class SequentialChain(Chain):
    """Sequential chain - runs multiple chains one after another."""

    chains: List[Chain]
    """List of sub-chains"""

    input_variables: List[str]
    """Input variable names"""

    output_variables: List[str]
    """Output variable names"""

    return_all: bool = False
    """Whether to return all intermediate results"""

    def _call(self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] = None) -> Dict[str, Any]:
        known_values = inputs.copy()

        # Execute the sub-chains one by one
        for i, chain in enumerate(self.chains):
            # Extract the inputs the current chain needs
            chain_inputs = {k: known_values[k] for k in chain.input_keys if k in known_values}

            # Execute the sub-chain
            if run_manager:
                child_manager = run_manager.get_child(f"step_{i}")
                outputs = chain(chain_inputs, callbacks=child_manager, return_only_outputs=True)
            else:
                outputs = chain(chain_inputs, return_only_outputs=True)

            # Merge the outputs into the known values
            known_values.update(outputs)

        # Return the requested output variables
        if self.return_all:
            return known_values
        else:
            return {k: known_values[k] for k in self.output_variables}
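
A hedged end-to-end sketch of this wiring, again with FakeListLLM: the first chain's output_key becomes the second chain's input variable, exactly the known_values hand-off shown above:

from langchain.chains import LLMChain, SequentialChain
from langchain_core.language_models import FakeListLLM
from langchain_core.prompts import PromptTemplate

synopsis_chain = LLMChain(
    llm=FakeListLLM(responses=["a short synopsis"]),
    prompt=PromptTemplate.from_template("Write a synopsis for: {title}"),
    output_key="synopsis",
)
review_chain = LLMChain(
    llm=FakeListLLM(responses=["a glowing review"]),
    prompt=PromptTemplate.from_template("Review this synopsis: {synopsis}"),
    output_key="review",
)

overall = SequentialChain(
    chains=[synopsis_chain, review_chain],
    input_variables=["title"],
    output_variables=["synopsis", "review"],
)
print(overall.invoke({"title": "Dune"}))
# -> {'title': 'Dune', 'synopsis': 'a short synopsis', 'review': 'a glowing review'}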

3. The Agent Module: Intelligent Decision-Making and Tool Calling

3.1 Agent Architecture Layers

graph TB
    A[AgentExecutor] --> B[BaseSingleActionAgent]
    A --> C[BaseMultiActionAgent]

    B --> D[ReActSingleActionAgent]
    B --> E[OpenAIFunctionsAgent]
    B --> F[ConversationalAgent]

    C --> G[OpenAIMultiFunctionsAgent]

    A --> H[Tools List]
    A --> I[Memory]
    A --> J[Callbacks]

    style A fill:#e1f5fe
    style B fill:#f3e5f5
    style H fill:#e8f5e8
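
Before drilling into each layer, a quick hedged sketch of the "Tools List" leaf in the diagram: a tool is an ordinary function wrapped so an agent can read its name, description and argument schema (the function here is illustrative):

from langchain_core.tools import tool


@tool
def word_length(word: str) -> int:
    """Return the number of characters in a word."""
    return len(word)


print(word_length.name)                            # word_length
print(word_length.description)                     # taken from the docstring
print(word_length.invoke({"word": "LangChain"}))   # 9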