📚 目录

  1. 框架概述与设计理念
  2. 整体架构分析
  3. 核心API深度分析
  4. Schema模块详解
  5. Components模块详解
  6. Compose模块详解
  7. ADK模块详解
  8. Callbacks模块详解
  9. 关键数据结构与继承关系
  10. 实战经验与最佳实践

1. 框架概述与设计理念

1.1 项目基本信息

Eino 是 CloudWeGo 开源的 Go 语言 LLM 应用开发框架,旨在成为终极的大语言模型应用开发框架。

项目属性
项目名称Eino
开发语言Go 1.18+
许可证Apache-2.0
仓库地址github.com/cloudwego/eino
主要依赖sonic, kin-openapi, uuid, gonja

1.2 核心设计理念

  • 组件化架构:将常见的构建模块封装为组件抽象,每个组件都有明确的输入输出类型
  • 强大编排:通过 Graph、Chain、Workflow 三种编排方式实现复杂的业务逻辑
  • 流式处理:完整支持流式数据处理,自动处理流的拼接、合并、复制等操作
  • 类型安全:编译时类型检查,确保组件间的类型匹配
  • 切面机制:支持横切面关注点,如日志、追踪、指标等

1.3 模块结构概览

graph TB
    subgraph "Eino 核心框架"
        A[Schema 基础数据层] --> B[Components 组件抽象层]
        B --> C[Compose 编排引擎层]
        C --> D[Flow 流程预制层]
        C --> E[ADK 智能体开发套件]
        F[Callbacks 回调切面层] -.-> C
    end
    
    subgraph "组件类型"
        G[ChatModel 聊天模型] --> B
        H[Tool 工具] --> B
        I[ChatTemplate 模板] --> B
        J[Retriever 检索器] --> B
        K[Embedding 嵌入] --> B
        L[Indexer 索引器] --> B
    end
    
    subgraph "编排方式"
        M[Chain 链式编排] --> C
        N[Graph 图式编排] --> C
        O[Workflow 工作流编排] --> C
    end
    
    subgraph "执行模式"
        P[Invoke 同步执行] --> Q[Runnable 可执行接口]
        R[Stream 流式执行] --> Q
        S[Collect 收集执行] --> Q
        T[Transform 转换执行] --> Q
    end

2. 整体架构分析

2.1 分层架构图

graph TB
    subgraph "应用层 Application Layer"
        A1[用户应用] --> A2[ReAct Agent]
        A2 --> A3[自定义 Agent]
    end
    
    subgraph "流程层 Flow Layer"
        F1[Agent 流程] --> F2[多代理协作]
        F3[检索增强] --> F4[自定义流程]
    end
    
    subgraph "编排层 Compose Layer"
        C1[Chain 链式编排] --> C2[Graph 图式编排]
        C2 --> C3[Workflow 工作流]
        C4[Runnable 执行接口] --> C5[流式处理引擎]
    end
    
    subgraph "组件层 Components Layer"
        CM1[ChatModel] --> CM2[ChatTemplate]
        CM2 --> CM3[Tool]
        CM3 --> CM4[Retriever]
        CM4 --> CM5[Embedding]
        CM5 --> CM6[Indexer]
    end
    
    subgraph "基础层 Schema Layer"
        S1[Message 消息体系] --> S2[StreamReader 流处理]
        S2 --> S3[ToolInfo 工具信息]
        S3 --> S4[Document 文档]
    end
    
    subgraph "回调层 Callbacks Layer"
        CB1[OnStart/OnEnd] --> CB2[OnError]
        CB2 --> CB3[Stream Callbacks]
    end
    
    A1 --> F1
    F1 --> C1
    C1 --> CM1
    CM1 --> S1
    
    CB1 -.-> C1
    CB1 -.-> CM1

2.2 核心模块交互时序图

sequenceDiagram
    participant User as 用户应用
    participant Compose as 编排层
    participant Component as 组件层
    participant Schema as 基础层
    participant Callback as 回调层
    
    User->>Compose: 创建编排对象
    Compose->>Component: 注册组件
    Component->>Schema: 使用数据结构
    
    User->>Compose: 编译执行
    Compose->>Callback: 触发开始回调
    Compose->>Component: 执行组件逻辑
    Component->>Schema: 处理消息流
    Schema-->>Component: 返回处理结果
    Component-->>Compose: 返回组件输出
    Compose->>Callback: 触发结束回调
    Compose-->>User: 返回最终结果

3. 核心API深度分析

3.1 API分类总览

API 类型协议模块主要用途入口函数
编排 APIGo APIcompose构建 LLM 应用流程NewChain, NewGraph, NewWorkflow
组件 APIGo APIcomponents定义可复用组件各组件的 New* 函数
Agent APIGo APIadk构建智能体NewChatModelAgent, NewRunner
流程 APIGo APIflow预构建流程react.NewAgent, host.NewMultiAgent
回调 APIGo APIcallbacks切面与监控NewHandlerBuilder

3.2 核心接口定义

3.2.1 Runnable 可执行接口

// Runnable 是所有可执行对象的核心接口
// 位置: compose/runnable.go:32
type Runnable[I, O any] interface {
    // Invoke 同步执行:单输入 -> 单输出
    Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
    
    // Stream 流式执行:单输入 -> 流输出
    Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
    
    // Collect 收集执行:流输入 -> 单输出
    Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
    
    // Transform 转换执行:流输入 -> 流输出
    Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}

设计目的

  • 提供统一的执行接口,支持四种数据流模式
  • 自动处理流式和非流式间的转换
  • 确保类型安全的泛型设计

调用链路分析

  1. 用户调用任一执行方法
  2. 内部通过 composableRunnable 进行类型转换
  3. 根据实际实现自动选择最优执行路径
  4. 支持流式和非流式间的自动适配

3.3 编排API深度分析

3.3.1 NewChain - 链式编排入口

// NewChain 创建链式编排
// 位置: compose/chain.go:37
func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O] {
    ch := &Chain[I, O]{
        gg: NewGraph[I, O](opts...),  // 底层使用 Graph 实现
    }
    
    ch.gg.cmp = ComponentOfChain  // 标记为链式组件
    
    return ch
}

关键函数调用链

深度包/类函数作用备注
0composeNewChain创建链实例泛型类型检查
1composeNewGraph创建底层图结构复用图编排能力
2composenewGraphFromGeneric初始化图配置类型反射处理

Chain结构体定义

// Chain 链式编排结构
// 位置: compose/chain.go:47
type Chain[I, O any] struct {
    gg *Graph[I, O]  // 底层图实现
    
    nodeIdx     int      // 节点索引
    preNodeKeys []string // 前置节点键
    hasEnd      bool     // 是否已添加结束边
}

3.3.2 NewGraph - 图式编排入口

// NewGraph 创建图式编排
// 位置: compose/generic_graph.go:68
func NewGraph[I, O any](opts ...NewGraphOption) *Graph[I, O] {
    options := &newGraphOptions{}
    for _, opt := range opts {
        opt(options)
    }

    g := &Graph[I, O]{
        newGraphFromGeneric[I, O](
            ComponentOfGraph,
            options.withState,
            options.stateType,
            opts,
        ),
    }

    return g
}

