LangGraph-02-langgraph核心模块综合文档

0. 模块概览

职责定义

langgraph核心模块是整个框架的引擎,实现了基于Pregel算法的有状态图计算框架。该模块负责图的构建、编译、执行和状态管理,支持复杂的多步骤Agent工作流程,提供中断恢复、人机交互、并发执行等高级功能。

输入/输出

  • 输入:图定义(节点、边、条件)、状态模式(StateSchema)、执行配置(RunnableConfig)
  • 输出:编译后的可执行图(CompiledStateGraph)、状态快照(StateSnapshot)、执行结果

上下游依赖

  • 上游依赖:checkpoint模块(状态持久化)、channels模块(状态通道)、types模块(核心类型)
  • 下游依赖:prebuilt模块(高级API)、应用层(用户定义的Agent)
  • 外部依赖:langchain_core(基础运行时)、pydantic(数据验证)

生命周期

  1. 图构建阶段:定义节点、边和状态结构
  2. 编译阶段:验证图结构、创建执行计划、初始化通道
  3. 执行阶段:按Pregel算法迭代执行、状态更新、检查点保存
  4. 恢复阶段:从检查点恢复、继续执行或处理中断

1. 模块架构图

flowchart TB
    subgraph "用户接口层"
        StateGraphAPI[StateGraph<br/>状态图构建器]
        CompiledGraph[CompiledStateGraph<br/>编译后图实例]
        NodeBuilder[NodeBuilder<br/>节点构建器]
    end
    
    subgraph "核心执行引擎"
        Pregel[Pregel<br/>核心执行引擎]
        PregelNode[PregelNode<br/>图节点]
        PregelLoop[PregelLoop<br/>执行循环]
        PregelRunner[PregelRunner<br/>任务运行器]
    end
    
    subgraph "状态管理层"
        Channels[Channels<br/>状态通道系统]
        BaseChannel[BaseChannel<br/>通道基类]
        LastValue[LastValue<br/>最新值通道]
        Topic[Topic<br/>主题通道]
        NamedBarrier[NamedBarrierValue<br/>命名屏障]
    end
    
    subgraph "通信协调层"
        Send[Send<br/>消息发送]
        Command[Command<br/>命令对象]
        Interrupt[Interrupt<br/>中断处理]
        StreamProtocol[StreamProtocol<br/>流协议]
    end
    
    subgraph "算法实现层"
        PregelAlgo[Pregel算法<br/>任务调度]
        TaskManager[任务管理器<br/>并发控制]
        WriteBuffer[写缓冲区<br/>状态更新]
        ChannelRead[通道读取<br/>状态获取]
    end
    
    StateGraphAPI --> CompiledGraph
    CompiledGraph --> Pregel
    NodeBuilder --> PregelNode
    
    Pregel --> PregelLoop
    Pregel --> PregelRunner
    PregelLoop --> PregelAlgo
    PregelRunner --> TaskManager
    
    PregelNode --> Channels
    Channels --> BaseChannel
    BaseChannel --> LastValue
    BaseChannel --> Topic
    BaseChannel --> NamedBarrier
    
    Send --> Command
    Command --> Interrupt
    Interrupt --> StreamProtocol
    
    PregelAlgo --> WriteBuffer
    PregelAlgo --> ChannelRead
    TaskManager --> WriteBuffer

架构说明

用户接口层设计

  • StateGraph提供声明式的图构建API,支持泛型状态类型
  • CompiledStateGraph是编译后的可执行实例,实现Runnable接口
  • NodeBuilder支持fluent API,简化复杂节点的构建过程

核心执行引擎

  • Pregel实现Google Pregel论文的核心算法,支持超步执行模式
  • PregelLoop管理执行循环,处理同步/异步执行模式
  • PregelRunner负责任务的实际执行,支持重试和缓存策略

状态管理系统

  • 通道系统提供类型安全的状态传递机制
  • 不同通道类型支持不同的聚合策略(最新值、累加、屏障同步)
  • 支持状态分片和并发更新,保证数据一致性

通信协调机制

  • Send支持动态消息传递,实现map-reduce等模式
  • Command提供复杂的执行控制能力(跳转、更新、恢复)
  • Interrupt机制支持人机交互和暂停恢复

2. 核心API详细分析

