📖 文档概述

本文档深入分析 Eino 框架的整体架构设计,包括分层架构、模块交互关系、核心设计模式和架构图解。

🏗️ 整体架构概览

架构分层设计

graph TB
    subgraph "应用层 (Application Layer)"
        A1[聊天机器人]
        A2[RAG应用]
        A3[智能体系统]
        A4[工作流应用]
    end
    
    subgraph "编排层 (Orchestration Layer)"
        O1[Chain 链式编排]
        O2[Graph 图式编排]
        O3[Workflow 工作流编排]
        O4[ADK 智能体开发套件]
    end
    
    subgraph "组件层 (Component Layer)"
        C1[Model 模型组件]
        C2[Prompt 提示组件]
        C3[Tool 工具组件]
        C4[Retriever 检索组件]
        C5[Document 文档组件]
        C6[Embedding 嵌入组件]
        C7[Indexer 索引组件]
    end
    
    subgraph "基础层 (Foundation Layer)"
        F1[Schema 数据模式]
        F2[Callbacks 回调系统]
        F3[Stream 流处理]
        F4[Serialization 序列化]
        F5[Generic 泛型工具]
    end
    
    A1 --> O1
    A2 --> O2
    A3 --> O4
    A4 --> O3
    
    O1 --> C1
    O2 --> C2
    O3 --> C3
    O4 --> C4
    
    C1 --> F1
    C2 --> F2
    C3 --> F3
    C4 --> F4
    C5 --> F5
    
    style A1 fill:#e8f5e8
    style O1 fill:#fff3e0
    style C1 fill:#f3e5f5
    style F1 fill:#e3f2fd

核心架构原则

1. 分层解耦

  • 应用层: 面向最终用户的具体应用实现
  • 编排层: 提供组件编排和执行引擎
  • 组件层: 提供可复用的功能组件
  • 基础层: 提供底层基础设施和工具

2. 接口驱动

  • 每层都定义清晰的接口契约
  • 支持依赖注入和组件替换
  • 便于测试和扩展

3. 流式优先

  • 所有层都支持流式数据处理
  • 自动处理流的生命周期管理
  • 提供统一的流式编程模型

🔄 模块交互关系

核心模块交互图

graph TB
    subgraph "Compose 编排模块"
        direction TB
        C1[Runnable 接口]
        C2[Chain 链式编排]
        C3[Graph 图式编排]
        C4[Workflow 工作流]
        C5[Lambda 函数]
        C6[Branch 分支]
        C7[Parallel 并行]
    end
    
    subgraph "Components 组件模块"
        direction TB
        CP1[Model 接口]
        CP2[Prompt 接口]
        CP3[Tool 接口]
        CP4[Retriever 接口]
        CP5[Document 接口]
        CP6[Embedding 接口]
    end
    
    subgraph "Schema 模式模块"
        direction TB
        S1[Message 消息]
        S2[StreamReader 流读取器]
        S3[Tool 工具定义]
        S4[Document 文档]
        S5[TokenUsage 令牌使用]
    end
    
    subgraph "Callbacks 回调模块"
        direction TB
        CB1[Handler 处理器]
        CB2[Manager 管理器]
        CB3[Aspect 切面]
    end
    
    subgraph "ADK 智能体模块"
        direction TB
        A1[Agent 接口]
        A2[AgentEvent 事件]
        A3[AgentAction 动作]
        A4[MessageVariant 消息变体]
    end
    
    C1 --> CP1
    C2 --> C1
    C3 --> C1
    C4 --> C1
    
    CP1 --> S1
    CP2 --> S1
    CP3 --> S3
    CP4 --> S4
    
    C1 --> CB1
    CB1 --> CB2
    CB2 --> CB3
    
    A1 --> C1
    A2 --> S2
    A3 --> S1
    
    style C1 fill:#e8f5e8
    style CP1 fill:#fff3e0
    style S1 fill:#f3e5f5
    style CB1 fill:#e3f2fd
    style A1 fill:#fce4ec

数据流向分析