Graph核心结构

// graph 内部图结构
// 位置: compose/graph.go
type graph struct {
    // 节点管理
    nodes        map[string]*graphNode     // 节点映射表
    controlEdges map[string][]string      // 控制依赖边
    dataEdges    map[string][]string      // 数据流边
    branches     map[string][]*GraphBranch // 分支条件
    
    // 执行控制
    startNodes   []string                 // 起始节点
    endNodes     []string                 // 结束节点
    
    // 类型系统
    expectedInputType  reflect.Type       // 期望输入类型
    expectedOutputType reflect.Type       // 期望输出类型
    genericHelper      *genericHelper     // 泛型助手
    
    // 状态管理
    stateType      reflect.Type           // 状态类型
    stateGenerator func(ctx context.Context) any // 状态生成器
    
    // 编译状态
    compiled   bool                       // 是否已编译
    buildError error                      // 构建错误
}

3.3.3 NewWorkflow - 工作流编排入口

// NewWorkflow 创建工作流编排
// 位置: compose/workflow.go:61
func NewWorkflow[I, O any](opts ...NewGraphOption) *Workflow[I, O] {
    options := &newGraphOptions{}
    for _, opt := range opts {
        opt(options)
    }

    wf := &Workflow[I, O]{
        g: newGraphFromGeneric[I, O](
            ComponentOfWorkflow,
            options.withState,
            options.stateType,
            opts,
        ),
        workflowNodes: make(map[string]*WorkflowNode),
        dependencies:  make(map[string]map[string]dependencyType),
    }

    return wf
}

Workflow核心特性

// Workflow 工作流结构
// 位置: compose/workflow.go:45
type Workflow[I, O any] struct {
    g                *graph                    // 底层图
    workflowNodes    map[string]*WorkflowNode  // 工作流节点
    workflowBranches []*WorkflowBranch         // 工作流分支
    dependencies     map[string]map[string]dependencyType // 依赖关系
}

// WorkflowNode 工作流节点
type WorkflowNode struct {
    g                *graph
    key              string
    addInputs        []func() error           // 输入添加函数
    staticValues     map[string]any           // 静态值
    dependencySetter func(fromNodeKey string, typ dependencyType)
    mappedFieldPath  map[string]any           // 字段映射路径
}

4. Schema模块详解

4.1 模块架构图

graph TB
    subgraph "Schema 模块"
        A[Message 消息体系] --> B[StreamReader 流处理]
        A --> C[ToolInfo 工具信息]
        A --> D[Document 文档]
        B --> E[流式操作接口]
        C --> F[工具调用结构]
        D --> G[文档处理]
    end
    
    subgraph "核心数据结构"
        H[Message] --> I[ToolCall]
        H --> J[ChatMessagePart]
        H --> K[ResponseMeta]
        L[StreamReader] --> M[流读取接口]
        N[ToolInfo] --> O[函数签名]
    end

4.2 Message消息系统

4.2.1 Message结构定义

// Message 核心消息结构
// 位置: schema/message.go
type Message struct {
    Role    RoleType `json:"role"`    // 消息角色:user、assistant、system、tool
    Content string   `json:"content"` // 消息内容
    
    // 多媒体内容支持
    MultiContent []ChatMessagePart `json:"multi_content,omitempty"`
    
    // 工具调用相关
    ToolCalls  []ToolCall `json:"tool_calls,omitempty"`
    ToolCallID string     `json:"tool_call_id,omitempty"`
    ToolName   string     `json:"tool_name,omitempty"`
    
    // 响应元数据
    ResponseMeta *ResponseMeta `json:"response_meta,omitempty"`
    
    // 推理内容(思维链)
    ReasoningContent string `json:"reasoning_content,omitempty"`
    
    // 扩展字段
    Extra map[string]any `json:"extra,omitempty"`
}

4.2.2 关键函数:ConcatMessages

// ConcatMessages 合并消息流 - 核心流处理函数
// 位置: schema/message.go
func ConcatMessages(msgs []*Message) (*Message, error) {
    var (
        contents            []string
        contentLen          int
        reasoningContents   []string
        reasoningContentLen int
        toolCalls           []ToolCall
        ret                 = Message{}
        extraList           = make([]map[string]any, 0, len(msgs))
    )

    // 1. 验证消息一致性
    for idx, msg := range msgs {
        if msg == nil {
            return nil, fmt.Errorf("unexpected nil chunk in message stream, index: %d", idx)
        }

        // 验证角色一致性
        if msg.Role != "" {
            if ret.Role == "" {
                ret.Role = msg.Role
            } else if ret.Role != msg.Role {
                return nil, fmt.Errorf("cannot concat messages with different roles: '%s' '%s'", ret.Role, msg.Role)
            }
        }
        
        // 收集内容
        if msg.Content != "" {
            contents = append(contents, msg.Content)
            contentLen += len(msg.Content)
        }
        
        // 收集工具调用
        if len(msg.ToolCalls) > 0 {
            toolCalls = append(toolCalls, msg.ToolCalls...)
        }
    }

    // 2. 合并内容
    if len(contents) > 0 {
        var sb strings.Builder
        sb.Grow(contentLen)
        for _, content := range contents {
            sb.WriteString(content)
        }
        ret.Content = sb.String()
    }

    // 3. 合并工具调用
    if len(toolCalls) > 0 {
        merged, err := concatToolCalls(toolCalls)
        if err != nil {
            return nil, err
        }
        ret.ToolCalls = merged
    }

    return &ret, nil
}

功能说明

  • 验证一致性:确保所有消息块的角色一致
  • 内容合并:高效拼接字符串内容
  • 工具调用处理:按索引合并工具调用
  • 元数据处理:合并响应元数据

4.3 StreamReader流处理系统

4.3.1 StreamReader结构

// StreamReader 流式读取器
// 位置: schema/stream.go
type StreamReader[T any] struct {
    // 内部通道和状态管理
    ch     <-chan streamFrame[T]
    closed bool
    mu     sync.Mutex
}

// streamFrame 流数据帧
type streamFrame[T any] struct {
    data T
    err  error
}

4.3.2 关键方法实现

// Recv 接收下一个流元素
// 位置: schema/stream.go
func (sr *StreamReader[T]) Recv() (T, error) {
    sr.mu.Lock()
    defer sr.mu.Unlock()
    
    if sr.closed {
        var zero T
        return zero, io.EOF
    }
    
    frame, ok := <-sr.ch
    if !ok {
        sr.closed = true
        var zero T
        return zero, io.EOF
    }
    
    if frame.err != nil {
        return frame.data, frame.err
    }
    
    return frame.data, nil
}

// StreamReaderFromArray 从数组创建流读取器
func StreamReaderFromArray[T any](items []T) *StreamReader[T] {
    ch := make(chan streamFrame[T], len(items))
    
    for _, item := range items {
        ch <- streamFrame[T]{data: item}
    }
    close(ch)
    
    return &StreamReader[T]{ch: ch}
}