2.1 StateGraph图构建API

基本信息

  • 类名StateGraph[StateT, ContextT, InputT, OutputT]
  • 协议:泛型构建器,支持类型安全的图定义
  • 幂等性:构建操作幂等,可重复调用

构造函数结构

class StateGraphConstructor:
    state_schema: type[StateT]      # 状态类型定义
    context_schema: type[ContextT]  # 运行时上下文类型
    input_schema: type[InputT]      # 输入类型约束
    output_schema: type[OutputT]    # 输出类型约束
字段 类型 必填 默认 约束 说明
state_schema type[StateT] - TypedDict或Pydantic模型 图状态结构定义
context_schema type[ContextT] None 不可变类型 运行时上下文
input_schema type[InputT] state_schema 兼容state_schema 输入验证约束
output_schema type[OutputT] state_schema 兼容state_schema 输出格式约束

核心方法实现

def add_node(
    self,
    node: str | StateNode[NodeInputT, ContextT],
    action: StateNode[NodeInputT, ContextT] | None = None,
    *,
    input_schema: type[NodeInputT] | None = None,
    retry_policy: RetryPolicy | None = None,
    cache_policy: CachePolicy | None = None
) -> Self:
    """添加节点到状态图
    
    实现要点:
    1. 节点名称唯一性验证(避免重复定义)
    2. 输入类型推断(从函数签名或显式指定)
    3. 可运行对象转换(统一执行接口)
    4. 策略绑定(重试、缓存策略关联)
    5. 状态模式注册(更新通道映射)
    """
    # 节点名称处理逻辑(省略详细实现)
    if not isinstance(node, str):
        action = node
        node = getattr(action, "__name__", action.__class__.__name__)
    
    # 验证节点唯一性
    if node in self.nodes:
        raise ValueError(f"Node `{node}` already present.")
    
    # 输入类型推断
    inferred_input_schema = self._infer_input_schema(action)
    effective_schema = input_schema or inferred_input_schema or self.state_schema
    
    # 创建节点规格
    self.nodes[node] = StateNodeSpec(
        coerce_to_runnable(action, name=node),
        metadata=None,
        input_schema=effective_schema,
        retry_policy=retry_policy,
        cache_policy=cache_policy
    )
    
    # 注册状态模式
    if effective_schema != self.state_schema:
        self._add_schema(effective_schema)
    
    return self

2.2 Pregel执行引擎

基本信息

  • 类名Pregel[StateT, ContextT, InputT, OutputT]
  • 协议:实现Runnable接口,支持invoke/stream/batch操作
  • 执行模式:基于Pregel超步(superstep)算法

执行配置结构

class PregelExecutionConfig:
    checkpointer: BaseCheckpointSaver    # 检查点保存器
    interrupt_before: list[str]          # 前置中断节点
    interrupt_after: list[str]           # 后置中断节点
    debug: bool                          # 调试模式开关
    recursion_limit: int                 # 递归深度限制
    stream_mode: StreamMode              # 流输出模式
字段 类型 必填 默认 约束 说明
checkpointer BaseCheckpointSaver None 实现检查点接口 状态持久化器
interrupt_before list[str] [] 有效节点名称 执行前中断点
interrupt_after list[str] [] 有效节点名称 执行后中断点
debug bool False - 启用调试输出
recursion_limit int 25 > 0 防无限循环
stream_mode StreamMode “updates” 预定义模式 输出流格式

核心执行流程

def invoke(self, input: InputT, config: RunnableConfig | None = None) -> OutputT:
    """同步执行图实例
    
    执行算法:
    1. 状态初始化(从输入或检查点恢复)
    2. 超步循环执行(直到无可执行任务)
    3. 中断处理(支持暂停和恢复)
    4. 结果输出(根据output_schema过滤)
    """
    config = ensure_config(config)
    
    # 初始化执行上下文
    loop = SyncPregelLoop(
        input=input,
        config=config,
        checkpointer=self.checkpointer,
        nodes=self.nodes,
        channels=self.channels
    )
    
    # 执行主循环
    while True:
        # 计算下一轮可执行任务
        tasks = prepare_next_tasks(
            checkpoint=loop.checkpoint,
            channels=loop.channels,
            nodes=self.nodes
        )
        
        if not tasks:
            break  # 无更多任务,执行结束
            
        # 检查中断条件
        if self._should_interrupt(tasks, "before"):
            return self._handle_interrupt(loop, tasks)
            
        # 并发执行任务
        task_results = loop.runner.submit_sync(tasks)
        
        # 应用状态更新
        loop.checkpoint = apply_writes(
            checkpoint=loop.checkpoint,
            writes=task_results,
            channels=loop.channels
        )
        
        # 保存检查点
        if self.checkpointer:
            self.checkpointer.put(config, loop.checkpoint, metadata={})
            
        # 检查后置中断
        if self._should_interrupt(tasks, "after"):
            return self._handle_interrupt(loop, tasks)
    
    # 提取最终结果
    return read_channels(loop.channels, self.output_channels)