sequenceDiagram
    participant App as 应用
    participant Chain as Chain编排器
    participant Node as 组件节点
    participant Model as 模型组件
    participant Stream as 流处理器
    participant Callback as 回调系统
    
    App->>Chain: 创建编排链
    Chain->>Node: 添加组件节点
    Node->>Model: 封装模型组件
    
    App->>Chain: 编译执行
    Chain->>Callback: 注册回调
    
    App->>Chain: Invoke(input)
    Chain->>Callback: OnStart
    Chain->>Node: 执行节点
    Node->>Model: 调用模型
    Model->>Stream: 生成流数据
    Stream-->>Node: 返回结果
    Node-->>Chain: 返回结果
    Chain->>Callback: OnEnd
    Chain-->>App: 返回最终结果
    
    Note over App,Callback: 支持四种执行模式:Invoke、Stream、Collect、Transform

🎯 核心设计模式

1. 建造者模式 (Builder Pattern)

// Chain 使用建造者模式构建编排链
type Chain[I, O any] struct {
    gg *Graph[I, O]
    nodeIdx int
    preNodeKeys []string
    hasEnd bool
}

// 链式调用构建
func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O] {
    gNode, options := toChatModelNode(node, opts...)
    c.addNode(gNode, options)
    return c // 返回自身支持链式调用
}

func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O] {
    gNode, options := toChatTemplateNode(node, opts...)
    c.addNode(gNode, options)
    return c
}

// 最终构建
func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error) {
    if err := c.addEndIfNeeded(); err != nil {
        return nil, err
    }
    return c.gg.Compile(ctx, opts...)
}

设计优势:

  • 提供流畅的API体验
  • 支持复杂对象的分步构建
  • 隐藏内部构建复杂性

2. 适配器模式 (Adapter Pattern)

适配器模式在 Eino 中主要体现在 composableRunnable 的实现上,它负责在不同执行模式之间进行自动适配。详细的 Runnable 接口定义和适配机制请参考 核心API深度分析

设计优势:

  • 统一不同组件的接口
  • 自动处理执行模式转换
  • 简化组件开发复杂度

3. 观察者模式 (Observer Pattern)

// 回调系统实现观察者模式
type Handler interface {
    OnStart(ctx context.Context, info *RunInfo, input any) context.Context
    OnEnd(ctx context.Context, info *RunInfo, output any) context.Context
    OnError(ctx context.Context, info *RunInfo, err error) context.Context
    OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[any]) context.Context
    OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[any]) context.Context
}

// 回调管理器
type Manager struct {
    handlers []Handler
}

func (m *Manager) AddHandler(handler Handler) {
    m.handlers = append(m.handlers, handler)
}

// 事件通知
func (m *Manager) notifyStart(ctx context.Context, info *RunInfo, input any) context.Context {
    for _, handler := range m.handlers {
        ctx = handler.OnStart(ctx, info, input)
    }
    return ctx
}

设计优势:

  • 支持松耦合的事件通知
  • 便于扩展监控和调试功能
  • 支持横切面关注点处理

4. 策略模式 (Strategy Pattern)

// 不同的图执行策略
type graphRunType string

const (
    runTypePregel graphRunType = "Pregel" // 支持循环的图执行
    runTypeDAG    graphRunType = "DAG"    // 有向无环图执行
)

// 不同的通道构建策略
type chanBuilder func(dependencies []string, indirectDependencies []string, 
                     zeroValue func() any, emptyStream func() streamReader) channel

func pregelChannelBuilder(dependencies []string, indirectDependencies []string, 
                         zeroValue func() any, emptyStream func() streamReader) channel {
    // Pregel 模式的通道实现
    return &pregelChannel{
        dependencies: dependencies,
        zeroValue: zeroValue,
        emptyStream: emptyStream,
    }
}

func dagChannelBuilder(dependencies []string, indirectDependencies []string, 
                      zeroValue func() any, emptyStream func() streamReader) channel {
    // DAG 模式的通道实现
    return &dagChannel{
        dependencies: dependencies,
        indirectDependencies: indirectDependencies,
        zeroValue: zeroValue,
    }
}