4.4 Schema模块时序图

sequenceDiagram
    participant App as 应用层
    participant Msg as Message
    participant Stream as StreamReader
    participant Concat as ConcatMessages
    
    App->>Msg: 创建消息
    Msg->>Stream: 转换为流
    
    loop 流处理
        Stream->>Stream: Recv()
        Stream-->>App: 返回消息块
    end
    
    App->>Concat: 合并消息流
    Concat->>Concat: 验证一致性
    Concat->>Concat: 合并内容
    Concat->>Concat: 处理工具调用
    Concat-->>App: 返回合并结果

5. Components模块详解

5.1 模块架构图

graph TB
    subgraph "Components 组件层"
        A[ChatModel] --> B[BaseChatModel]
        A --> C[ToolCallingChatModel]
        D[Tool] --> E[InvokableTool]
        D --> F[StreamableTool]
        G[ChatTemplate] --> H[MessagesTemplate]
        I[Retriever] --> J[检索接口]
        K[Embedding] --> L[嵌入接口]
        M[Indexer] --> N[索引接口]
    end
    
    subgraph "组件特性"
        O[类型安全] --> P[编译时检查]
        Q[流式支持] --> R[自动转换]
        S[回调机制] --> T[切面注入]
    end

5.2 ChatModel组件详解

5.2.1 接口定义

// BaseChatModel 基础聊天模型接口
// 位置: components/model/interface.go
type BaseChatModel interface {
    Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
    Stream(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.StreamReader[*schema.Message], error)
}

// ToolCallingChatModel 支持工具调用的聊天模型
type ToolCallingChatModel interface {
    BaseChatModel
    WithTools(tools []*schema.ToolInfo) (ToolCallingChatModel, error)
}

5.2.2 调用链路时序图

sequenceDiagram
    participant User as 用户代码
    participant Model as ChatModel
    participant Callback as 回调系统
    participant Stream as 流处理器
    
    User->>Model: Generate(messages)
    Model->>Callback: OnStart
    Model->>Model: 处理输入消息
    Model->>Model: 调用底层API
    Model->>Callback: OnEnd
    Model-->>User: 返回消息
    
    User->>Model: Stream(messages)
    Model->>Callback: OnStartWithStreamInput
    Model->>Stream: 创建流
    
    loop 流式输出
        Model->>Stream: 发送消息块
        Stream-->>User: 接收消息块
    end
    
    Model->>Callback: OnEndWithStreamOutput

5.3 Tool组件详解

5.3.1 接口定义与实现

// InvokableTool 可调用工具接口
// 位置: components/tool/interface.go
type InvokableTool interface {
    BaseTool
    InvokableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (string, error)
}

// StreamableTool 流式工具接口  
type StreamableTool interface {
    BaseTool
    StreamableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (*schema.StreamReader[string], error)
}

// BaseTool 基础工具接口
type BaseTool interface {
    Info(ctx context.Context) (*schema.ToolInfo, error)
}

5.3.2 ToolsNode工具节点实现

// ToolsNode 工具节点结构
// 位置: compose/tool_node.go
type ToolsNode struct {
    tuple                   []tool.BaseTool
    unknownToolsHandler     func(ctx context.Context, name, input string) (string, error)
    executeSequentially     bool
    toolArgumentsHandler    func(ctx context.Context, name, arguments string) (string, error)
}

// Invoke 工具执行的核心逻辑
func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error) {
    // 1. 解析工具调用
    tasks, err := tn.genToolCallTasks(ctx, tn.tuple, input, opt.executedTools, false)
    if err != nil {
        return nil, err
    }

    // 2. 执行工具(并行或串行)
    if tn.executeSequentially {
        sequentialRunToolCall(ctx, runToolCallTaskByInvoke, tasks, opt.ToolOptions...)
    } else {
        parallelRunToolCall(ctx, runToolCallTaskByInvoke, tasks, opt.ToolOptions...)
    }

    // 3. 收集结果
    output := make([]*schema.Message, len(tasks))
    for i, task := range tasks {
        if task.err != nil {
            return nil, fmt.Errorf("tool execution failed: %w", task.err)
        }
        output[i] = schema.ToolMessage(task.output, task.callID, schema.WithToolName(task.name))
    }

    return output, nil
}

5.4 ChatTemplate组件详解

5.4.1 模板系统架构

// ChatTemplate 聊天模板接口
// 位置: components/prompt/interface.go
type ChatTemplate interface {
    Format(ctx context.Context, vs map[string]any, opts ...Option) ([]*schema.Message, error)
}

// 支持的模板格式
type FormatType uint8

const (
    FString    FormatType = 0  // Python 风格格式化
    GoTemplate FormatType = 1  // Go 标准模板
    Jinja2     FormatType = 2  // Jinja2 模板
)

5.4.2 模板处理核心函数

// formatContent 格式化内容的核心函数
// 位置: components/prompt/chat_template.go
func formatContent(content string, vs map[string]any, formatType FormatType) (string, error) {
    switch formatType {
    case FString:
        return pyfmt.Fmt(content, vs)
    case GoTemplate:
        parsedTmpl, err := template.New("template").
            Option("missingkey=error").
            Parse(content)
        if err != nil {
            return "", err
        }
        sb := new(strings.Builder)
        err = parsedTmpl.Execute(sb, vs)
        return sb.String(), err
    case Jinja2:
        env, err := getJinjaEnv()
        if err != nil {
            return "", err
        }
        tpl, err := env.FromString(content)
        if err != nil {
            return "", err
        }
        return tpl.Execute(vs)
    default:
        return "", fmt.Errorf("unknown format type: %v", formatType)
    }
}

6. Compose模块详解

6.1 模块职责与边界

负责

  • 编排能力: 提供 Chain、Graph、Workflow 三种编排模式
  • 类型安全: 编译时和运行时的类型检查与转换
  • 流式处理: 自动处理流的合并、分发、转换
  • 执行引擎: 提供高性能的图执行引擎
  • 状态管理: 支持有状态的图执行
  • 回调机制: 集成切面编程能力

不负责

  • 具体组件实现: 不实现具体的 LLM、工具等组件
  • 网络通信: 不处理外部服务调用
  • 持久化: 不负责数据持久化存储
  • 业务逻辑: 不包含特定领域的业务逻辑

6.2 模块架构图