2.3 通道系统详细分析

BaseChannel抽象接口

class BaseChannel(Generic[Value, Update, Checkpoint], ABC):
    """通道基类定义统一接口
    
    核心职责:
    1. 类型安全的状态存储和传递
    2. 支持序列化和反序列化
    3. 提供更新语义和聚合策略
    4. 并发安全的读写操作
    """
    
    @abstractmethod
    def get(self) -> Value:
        """获取当前值,空通道抛出EmptyChannelError"""
        
    @abstractmethod
    def update(self, values: Sequence[Update]) -> bool:
        """批量更新通道值,返回是否产生变化"""
        
    @abstractmethod
    def from_checkpoint(self, checkpoint: Checkpoint) -> Self:
        """从检查点数据恢复通道状态"""

LastValue通道实现

class LastValue(BaseChannel[Value, Value, Value]):
    """最新值通道,保留最后更新的值
    
    更新策略:新值覆盖旧值
    使用场景:单一状态字段,配置信息等
    """
    
    def __init__(self, typ: type[Value]):
        super().__init__(typ)
        self.value: Value | None = None
        
    def update(self, values: Sequence[Value]) -> bool:
        if values:
            old_value = self.value
            self.value = values[-1]  # 取最后一个值
            return old_value != self.value
        return False
        
    def get(self) -> Value:
        if self.value is None:
            raise EmptyChannelError()
        return self.value

BinaryOperatorAggregate通道

class BinaryOperatorAggregate(BaseChannel[Value, Update, Value]):
    """二元操作聚合通道,支持自定义聚合函数
    
    更新策略:使用reducer函数逐步聚合所有更新
    使用场景:列表累加、数值求和、复杂对象合并
    """
    
    def __init__(self, typ: type[Value], reducer: Callable[[Value, Update], Value]):
        super().__init__(typ)
        self.value: Value | None = None
        self.reducer = reducer
        
    def update(self, values: Sequence[Update]) -> bool:
        if not values:
            return False
            
        old_value = self.value
        current = self.value
        
        for update in values:
            if current is None:
                current = update  # 首次更新直接赋值
            else:
                current = self.reducer(current, update)  # 应用聚合函数
                
        self.value = current
        return old_value != current

3. 数据结构UML图