设计优势:

  • 支持不同的执行策略
  • 便于扩展新的执行模式
  • 运行时策略选择

🔧 关键架构组件

1. Runnable 核心接口

Runnable 是框架的核心抽象,定义了四种数据流模式:单输入单输出(Invoke)、单输入流输出(Stream)、流输入单输出(Collect)、流输入流输出(Transform)。

详细的接口定义、自动适配机制和使用示例请参考 核心API深度分析

2. 流处理系统

流处理系统是 Eino 的核心特性之一,提供了完整的流式数据处理能力。系统包含 StreamReader、StreamWriter 等核心组件,支持流的拼接、合并、复制等操作。

详细的流处理架构、接口定义和使用方法请参考 Schema模块详解

3. 类型系统

// 泛型辅助器,提供类型安全的转换
type genericHelper struct {
    inputType  reflect.Type
    outputType reflect.Type
    
    inputConverter  handlerPair  // 输入类型转换器
    outputConverter handlerPair  // 输出类型转换器
    
    inputFieldMappingConverter handlerPair // 字段映射转换器
}

// 类型检查和转换
func checkAssignable(from, to reflect.Type) assignableType {
    if from == to {
        return assignableTypeExact // 精确匹配
    }
    
    if from == nil || to == nil {
        return assignableTypeMay // 可能匹配,需要运行时检查
    }
    
    if from.AssignableTo(to) {
        return assignableTypeExact // 可赋值
    }
    
    // 检查接口转换
    if to.Kind() == reflect.Interface {
        if from.Implements(to) {
            return assignableTypeExact
        }
    }
    
    // 检查 any 类型
    if to == reflect.TypeOf((*any)(nil)).Elem() {
        return assignableTypeExact
    }
    
    return assignableTypeMustNot // 不匹配
}

📊 性能架构设计

1. 并发执行模型

graph TB
    subgraph "并发执行引擎"
        direction TB
        E1[执行协调器]
        E2[节点调度器]
        E3[依赖管理器]
        E4[资源池]
    end
    
    subgraph "执行模式"
        direction TB
        M1[Pregel 模式]
        M2[DAG 模式]
        M3[Eager 模式]
        M4[Lazy 模式]
    end
    
    subgraph "通道系统"
        direction TB
        C1[数据通道]
        C2[控制通道]
        C3[错误通道]
        C4[状态通道]
    end
    
    E1 --> M1
    E2 --> M2
    E3 --> M3
    E4 --> M4
    
    M1 --> C1
    M2 --> C2
    M3 --> C3
    M4 --> C4
    
    style E1 fill:#e8f5e8
    style M1 fill:#fff3e0
    style C1 fill:#f3e5f5
// 并发执行的核心结构
type runner struct {
    chanSubscribeTo map[string]*chanCall // 节点通道映射
    
    successors          map[string][]string // 后继节点
    dataPredecessors    map[string][]string // 数据依赖
    controlPredecessors map[string][]string // 控制依赖
    
    inputChannels *chanCall // 输入通道
    
    chanBuilder chanBuilder // 通道构建器
    eager       bool        // 是否急切执行
    dag         bool        // 是否DAG模式
    
    runCtx func(ctx context.Context) context.Context // 运行上下文
}