graph TD
    subgraph "编排接口层"
        Chain[Chain 链式编排]
        Graph[Graph 图编排]
        Workflow[Workflow 工作流]
    end
    
    subgraph "核心抽象层"
        Runnable[Runnable 可执行接口]
        ComposableRunnable[ComposableRunnable 可组合执行器]
    end
    
    subgraph "执行引擎层"
        Runner[Runner 执行器]
        GraphManager[GraphManager 图管理器]
        ChannelManager[ChannelManager 通道管理器]
        TaskManager[TaskManager 任务管理器]
    end
    
    subgraph "支撑组件层"
        State[State 状态管理]
        Stream[Stream 流式处理]
        Branch[Branch 分支逻辑]
        FieldMapping[FieldMapping 字段映射]
    end
    
    subgraph "基础设施层"
        TypeSystem[Type System 类型系统]
        ErrorHandling[Error Handling 错误处理]
        Callbacks[Callbacks 回调机制]
    end
    
    %% 编排接口层关系
    Chain --> Runnable
    Graph --> Runnable
    Workflow --> Runnable
    
    %% 核心抽象层关系
    Runnable --> ComposableRunnable
    ComposableRunnable --> Runner
    
    %% 执行引擎层关系
    Runner --> GraphManager
    Runner --> ChannelManager
    Runner --> TaskManager
    
    %% 支撑组件层关系
    Runner --> State
    Runner --> Stream
    Runner --> Branch
    Workflow --> FieldMapping
    
    %% 基础设施层关系
    ComposableRunnable --> TypeSystem
    Runner --> ErrorHandling
    Runner --> Callbacks

6.3 核心执行引擎:Runner

6.3.1 Runner结构定义

// runner 执行引擎结构
// 位置: compose/graph_run.go:41
type runner struct {
    // 图结构
    chanSubscribeTo     map[string]*chanCall      // 通道订阅映射
    controlPredecessors map[string][]string       // 控制前驱
    dataPredecessors    map[string][]string       // 数据前驱
    successors          map[string][]string       // 后继节点
    
    // 执行控制
    inputChannels *chanCall                      // 输入通道
    eager         bool                           // 是否急切执行
    dag           bool                           // 是否为DAG模式
    
    // 类型信息
    inputType     reflect.Type                   // 输入类型
    outputType    reflect.Type                   // 输出类型
    genericHelper *genericHelper                 // 泛型助手
    
    // 处理器管理
    preBranchHandlerManager *preBranchHandlerManager // 分支前处理器管理
    preNodeHandlerManager   *preNodeHandlerManager   // 节点前处理器管理
    edgeHandlerManager      *edgeHandlerManager      // 边处理器管理
    
    // 运行时配置
    runCtx        func(ctx context.Context) context.Context // 运行时上下文
    chanBuilder   chanBuilder                               // 通道构建器
    mergeConfigs  map[string]FanInMergeConfig              // 合并配置
    
    // 中断和检查点
    checkPointer          *checkPointer    // 检查点管理
    interruptBeforeNodes  []string         // 前置中断节点
    interruptAfterNodes   []string         // 后置中断节点
    options              graphCompileOptions // 编译选项
}

6.3.2 核心执行函数:run

// run 核心执行引擎 - 框架性能的关键热点
// 位置: compose/graph_run.go:107
func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Option) (result any, err error) {
    // 🔥 热点 1: 回调处理 - 每次执行都会调用
    ctx, input = onGraphStart(ctx, input, isStream)
    defer func() {
        if err != nil {
            ctx, err = onGraphError(ctx, err)  // 🔥 错误处理热点
        } else {
            ctx, result = onGraphEnd(ctx, result, isStream)  // 🔥 结束处理热点
        }
    }()
    
    // 🔥 热点 2: 管理器初始化 - 每次执行都需要
    cm := r.initChannelManager(isStream)     // 🔥 通道管理器创建
    tm := r.initTaskManager(runWrapper, getGraphCancel(ctx), opts...)  // 🔥 任务管理器创建
    
    // 计算初始任务
    nextTasks, result, isEnd, err := r.calculateNextTasks(ctx, []*task{{
        nodeKey: START,
        call:    r.inputChannels,
        output:  input,
    }}, isStream, cm, optMap)
    
    if isEnd {
        return result, nil
    }

    // 🔥 热点 3: 主执行循环 - 最大的性能瓶颈
    for step := 0; step < maxSteps; step++ {
        // 检查上下文取消
        select {
        case <-ctx.Done():
            return nil, newGraphRunError(ctx.Err())
        default:
        }
        
        // 🔥 热点 3.1: 任务调度
        if len(nextTasks) == 0 {
            break
        }
        
        // 🔥 热点 3.2: 并发任务执行
        err := tm.submit(nextTasks)  // 🔥🔥 最大热点
        if err != nil {
            return nil, newGraphRunError(err)
        }

        // 🔥 热点 3.3: 等待任务完成
        completedTasks, canceled, canceledTasks := tm.wait()  // 🔥 同步等待开销
        if canceled {
            return nil, r.handleInterrupt(ctx, canceledTasks, cm, optMap)
        }

        // 🔥 热点 3.4: 计算下一批任务
        nextTasks, result, isEnd, err = r.calculateNextTasks(ctx, completedTasks, isStream, cm, optMap)
        if err != nil {
            return nil, newGraphRunError(err)
        }
        
        if isEnd {
            return result, nil
        }
    }
    
    return result, nil
}

性能特征:

  • 时间复杂度: O((V + E) * Steps * C),其中 C 是平均组件执行时间
  • 空间复杂度: O(V + E + B),其中 B 是缓冲区大小
  • 主要开销: 任务调度 (30%) + 组件执行 (60%) + 状态管理 (10%)

6.4 编译时序图

sequenceDiagram
    participant U as 用户
    participant C as Chain/Graph
    participant G as graph (内部)
    participant CR as ComposableRunnable
    participant R as Runner
    
    U->>C: NewChain/NewGraph()
    C->>G: 创建内部图结构
    
    U->>C: AddNode/AppendXX()
    C->>G: 添加节点到图中
    G->>G: 类型检查与验证
    
    U->>C: Compile(ctx)
    C->>G: compile(ctx, options)
    G->>G: 构建执行计划
    G->>G: 优化图结构
    G->>CR: 创建可组合执行器
    CR->>R: 创建运行器
    G-->>C: 返回 Runnable
    C-->>U: 返回编译结果

6.5 执行时序图

sequenceDiagram
    participant U as 用户
    participant R as Runnable
    participant Runner as Runner
    participant CM as ChannelManager
    participant TM as TaskManager
    participant Node as GraphNode
    
    U->>R: Invoke(ctx, input)
    R->>Runner: run(ctx, false, input)
    Runner->>CM: 初始化通道管理器
    Runner->>TM: 初始化任务管理器
    
    loop 执行步骤
        Runner->>TM: 获取就绪任务
        TM-->>Runner: 返回可执行节点列表
        
        par 并行执行节点
            Runner->>Node: 执行节点1
            Runner->>Node: 执行节点2
        end
        
        Node-->>Runner: 返回执行结果
        Runner->>CM: 更新通道数据
        CM->>CM: 检查后续节点就绪状态
    end
    
    Runner-->>R: 返回最终结果
    R-->>U: 返回执行结果

7. ADK模块详解

7.1 模块职责与边界

ADK负责

  • Agent 抽象: 定义统一的智能体接口和生命周期
  • 智能体实现: 提供 ChatModel Agent、ReAct Agent 等基础实现
  • 多智能体协调: 支持智能体间的转移和协作
  • 状态管理: 管理智能体运行时状态和会话信息
  • 中断恢复: 支持智能体执行的中断和恢复机制
  • 工具集成: 将智能体包装为可调用的工具