classDiagram
    class StateGraph~StateT,ContextT,InputT,OutputT~ {
        +dict~str,StateNodeSpec~ nodes
        +set~tuple[str,str]~ edges
        +defaultdict branches
        +dict~str,BaseChannel~ channels
        +type[StateT] state_schema
        +add_node(node, action) Self
        +add_edge(start, end) Self
        +add_conditional_edges(source, path, path_map) Self
        +compile() CompiledStateGraph
        +validate() Self
    }
    
    class CompiledStateGraph~StateT,ContextT,InputT,OutputT~ {
        +StateGraph builder
        +dict~type,Callable~ schema_to_mapper
        +attach_node(key, node) None
        +attach_edge(starts, end) None
        +attach_branch(start, name, branch) None
        +_migrate_checkpoint(checkpoint) None
    }
    
    class Pregel~StateT,ContextT,InputT,OutputT~ {
        +dict~str,PregelNode~ nodes
        +dict~str,BaseChannel~ channels
        +BaseCheckpointSaver checkpointer
        +list~str~ interrupt_before_nodes
        +list~str~ interrupt_after_nodes
        +invoke(input, config) OutputT
        +stream(input, config) Iterator
        +batch(inputs, config) list[OutputT]
    }
    
    class PregelNode {
        +list~str~ triggers
        +str|list~str~ channels
        +Callable mapper
        +list~ChannelWrite~ writers
        +Runnable bound
        +list~RetryPolicy~ retry_policy
        +CachePolicy cache_policy
    }
    
    class BaseChannel~Value,Update,Checkpoint~ {
        <<abstract>>
        +str key
        +Any typ
        +get() Value
        +update(values) bool
        +checkpoint() Checkpoint
        +from_checkpoint(checkpoint) Self
        +is_available() bool
        +consume() bool
    }
    
    class LastValue~Value~ {
        +Value value
        +update(values) bool
        +get() Value
        +from_checkpoint(checkpoint) Self
    }
    
    class BinaryOperatorAggregate~Value,Update~ {
        +Value value
        +Callable reducer
        +update(values) bool
        +get() Value
    }
    
    class Send {
        +str node
        +Any arg
        +__init__(node, arg)
        +__hash__() int
        +__eq__(value) bool
    }
    
    class Command~N~ {
        +str|None graph
        +Any|None update
        +dict|Any|None resume
        +Send|Sequence~Send|N~|N goto
        +_update_as_tuples() Sequence~tuple~
        +PARENT: Literal["__parent__"]
    }
    
    class Interrupt {
        +Any value
        +str id
        +__init__(value, id)
        +from_ns(value, ns) Interrupt
    }
    
    class StateSnapshot {
        +dict~str,Any~|Any values
        +tuple~str~ next
        +RunnableConfig config
        +CheckpointMetadata metadata
        +str created_at
        +RunnableConfig parent_config
        +tuple~PregelTask~ tasks
        +tuple~Interrupt~ interrupts
    }
    
    StateGraph --> CompiledStateGraph : compiles_to
    CompiledStateGraph --|> Pregel : extends
    Pregel --> PregelNode : contains
    PregelNode --> BaseChannel : reads_from
    BaseChannel <|-- LastValue : implements
    BaseChannel <|-- BinaryOperatorAggregate : implements
    Send --> Command : used_by
    Command --> Interrupt : triggers
    Pregel --> StateSnapshot : creates
    StateSnapshot --> Interrupt : contains

数据结构说明

StateGraph构建器模式

  • 采用Builder模式提供fluent API,支持链式调用
  • 泛型设计保证编译时类型安全
  • 延迟验证,在compile阶段进行完整性检查

Pregel执行引擎架构

  • 实现Google Pregel论文的核心算法
  • 支持同步/异步执行模式,适配不同性能需求
  • 内置检查点机制,支持故障恢复和暂停续传

通道系统类型层次

  • BaseChannel定义统一接口契约
  • 不同实现支持不同聚合语义(覆盖、累加、合并)
  • 支持自定义通道类型,扩展性良好

消息传递机制

  • Send支持动态路由,实现复杂的执行拓扑
  • Command提供丰富的控制能力,支持跳转、更新、恢复
  • Interrupt机制支持人机交互和暂停恢复

4. 时序图分析

4.1 图构建与编译时序

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant StateGraph as StateGraph
    participant Compiler as 编译器
    participant Validator as 验证器
    participant Pregel as Pregel引擎
    
    User->>StateGraph: 创建图实例
    StateGraph->>StateGraph: 初始化状态模式
    User->>StateGraph: add_node("node1", func1)
    StateGraph->>StateGraph: 注册节点规格
    StateGraph->>StateGraph: 推断输入类型
    User->>StateGraph: add_edge("node1", "node2")
    StateGraph->>StateGraph: 添加边关系
    User->>StateGraph: add_conditional_edges(source, condition)
    StateGraph->>StateGraph: 注册条件分支
    
    User->>StateGraph: compile()
    StateGraph->>Validator: validate(interrupt_nodes)
    Validator->>Validator: 检查节点完整性
    Validator->>Validator: 验证边连通性
    Validator-->>StateGraph: 验证通过
    
    StateGraph->>Compiler: 创建CompiledStateGraph
    Compiler->>Compiler: 生成通道映射
    Compiler->>Compiler: 构建执行节点
    Compiler->>Pregel: 实例化执行引擎
    Pregel->>Pregel: 初始化运行时组件
    Compiler-->>StateGraph: 返回编译后图实例
    StateGraph-->>User: CompiledStateGraph