// 并发执行逻辑
func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Option) (any, error) {
    if r.runCtx != nil {
        ctx = r.runCtx(ctx)
    }
    
    // 创建执行上下文
    runCtx := &runContext{
        ctx:         ctx,
        isStream:    isStream,
        channels:    make(map[string]channel),
        completed:   make(map[string]bool),
        errors:      make(chan error, len(r.chanSubscribeTo)),
    }
    
    // 初始化通道
    for name := range r.chanSubscribeTo {
        dependencies := r.dataPredecessors[name]
        indirectDeps := r.controlPredecessors[name]
        
        runCtx.channels[name] = r.chanBuilder(dependencies, indirectDeps, 
            func() any { return reflect.Zero(r.inputType).Interface() },
            func() streamReader { return emptyStreamReader() })
    }
    
    // 启动输入处理
    go r.processInput(runCtx, input)
    
    // 并发执行节点
    var wg sync.WaitGroup
    for name, call := range r.chanSubscribeTo {
        wg.Add(1)
        go func(nodeName string, nodeCall *chanCall) {
            defer wg.Done()
            r.executeNode(runCtx, nodeName, nodeCall)
        }(name, call)
    }
    
    // 等待完成或错误
    go func() {
        wg.Wait()
        close(runCtx.errors)
    }()
    
    // 收集结果
    return r.collectResult(runCtx)
}

2. 内存管理策略

// 流资源自动管理
type streamReader[T any] struct {
    ch     <-chan T
    errCh  <-chan error
    ctx    context.Context
    cancel context.CancelFunc
    
    closed    int32 // 原子操作标记
    closeOnce sync.Once
}

func (s *streamReader[T]) Close() error {
    s.closeOnce.Do(func() {
        atomic.StoreInt32(&s.closed, 1)
        if s.cancel != nil {
            s.cancel() // 取消上下文,释放资源
        }
    })
    return nil
}

// 自动关闭机制
func (s *streamReader[T]) SetAutomaticClose() {
    go func() {
        defer s.Close()
        for {
            _, err := s.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                break
            }
        }
    }()
}

🔍 架构扩展点

1. 组件扩展

// 自定义组件接口
type CustomComponent interface {
    // 组件标识
    GetType() string
    
    // 执行接口(至少实现一个)
    Invoke(ctx context.Context, input any) (any, error)
    Stream(ctx context.Context, input any) (*schema.StreamReader[any], error)
    Collect(ctx context.Context, input *schema.StreamReader[any]) (any, error)
    Transform(ctx context.Context, input *schema.StreamReader[any]) (*schema.StreamReader[any], error)
}

// 注册自定义组件
func RegisterComponent(name string, factory func() CustomComponent) {
    componentRegistry[name] = factory
}

2. 回调扩展

// 自定义回调处理器
type CustomHandler struct {
    name string
}

func (h *CustomHandler) OnStart(ctx context.Context, info *RunInfo, input any) context.Context {
    // 自定义开始逻辑
    log.Printf("组件 %s 开始执行,输入: %v", info.ComponentName, input)
    return ctx
}

func (h *CustomHandler) OnEnd(ctx context.Context, info *RunInfo, output any) context.Context {
    // 自定义结束逻辑
    log.Printf("组件 %s 执行完成,输出: %v", info.ComponentName, output)
    return ctx
}

// 注册回调
func RegisterCallback(handler Handler) {
    callbackManager.AddHandler(handler)
}

3. 序列化扩展

// 自定义序列化器
type CustomSerializer struct{}

func (s *CustomSerializer) Serialize(data any) ([]byte, error) {
    // 自定义序列化逻辑
    return json.Marshal(data)
}

func (s *CustomSerializer) Deserialize(data []byte, target any) error {
    // 自定义反序列化逻辑
    return json.Unmarshal(data, target)
}

// 注册序列化器
func RegisterSerializer(name string, serializer Serializer) {
    serializerRegistry[name] = serializer
}

📈 架构演进路线

当前架构 (v1.0)

  • ✅ 基础编排能力
  • ✅ 流式处理支持
  • ✅ 类型安全保障
  • ✅ 基础组件库

近期规划 (v1.1-v1.2)

  • 🔄 性能优化
  • 🔄 更多组件实现
  • 🔄 可视化调试工具
  • 🔄 分布式执行支持

长期规划 (v2.0+)

  • 📋 云原生部署
  • 📋 图形化编排界面
  • 📋 AI辅助开发
  • 📋 企业级治理功能

上一篇: 框架概述与设计理念 下一篇: 核心API深度分析 - 深入分析Runnable接口和编排API

更新时间: 2024-12-19 | 文档版本: v1.0