ADK不负责

  • 具体模型实现: 不实现具体的 LLM 模型
  • 工具具体实现: 不实现具体的工具逻辑
  • 网络通信: 不处理外部服务调用
  • UI 交互: 不处理用户界面逻辑

7.2 ADK架构图

graph TD
    subgraph "智能体接口层"
        Agent[Agent 智能体接口]
        Runner[Runner 运行器]
        ResumableAgent[ResumableAgent 可恢复智能体]
    end
    
    subgraph "智能体实现层"
        ChatModelAgent[ChatModelAgent 聊天模型智能体]
        FlowAgent[FlowAgent 流程智能体]
        WorkflowAgent[WorkflowAgent 工作流智能体]
        ReactAgent[ReactAgent ReAct智能体]
    end
    
    subgraph "协调机制层"
        SubAgentManager[SubAgent Manager 子智能体管理]
        TransferMechanism[Transfer Mechanism 转移机制]
        HistoryRewriter[History Rewriter 历史重写]
    end
    
    subgraph "状态管理层"
        State[State 状态管理]
        SessionValues[Session Values 会话值]
        CheckPointStore[CheckPoint Store 检查点存储]
        InterruptInfo[Interrupt Info 中断信息]
    end
    
    subgraph "工具集成层"
        AgentTool[Agent Tool 智能体工具]
        ToolsConfig[Tools Config 工具配置]
        ExitTool[Exit Tool 退出工具]
    end
    
    subgraph "异步处理层"
        AsyncIterator[AsyncIterator 异步迭代器]
        AsyncGenerator[AsyncGenerator 异步生成器]
        EventStream[Event Stream 事件流]
    end
    
    %% 接口层关系
    Agent --> ChatModelAgent
    Agent --> FlowAgent
    Runner --> Agent
    ResumableAgent --> Agent
    
    %% 实现层关系
    FlowAgent --> WorkflowAgent
    FlowAgent --> ReactAgent
    ChatModelAgent --> State
    
    %% 协调机制关系
    FlowAgent --> SubAgentManager
    FlowAgent --> TransferMechanism
    FlowAgent --> HistoryRewriter
    
    %% 状态管理关系
    Runner --> CheckPointStore
    Agent --> SessionValues
    ResumableAgent --> InterruptInfo
    
    %% 工具集成关系
    ChatModelAgent --> ToolsConfig
    AgentTool --> Agent
    
    %% 异步处理关系
    Runner --> AsyncIterator
    Agent --> AsyncGenerator
    AsyncIterator --> EventStream

7.3 核心接口定义

7.3.1 Agent接口

// Agent 智能体核心接口
// 位置: adk/interface.go
type Agent interface {
    Name(ctx context.Context) string
    Description(ctx context.Context) string
    Run(ctx context.Context, input *AgentInput, options ...AgentRunOption) *AsyncIterator[*AgentEvent]
}

// AgentInput 智能体输入
type AgentInput struct {
    Messages        []Message
    EnableStreaming bool
}

// AgentEvent 智能体事件
type AgentEvent struct {
    AgentName string
    RunPath   []RunStep
    Output    *AgentOutput
    Action    *AgentAction
    Err       error
}

7.4 ChatModelAgent实现

7.4.1 结构定义

// ChatModelAgent 聊天模型智能体
// 位置: adk/chatmodel.go
type ChatModelAgent struct {
    config *ChatModelAgentConfig
    model  model.BaseChatModel
    tools  []tool.BaseTool
}

// ChatModelAgentConfig 配置结构
type ChatModelAgentConfig struct {
    Model         model.BaseChatModel  // 聊天模型
    Tools         []tool.BaseTool      // 工具列表
    MaxIterations int                  // 最大迭代次数
    Temperature   float64              // 生成温度
    SystemPrompt  string               // 系统提示
}

7.4.2 关键函数:NewChatModelAgent

// NewChatModelAgent 创建聊天模型智能体
// 位置: adk/chatmodel.go:179
func NewChatModelAgent(_ context.Context, config *ChatModelAgentConfig) (*ChatModelAgent, error) {
    // 验证配置
    if config.Model == nil {
        return nil, errors.New("model is required")
    }

    // 创建智能体实例
    agent := &ChatModelAgent{
        config: config,
        model:  config.Model,
        tools:  config.Tools,
    }

    // 设置默认配置
    if agent.config.MaxIterations == 0 {
        agent.config.MaxIterations = 10
    }

    return agent, nil
}

7.4.3 执行函数:Run

// Run 执行智能体,返回异步事件流
// 位置: adk/chatmodel.go
func (a *ChatModelAgent) Run(ctx context.Context, input *AgentInput, opts ...Option) *AsyncIterator[*AgentEvent] {
    // 创建异步生成器
    generator := NewAsyncGenerator[*AgentEvent]()

    // 启动执行协程
    go func() {
        defer generator.Close()

        // 初始化状态
        state := &State{
            Messages:    input.Messages,
            Iterations:  0,
            MaxIterations: a.config.MaxIterations,
        }

        // 执行循环
        for state.Iterations < state.MaxIterations {
            // 生成回复
            response, err := a.model.Generate(ctx, state.Messages)
            if err != nil {
                generator.SendError(err)
                return
            }

            // 检查工具调用
            if hasToolCalls(response) {
                // 执行工具
                toolResults, err := a.executeTools(ctx, response.ToolCalls)
                if err != nil {
                    generator.SendError(err)
                    return
                }

                // 更新消息历史
                state.Messages = append(state.Messages, response)
                state.Messages = append(state.Messages, toolResults...)
                state.Iterations++

                // 发送工具执行事件
                generator.Send(&AgentEvent{
                    Type: EventTypeToolExecution,
                    Data: toolResults,
                })
            } else {
                // 发送最终回复事件
                generator.Send(&AgentEvent{
                    Type: EventTypeResponse,
                    Data: response,
                })
                return
            }
        }
    }()

    return generator.Iterator()
}

7.5 ReAct Agent实现

7.5.1 ReAct Agent结构

// Agent ReAct智能体结构
// 位置: flow/agent/react/agent.go
type Agent struct {
    runnable         compose.Runnable[[]*schema.Message, *schema.Message]
    graph            *compose.Graph[[]*schema.Message, *schema.Message]
    graphAddNodeOpts []compose.GraphAddNodeOpt
}

// state ReAct状态结构
type state struct {
    Messages                 []*schema.Message
    ReturnDirectlyToolCallID string
}

7.5.2 ReAct Agent构建过程