4.2 图执行与状态更新时序

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Pregel as Pregel引擎
    participant Loop as 执行循环
    participant TaskRunner as 任务运行器
    participant Channels as 通道系统
    participant Checkpointer as 检查点保存器
    
    Client->>Pregel: invoke(input, config)
    Pregel->>Loop: 创建执行循环
    Loop->>Checkpointer: 加载或创建检查点
    Checkpointer-->>Loop: 返回初始状态
    Loop->>Channels: 初始化通道值
    
    loop 超步执行循环
        Loop->>Loop: prepare_next_tasks()
        Loop->>Loop: 计算可执行节点集合
        
        alt 存在可执行任务
            Loop->>TaskRunner: submit_sync(tasks)
            
            par 并发执行任务
                TaskRunner->>TaskRunner: 执行task1
                TaskRunner->>TaskRunner: 执行task2  
                TaskRunner->>TaskRunner: 执行taskN
            end
            
            TaskRunner-->>Loop: 返回任务结果
            Loop->>Channels: apply_writes(results)
            Channels->>Channels: 更新通道状态
            Loop->>Checkpointer: put(checkpoint)
            Checkpointer-->>Loop: 保存成功
        else 无可执行任务
            Loop->>Loop: 退出执行循环
        end
    end
    
    Loop->>Channels: read_channels(output_channels)
    Channels-->>Loop: 最终状态值
    Loop-->>Pregel: 执行结果
    Pregel-->>Client: 返回输出

4.3 中断与恢复时序

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Pregel as Pregel引擎
    participant Node as 执行节点
    participant InterruptHandler as 中断处理器
    participant Checkpointer as 检查点保存器
    
    Client->>Pregel: invoke(input, config)
    Pregel->>Pregel: 开始执行循环
    Pregel->>Node: 执行节点函数
    Node->>Node: 业务逻辑处理
    Node->>InterruptHandler: interrupt(value)
    InterruptHandler->>InterruptHandler: 创建Interrupt对象
    InterruptHandler->>Pregel: 抛出GraphInterrupt异常
    
    Pregel->>Checkpointer: 保存中断状态
    Checkpointer-->>Pregel: 确认保存
    Pregel->>Pregel: 构造StateSnapshot
    Pregel-->>Client: 返回中断信息
    
    Note over Client: 用户处理中断,准备恢复值
    
    Client->>Pregel: invoke(Command(resume=value), config)
    Pregel->>Checkpointer: 加载中断检查点
    Checkpointer-->>Pregel: 返回状态
    Pregel->>Node: 重新执行节点(从头开始)
    Node->>InterruptHandler: interrupt(value) [第二次调用]
    InterruptHandler->>InterruptHandler: 返回恢复值(不抛异常)
    Node->>Node: 使用恢复值继续处理
    Node-->>Pregel: 返回节点结果
    Pregel->>Pregel: 继续执行后续节点
    Pregel-->>Client: 返回最终结果

5. 关键设计决策与权衡

5.1 Pregel算法选择

设计理由

  • 并行性:天然支持节点间的并发执行,提升性能
  • 容错性:超步模式便于实现检查点和故障恢复
  • 可扩展性:分布式友好,支持大规模图计算
  • 确定性:相同输入保证相同输出,便于调试和测试

实现权衡

  • 相比事件驱动模式,可能产生额外的同步开销
  • 需要显式的状态管理,增加了系统复杂性
  • 但获得了更强的一致性保证和故障恢复能力

5.2 通道系统设计

类型安全策略

  • 基于Python类型系统提供编译时检查
  • 通道泛型设计支持不同的值和更新类型
  • 序列化/反序列化保证类型一致性

聚合策略多样性

  • LastValue:适用于配置和状态覆盖场景
  • BinaryOperatorAggregate:支持自定义聚合逻辑
  • Topic:支持发布/订阅模式的消息传递
  • NamedBarrierValue:实现同步屏障,控制执行顺序

5.3 中断与恢复机制

中断语义设计

  • interrupt()函数在首次调用时抛出异常,暂停执行
  • 第二次调用时返回恢复值,继续执行
  • 基于执行顺序的匹配机制,支持多个中断点