// NewAgent 创建ReAct智能体
// 位置: flow/agent/react/agent.go
func NewAgent(ctx context.Context, config *AgentConfig) (*Agent, error) {
    // 1. 准备组件
    chatModel, err := agent.ChatModelWithTools(config.Model, config.ToolCallingModel, toolInfos)
    if err != nil {
        return nil, err
    }

    toolsNode, err := compose.NewToolNode(ctx, &config.ToolsConfig)
    if err != nil {
        return nil, err
    }

    // 2. 构建图
    graph := compose.NewGraph[[]*schema.Message, *schema.Message](
        compose.WithGenLocalState(func(ctx context.Context) *state {
            return &state{Messages: make([]*schema.Message, 0, config.MaxStep+1)}
        }))

    // 3. 添加模型节点
    modelPreHandle := func(ctx context.Context, input []*schema.Message, state *state) ([]*schema.Message, error) {
        state.Messages = append(state.Messages, input...)
        if config.MessageModifier != nil {
            return config.MessageModifier(ctx, state.Messages), nil
        }
        return state.Messages, nil
    }

    err = graph.AddChatModelNode(nodeKeyModel, chatModel, 
        compose.WithStatePreHandler(modelPreHandle))
    if err != nil {
        return nil, err
    }

    // 4. 添加工具节点
    toolsNodePreHandle := func(ctx context.Context, input *schema.Message, state *state) (*schema.Message, error) {
        if input != nil {
            state.Messages = append(state.Messages, input)
            state.ReturnDirectlyToolCallID = getReturnDirectlyToolCallID(input, config.ToolReturnDirectly)
        }
        return input, nil
    }

    err = graph.AddToolsNode(nodeKeyTools, toolsNode, 
        compose.WithStatePreHandler(toolsNodePreHandle))
    if err != nil {
        return nil, err
    }

    // 5. 添加分支逻辑
    modelPostBranchCondition := func(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (string, error) {
        isToolCall, err := config.StreamToolCallChecker(ctx, sr)
        if err != nil {
            return "", err
        }
        if isToolCall {
            return nodeKeyTools, nil
        }
        return compose.END, nil
    }

    err = graph.AddBranch(nodeKeyModel, compose.NewStreamGraphBranch(
        modelPostBranchCondition, 
        map[string]bool{nodeKeyTools: true, compose.END: true}))
    if err != nil {
        return nil, err
    }

    // 6. 编译图
    runnable, err := graph.Compile(ctx, compileOpts...)
    if err != nil {
        return nil, err
    }

    return &Agent{
        runnable: runnable,
        graph:    graph,
    }, nil
}

7.6 ADK执行时序图

sequenceDiagram
    participant U as 用户
    participant R as Runner
    participant CMA as ChatModelAgent
    participant M as ChatModel
    participant T as Tools
    participant S as State
    
    U->>R: Run(ctx, messages)
    R->>CMA: Run(ctx, input)
    CMA->>S: 初始化状态
    
    loop 推理循环 (最大迭代次数)
        CMA->>M: Generate(ctx, messages)
        
        alt 包含工具调用
            M-->>CMA: 工具调用消息
            CMA->>T: 执行工具调用
            T-->>CMA: 工具执行结果
            CMA->>S: 更新状态
            
            alt 工具返回直接结果
                CMA-->>R: 返回工具结果
            else 继续推理
                CMA->>M: 继续生成 (带工具结果)
            end
        else 直接回复
            M-->>CMA: 最终回复
            CMA->>S: 更新输出状态
            CMA-->>R: 返回最终结果
        end
    end
    
    R-->>U: 返回事件流

8. Callbacks模块详解

8.1 回调系统架构

graph TB
    subgraph "回调系统"
        A[Handler 接口] --> B[HandlerBuilder]
        A --> C[TimingChecker]
        D[CallbackManager] --> E[全局处理器]
        D --> F[节点处理器]
        G[回调时机] --> H[OnStart/OnEnd]
        G --> I[OnError]
        G --> J[Stream 回调]
    end

8.2 回调接口定义

// Handler 回调处理器接口
// 位置: callbacks/interface.go
type Handler interface {
    OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
    OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
    OnError(ctx context.Context, info *RunInfo, err error) context.Context
    OnStartWithStreamInput(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
    OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
}

// TimingChecker 时机检查器
type TimingChecker interface {
    NeedTiming(timing CallbackTiming) bool
}

8.3 HandlerBuilder构建器

// HandlerBuilder 回调处理器构建器
// 位置: callbacks/handler_builder.go
type HandlerBuilder struct {
    onStartFn                func(context.Context, *RunInfo, CallbackInput) context.Context
    onEndFn                  func(context.Context, *RunInfo, CallbackOutput) context.Context
    onErrorFn                func(context.Context, *RunInfo, error) context.Context
    onStartWithStreamInputFn func(context.Context, *RunInfo, CallbackInput) context.Context
    onEndWithStreamOutputFn  func(context.Context, *RunInfo, CallbackOutput) context.Context
}

// NewHandlerBuilder 创建处理器构建器
func NewHandlerBuilder() *HandlerBuilder {
    return &HandlerBuilder{}
}

// OnStartFn 设置开始回调
func (hb *HandlerBuilder) OnStartFn(fn func(context.Context, *RunInfo, CallbackInput) context.Context) *HandlerBuilder {
    hb.onStartFn = fn
    return hb
}

// OnEndFn 设置结束回调
func (hb *HandlerBuilder) OnEndFn(fn func(context.Context, *RunInfo, CallbackOutput) context.Context) *HandlerBuilder {
    hb.onEndFn = fn
    return hb
}

// OnErrorFn 设置错误回调
func (hb *HandlerBuilder) OnErrorFn(fn func(context.Context, *RunInfo, error) context.Context) *HandlerBuilder {
    hb.onErrorFn = fn
    return hb
}

// Build 构建处理器
func (hb *HandlerBuilder) Build() Handler {
    return &handlerImpl{
        onStartFn:                hb.onStartFn,
        onEndFn:                  hb.onEndFn,
        onErrorFn:                hb.onErrorFn,
        onStartWithStreamInputFn: hb.onStartWithStreamInputFn,
        onEndWithStreamOutputFn:  hb.onEndWithStreamOutputFn,
    }
}

8.4 回调执行流程

// executeWithCallbacks 回调执行的核心逻辑
// 位置: callbacks/interface.go
func executeWithCallbacks[I, O any](
    ctx context.Context,
    executor func(context.Context, I) (O, error),
    input I,
    handlers []Handler,
) (O, error) {
    // 1. 执行 OnStart 回调
    for _, handler := range handlers {
        if checker, ok := handler.(TimingChecker); ok {
            if !checker.NeedTiming(TimingOnStart) {
                continue
            }
        }
        ctx = handler.OnStart(ctx, runInfo, input)
    }

    // 2. 执行主逻辑
    output, err := executor(ctx, input)

    // 3. 执行回调
    if err != nil {
        // 错误回调
        for _, handler := range handlers {
            ctx = handler.OnError(ctx, runInfo, err)
        }
    } else {
        // 成功回调
        for _, handler := range handlers {
            ctx = handler.OnEnd(ctx, runInfo, output)
        }
    }

    return output, err
}

9. 关键数据结构与继承关系

9.1 核心接口继承图

classDiagram
    class Runnable~I,O~ {
        <<interface>>
        +Invoke(ctx, I, ...Option) (O, error)
        +Stream(ctx, I, ...Option) (*StreamReader~O~, error)
        +Collect(ctx, *StreamReader~I~, ...Option) (O, error)
        +Transform(ctx, *StreamReader~I~, ...Option) (*StreamReader~O~, error)
    }
    
    class BaseChatModel {
        <<interface>>
        +Generate(ctx, []*Message, ...Option) (*Message, error)
        +Stream(ctx, []*Message, ...Option) (*StreamReader~*Message~, error)
    }
    
    class ToolCallingChatModel {
        <<interface>>
        +WithTools([]*ToolInfo) (ToolCallingChatModel, error)
    }
    
    class InvokableTool {
        <<interface>>
        +Info(ctx) (*ToolInfo, error)
        +InvokableRun(ctx, string, ...Option) (string, error)
    }
    
    class StreamableTool {
        <<interface>>
        +StreamableRun(ctx, string, ...Option) (*StreamReader~string~, error)
    }
    
    class Agent {
        <<interface>>
        +Name(ctx) string
        +Description(ctx) string
        +Run(ctx, *AgentInput, ...AgentRunOption) *AsyncIterator~*AgentEvent~
    }
    
    BaseChatModel <|-- ToolCallingChatModel
    BaseTool <|-- InvokableTool
    BaseTool <|-- StreamableTool
    InvokableTool <|-- StreamableTool

9.2 核心数据结构关系图

classDiagram
    class Message {
        +Role RoleType
        +Content string
        +MultiContent []ChatMessagePart
        +ToolCalls []ToolCall
        +ToolCallID string
        +ToolName string
        +ResponseMeta *ResponseMeta
        +ReasoningContent string
        +Extra map[string]any
    }
    
    class ToolCall {
        +Index *int
        +ID string
        +Type string
        +Function FunctionCall
        +Extra map[string]any
    }
    
    class FunctionCall {
        +Name string
        +Arguments string
    }
    
    class StreamReader~T~ {
        -ch <-chan streamFrame~T~
        -closed bool
        -mu sync.Mutex
        +Recv() (T, error)
        +Close() error
    }
    
    class graph {
        +nodes map[string]*graphNode
        +controlEdges map[string][]string
        +dataEdges map[string][]string
        +branches map[string][]*GraphBranch
        +expectedInputType reflect.Type
        +expectedOutputType reflect.Type
        +genericHelper *genericHelper
        +stateType reflect.Type
        +stateGenerator func(context.Context) any
    }
    
    class composableRunnable {
        +i invoke
        +t transform
        +inputType reflect.Type
        +outputType reflect.Type
        +optionType reflect.Type
        +genericHelper *genericHelper
        +isPassthrough bool
        +meta *executorMeta
        +nodeInfo *nodeInfo
    }
    
    Message --> ToolCall
    ToolCall --> FunctionCall
    graph --> composableRunnable

9.3 编排类型层次结构

classDiagram
    class Graph~I,O~ {
        +*graph
        +AddChatModelNode(string, BaseChatModel, ...GraphAddNodeOpt) error
        +AddToolsNode(string, *ToolsNode, ...GraphAddNodeOpt) error
        +AddEdge(string, string, ...GraphAddEdgeOpt) error
        +AddBranch(string, *GraphBranch, ...GraphAddBranchOpt) error
        +Compile(context.Context, ...GraphCompileOption) (Runnable~I,O~, error)
    }
    
    class Chain~I,O~ {
        +gg *Graph~I,O~
        +nodeIdx int
        +preNodeKeys []string
        +hasEnd bool
        +AppendChatModel(BaseChatModel, ...GraphAddNodeOpt) *Chain~I,O~
        +AppendChatTemplate(ChatTemplate, ...GraphAddNodeOpt) *Chain~I,O~
        +AppendToolsNode(*ToolsNode, ...GraphAddNodeOpt) *Chain~I,O~
        +Compile(context.Context, ...GraphCompileOption) (Runnable~I,O~, error)
    }
    
    class Workflow~I,O~ {
        +g *graph
        +workflowNodes map[string]*WorkflowNode
        +workflowBranches []*WorkflowBranch
        +dependencies map[string]map[string]dependencyType
        +AddChatModelNode(string, BaseChatModel, ...GraphAddNodeOpt) *WorkflowNode
        +Compile(context.Context, ...GraphCompileOption) (Runnable~I,O~, error)
    }
    
    class WorkflowNode {
        +g *graph
        +key string
        +addInputs []func() error
        +staticValues map[string]any
        +dependencySetter func(string, dependencyType)
        +mappedFieldPath map[string]any
        +AddInput(string, ...*FieldMapping) *WorkflowNode
        +SetStaticValue(FieldPath, any) *WorkflowNode
    }
    
    Graph~I,O~ <|-- Chain~I,O~
    Workflow~I,O~ --> WorkflowNode

10. 实战经验与最佳实践

10.1 性能优化实战

10.1.1 热点函数优化

问题识别:通过性能分析发现 runner.run() 是最大的性能瓶颈

优化策略

// 优化前: 每次都创建新的管理器
func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Option) {
    cm := r.initChannelManager(isStream)     // 🔥 热点
    tm := r.initTaskManager(...)             // 🔥 热点
    // ...
}

// 优化后: 管理器复用
type runner struct {
    cmPool sync.Pool  // 通道管理器池
    tmPool sync.Pool  // 任务管理器池
    // ...
}

func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Option) {
    cm := r.cmPool.Get().(*channelManager)   // 复用对象
    defer r.cmPool.Put(cm)
    
    tm := r.tmPool.Get().(*taskManager)      // 复用对象
    defer r.tmPool.Put(tm)
    // ...
}

10.1.2 内存优化实战

流式处理优化

// 优化前: 无限制缓冲
type StreamReader[T any] struct {
    buffer []T  // 可能无限增长
}

// 优化后: 环形缓冲区
type StreamReader[T any] struct {
    buffer    []T
    head, tail int
    size       int
    maxSize    int  // 最大缓冲区限制
}

func (sr *StreamReader[T]) Recv() (T, error) {
    if sr.size >= sr.maxSize {
        return sr.zero, ErrBufferFull  // 背压控制
    }
    // ...
}

10.2 架构设计实战

10.2.1 组件选择指南

场景推荐编排方式理由示例
简单顺序处理Chain代码简洁,性能最优模板→模型→输出
复杂分支逻辑Graph支持条件分支和循环ReAct Agent
字段级数据映射Workflow精确的数据流控制复杂数据处理管道
智能代理ADK专门的代理抽象对话机器人

10.2.2 类型安全实战

// 推荐:使用泛型确保类型安全
chain := compose.NewChain[map[string]any, *schema.Message]().
    AppendChatTemplate(template).
    AppendChatModel(model)

// 编译时类型检查
runnable, err := chain.Compile(ctx)
if err != nil {
    // 处理编译错误
}

// 运行时类型安全
result, err := runnable.Invoke(ctx, map[string]any{
    "query": "Hello",
})

10.3 错误处理实战

10.3.1 统一错误处理模式