状态一致性保证

  • 节点重新执行保证状态的完整性
  • 检查点在中断时立即保存,避免状态丢失
  • 恢复时的幂等性设计,多次恢复结果一致

5.4 并发执行策略

任务级并发

  • 同一超步内的独立任务可并发执行
  • 通过依赖分析确定并发边界
  • 支持同步和异步两种执行模式

状态更新协调

  • 写操作在超步结束时统一应用
  • 通道更新的原子性保证
  • 版本控制机制检测冲突更新

6. 性能特征与优化

6.1 执行性能指标

性能维度 典型值 优化目标 监控指标
节点执行延迟 < 100ms < 50ms P95响应时间
超步切换开销 < 10ms < 5ms 状态同步时间
检查点保存 < 50ms < 20ms 序列化延迟
内存使用 < 1GB < 500MB 峰值内存占用
并发任务数 100+ 1000+ 活跃任务数量

6.2 关键优化策略

执行路径优化

  • 依赖分析减少不必要的同步等待
  • 惰性状态加载,按需读取通道值
  • 智能任务调度,优先执行关键路径

内存管理优化

  • 通道生命周期管理,及时释放过期状态
  • 大对象的引用传递,减少拷贝开销
  • 垃圾回收友好的数据结构设计

I/O性能优化

  • 检查点批量写入,减少磁盘I/O
  • 异步检查点保存,不阻塞执行流程
  • 压缩序列化减少存储空间

7. 错误处理与边界情况

7.1 图结构验证

def validate(self) -> Self:
    """图结构完整性验证
    
    验证规则:
    1. 所有边的起点和终点节点必须存在
    2. 必须有至少一个START边作为入口
    3. 中断节点必须是已定义的节点
    4. 条件分支的目标节点必须有效
    """
    # 验证节点存在性
    all_sources = {src for src, _ in self._all_edges}
    for source in all_sources:
        if source not in self.nodes and source != START:
            raise ValueError(f"Found edge starting at unknown node '{source}'")
    
    # 验证入口存在性  
    if START not in all_sources:
        raise ValueError("Graph must have an entrypoint")
    
    # 验证条件分支目标
    for start, branches in self.branches.items():
        for cond, branch in branches.items():
            if branch.ends:
                for end in branch.ends.values():
                    if end not in self.nodes and end != END:
                        raise ValueError(f"Branch '{cond}' has unknown target '{end}'")

7.2 执行时异常处理

递归深度控制

  • 默认限制25步,防止无限循环
  • 超出限制时抛出GraphRecursionError
  • 支持动态调整限制,适应复杂工作流

状态更新验证

  • 类型检查确保更新值符合通道类型
  • 键值验证避免无效的状态字段
  • 幂等性检查防止重复更新

并发安全保护

  • 通道更新的原子性操作
  • 检查点写入的并发控制
  • 资源竞争的检测和处理

7.3 故障恢复机制

检查点一致性

  • 写入前的完整性校验
  • 损坏检查点的自动修复
  • 多版本备份策略

网络分区处理

  • 分布式场景下的一致性协议
  • 脑裂检测和恢复机制
  • 优雅降级策略

8. 扩展性与集成

8.1 自定义通道类型

class CustomChannel(BaseChannel[MyValue, MyUpdate, MyCheckpoint]):
    """自定义通道实现示例
    
    支持特定业务逻辑的状态聚合策略
    """
    
    def update(self, values: Sequence[MyUpdate]) -> bool:
        # 实现自定义聚合逻辑
        pass
        
    def get(self) -> MyValue:
        # 实现自定义值获取逻辑
        pass

8.2 节点执行扩展

中间件机制

  • pre_model_hook:执行前的预处理
  • post_model_hook:执行后的后处理
  • 支持链式中间件组合

插件系统

  • 可插拔的重试策略
  • 可配置的缓存策略
  • 自定义的监控和日志

8.3 与LangChain生态集成

Runnable兼容性

  • 完全实现Runnable接口
  • 支持LangChain的链式组合
  • 与LCEL语法无缝集成

工具集成

  • 与LangChain Tools的原生支持
  • 支持工具调用的并发执行
  • 工具错误的自动重试

通过以上全面的架构分析,langgraph核心模块为构建复杂的AI Agent系统提供了强大的基础设施,平衡了性能、可扩展性和易用性的需求。