// 统一错误处理
func handleGraphExecution(ctx context.Context, runnable compose.Runnable[Input, Output], input Input) (Output, error) {
    result, err := runnable.Invoke(ctx, input)
    if err != nil {
        // 检查是否为中断错误
        if interruptErr, ok := err.(compose.InterruptError); ok {
            info := interruptErr.GetInterruptInfo()
            log.Printf("图执行被中断: %+v", info)
            
            // 可以选择恢复执行
            // return resumeExecution(ctx, runnable, info)
        }
        
        return result, fmt.Errorf("图执行失败: %w", err)
    }
    
    return result, nil
}

10.3.2 回调错误处理

// 错误恢复回调
handler := callbacks.NewHandlerBuilder().
    OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
        // 记录错误
        log.Error("组件执行错误", "component", info.Name, "error", err)
        
        // 错误统计
        metrics.IncrementErrorCount(info.Name)
        
        // 可以在这里实现重试逻辑
        if isRetryableError(err) {
            // 设置重试标记
            ctx = context.WithValue(ctx, "retry", true)
        }
        
        return ctx
    }).
    Build()

10.4 监控与可观测性实战

10.4.1 性能监控

// 性能监控回调
performanceHandler := callbacks.NewHandlerBuilder().
    OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
        // 记录开始时间
        startTime := time.Now()
        ctx = context.WithValue(ctx, "start_time", startTime)
        
        // 记录输入大小
        if msg, ok := input.(*schema.Message); ok {
            metrics.RecordInputSize(info.Name, len(msg.Content))
        }
        
        return ctx
    }).
    OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
        // 计算执行时间
        if startTime, ok := ctx.Value("start_time").(time.Time); ok {
            duration := time.Since(startTime)
            metrics.RecordExecutionTime(info.Name, duration)
        }
        
        // 记录输出大小
        if msg, ok := output.(*schema.Message); ok {
            metrics.RecordOutputSize(info.Name, len(msg.Content))
        }
        
        return ctx
    }).
    Build()

10.4.2 链路追踪

// OpenTelemetry 集成
func (a *ChatModelAgent) Run(ctx context.Context, input *AgentInput) {
    ctx, span := tracer.Start(ctx, "agent.run")
    defer span.End()

    span.SetAttributes(
        attribute.String("agent.name", a.Name(ctx)),
        attribute.Int("input.message_count", len(input.Messages)),
    )
    
    // 执行逻辑...
}

10.5 部署与运维实战

10.5.1 配置管理

// 配置结构
type Config struct {
    // 应用配置
    Port    int    `json:"port"`
    Env     string `json:"env"`
    LogLevel string `json:"log_level"`
    
    // LLM 配置
    OpenAI struct {
        APIKey string `json:"api_key"`
        Model  string `json:"model"`
    } `json:"openai"`
    
    // 性能配置
    Performance struct {
        MaxConcurrency int           `json:"max_concurrency"`
        Timeout        time.Duration `json:"timeout"`
        BufferSize     int           `json:"buffer_size"`
    } `json:"performance"`
}

// 环境变量加载
func LoadConfig() *Config {
    cfg := &Config{}
    
    // 从环境变量加载
    cfg.OpenAI.APIKey = os.Getenv("OPENAI_API_KEY")
    cfg.Performance.MaxConcurrency = getEnvInt("MAX_CONCURRENCY", 10)
    
    return cfg
}

10.5.2 优雅关闭

// 优雅关闭实现
func (app *Application) Shutdown(ctx context.Context) error {
    // 停止接收新请求
    app.server.Shutdown(ctx)
    
    // 等待现有请求完成
    app.waitGroup.Wait()
    
    // 清理资源
    if app.agent != nil {
        app.agent.Close()
    }
    
    // 关闭数据库连接
    if app.db != nil {
        app.db.Close()
    }
    
    return nil
}

10.6 常见问题与解决方案

10.6.1 内存泄漏问题

问题:StreamReader 没有正确关闭导致 goroutine 泄漏

解决方案

// 确保流正确关闭
func processStream(stream *schema.StreamReader[*schema.Message]) error {
    defer stream.Close()  // 确保关闭
    
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        
        // 处理消息
        processMessage(msg)
    }
    
    return nil
}

10.6.2 类型转换问题

问题:运行时类型转换失败

解决方案

// 安全的类型转换
func safeTypeConvert[T any](input any) (T, error) {
    var zero T
    
    if input == nil {
        // 处理 nil 值
        if reflect.TypeOf((*T)(nil)).Elem().Kind() == reflect.Interface {
            return zero, nil
        }
        return zero, errors.New("nil input for non-interface type")
    }
    
    if result, ok := input.(T); ok {
        return result, nil
    }
    
    return zero, fmt.Errorf("type conversion failed: expected %T, got %T", zero, input)
}

10.6.3 性能调优经验

经验总结

  1. 热点识别:使用 pprof 定期分析性能热点
  2. 内存优化:合理使用对象池,避免频繁分配
  3. 并发控制:根据硬件资源调整并发度
  4. 缓存策略:对重复计算结果进行缓存
  5. 监控告警:设置合理的性能阈值和告警

性能基准

  • 编译延迟: < 10ms (简单链)
  • 执行开销: < 1ms (框架层面)
  • 内存占用: 与组件数量线性相关
  • P95 执行延迟: < 200ms
  • P99 执行延迟: < 500ms

10.7 生产环境最佳实践

10.7.1 资源限制

// 设置资源限制
config := &Config{
    MaxConcurrency: 100,
    MaxMemoryMB:    512,
    Timeout:        30 * time.Second,
    BufferSize:     1000,
}

// 应用限制
semaphore := make(chan struct{}, config.MaxConcurrency)

func limitedExecution(ctx context.Context, fn func() error) error {
    select {
    case semaphore <- struct{}{}:
        defer func() { <-semaphore }()
        return fn()
    case <-ctx.Done():
        return ctx.Err()
    }
}

10.7.2 健康检查

// 健康检查端点
func (app *Application) HealthCheck() error {
    // 检查数据库连接
    if err := app.db.Ping(); err != nil {
        return fmt.Errorf("database unhealthy: %w", err)
    }
    
    // 检查外部服务
    if err := app.checkExternalServices(); err != nil {
        return fmt.Errorf("external services unhealthy: %w", err)
    }
    
    // 检查内存使用
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    if m.Alloc > app.config.MaxMemoryBytes {
        return fmt.Errorf("memory usage too high: %d bytes", m.Alloc)
    }
    
    return nil
}

通过以上完整的源码剖析,我们深入了解了 Eino 框架的设计理念、架构实现、关键函数和最佳实践。这个框架通过其精心设计的分层架构,实现了高度模块化、类型安全、流式优先、灵活编排和高性能的特性,为 LLM 应用开发提供了强大而灵活的基础。


总结:Eino 框架是一个设计精良的 Go 语言 LLM 应用开发框架,通过组件化架构、强大的编排能力和完整的流式处理机制,为开发者提供了构建复杂 AI 应用的强大工具。掌握其核心概念和实现原理,能够帮助开发者更好地利用这个框架构建高质量的 LLM 应用。