📖 文档概述

本文档深入分析 Eino 框架的核心API,包括 Runnable 接口、编排API、Lambda函数、以及各种执行模式的详细实现和调用链路。

🎯 核心API概览

API层次结构

graph TB
    subgraph "核心接口层"
        R[Runnable Interface]
        AG[AnyGraph Interface]
        C[Component Interface]
    end
    
    subgraph "编排API层"
        Chain[Chain API]
        Graph[Graph API]
        Workflow[Workflow API]
        Lambda[Lambda API]
    end
    
    subgraph "组件API层"
        Model[Model API]
        Prompt[Prompt API]
        Tool[Tool API]
        Retriever[Retriever API]
    end
    
    subgraph "执行引擎层"
        Runner[Runner Engine]
        Channel[Channel System]
        Stream[Stream Processing]
    end
    
    R --> Chain
    R --> Graph
    R --> Workflow
    AG --> R
    
    Chain --> Model
    Graph --> Prompt
    Workflow --> Tool
    Lambda --> Retriever
    
    Model --> Runner
    Prompt --> Channel
    Tool --> Stream
    
    style R fill:#e8f5e8
    style Chain fill:#fff3e0
    style Model fill:#f3e5f5
    style Runner fill:#e3f2fd

🔧 Runnable 核心接口详解

接口定义与设计理念

// Runnable 是框架的核心抽象,定义了四种数据流模式
// 支持自动降级兼容,组件只需实现一种或多种方法即可
type Runnable[I, O any] interface {
    // Invoke: 单输入 => 单输出 (ping => pong)
    // 最基础的执行模式,适用于简单的请求-响应场景
    Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
    
    // Stream: 单输入 => 流输出 (ping => stream)
    // 适用于需要实时输出的场景,如聊天对话、长文本生成
    Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
    
    // Collect: 流输入 => 单输出 (stream => pong)
    // 适用于需要处理流式输入并产生最终结果的场景
    Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
    
    // Transform: 流输入 => 流输出 (stream => stream)
    // 适用于流式数据转换场景,如实时数据处理管道
    Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}

自动适配机制

// composableRunnable 实现自动适配不同执行模式
type composableRunnable struct {
    i invoke    // Invoke 方法适配器
    t transform // Transform 方法适配器
    
    inputType  reflect.Type  // 输入类型
    outputType reflect.Type  // 输出类型
    optionType reflect.Type  // 选项类型
    
    *genericHelper           // 泛型辅助器
    isPassthrough bool       // 是否为透传节点
    meta *executorMeta       // 执行器元数据
    nodeInfo *nodeInfo       // 节点信息
}

// 自动适配:如果组件只实现了 Stream,自动适配到 Invoke
func invokeByStream[I, O, TOption any](s Stream[I, O, TOption]) Invoke[I, O, TOption] {
    return func(ctx context.Context, input I, opts ...TOption) (O, error) {
        // 调用 Stream 方法获取流
        stream, err := s(ctx, input, opts...)
        if err != nil {
            return *new(O), err
        }
        defer stream.Close()
        
        // 自动拼接流数据为单一结果
        return schema.ConcatStreamReader(stream)
    }
}

// 自动适配:如果组件只实现了 Invoke,自动适配到 Stream
func streamByInvoke[I, O, TOption any](i Invoke[I, O, TOption]) Stream[I, O, TOption] {
    return func(ctx context.Context, input I, opts ...TOption) (*schema.StreamReader[O], error) {
        // 调用 Invoke 方法获取结果
        result, err := i(ctx, input, opts...)
        if err != nil {
            return nil, err
        }
        
        // 将单一结果转换为流
        return schema.StreamReaderFromArray([]O{result}), nil
    }
}

// 自动适配:Transform 到 Collect
func collectByTransform[I, O, TOption any](t Transform[I, O, TOption]) Collect[I, O, TOption] {
    return func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (O, error) {
        // 调用 Transform 获取输出流
        outputStream, err := t(ctx, input, opts...)
        if err != nil {
            return *new(O), err
        }
        defer outputStream.Close()
        
        // 拼接输出流为单一结果
        return schema.ConcatStreamReader(outputStream)
    }
}

执行时序图

sequenceDiagram
    participant Client as 客户端
    participant Runnable as Runnable
    participant Adapter as 适配器
    participant Component as 组件
    participant Stream as 流处理器
    
    Note over Client,Stream: Invoke 执行模式
    Client->>Runnable: Invoke(ctx, input)
    Runnable->>Adapter: 检查组件实现
    
    alt 组件实现了 Invoke
        Adapter->>Component: 直接调用 Invoke
        Component-->>Adapter: 返回结果
    else 组件只实现了 Stream
        Adapter->>Component: 调用 Stream
        Component-->>Stream: 返回流
        Stream-->>Adapter: 拼接为单一结果
    end
    
    Adapter-->>Runnable: 返回结果
    Runnable-->>Client: 返回最终结果
    
    Note over Client,Stream: Stream 执行模式
    Client->>Runnable: Stream(ctx, input)
    Runnable->>Adapter: 检查组件实现
    
    alt 组件实现了 Stream
        Adapter->>Component: 直接调用 Stream
        Component-->>Stream: 返回流
    else 组件只实现了 Invoke
        Adapter->>Component: 调用 Invoke
        Component-->>Adapter: 返回结果
        Adapter->>Stream: 转换为流
    end
    
    Stream-->>Runnable: 返回流
    Runnable-->>Client: 返回流读取器

🔗 Chain 链式编排API

Chain 核心结构

// Chain 是组件的链式编排,支持顺序、并行、分支执行
type Chain[I, O any] struct {
    err error           // 构建过程中的错误
    gg *Graph[I, O]     // 底层图结构
    nodeIdx int         // 节点索引计数器
    preNodeKeys []string // 前置节点键列表
    hasEnd bool         // 是否已添加结束节点
}

// 创建新的链式编排
func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O] {
    ch := &Chain[I, O]{
        gg: NewGraph[I, O](opts...),
    }
    ch.gg.cmp = ComponentOfChain
    return ch
}

链式API方法详解

1. AppendChatModel - 添加聊天模型

// AppendChatModel 添加聊天模型节点到链中
// 参数:
//   - node: 实现 model.BaseChatModel 接口的聊天模型
//   - opts: 节点配置选项
// 返回: 链实例,支持链式调用
func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O] {
    // 将聊天模型转换为图节点
    gNode, options := toChatModelNode(node, opts...)
    // 添加节点到链中
    c.addNode(gNode, options)
    return c
}

// toChatModelNode 将聊天模型转换为图节点
func toChatModelNode(node model.BaseChatModel, opts ...GraphAddNodeOpt) (*graphNode, *graphAddNodeOpts) {
    options := getGraphAddNodeOpts(opts...)
    
    // 创建可组合的可执行对象
    cr := runnableLambda(
        // Invoke 实现
        func(ctx context.Context, messages []*schema.Message, opts ...model.Option) (*schema.Message, error) {
            return node.Generate(ctx, messages, opts...)
        },
        // Stream 实现
        func(ctx context.Context, messages []*schema.Message, opts ...model.Option) (*schema.StreamReader[*schema.Message], error) {
            return node.Stream(ctx, messages, opts...)
        },
        nil, nil, // Collect 和 Transform 为 nil,将自动适配
        true, // 启用回调
    )
    
    // 设置执行器元数据
    cr.meta = &executorMeta{
        component:                  ComponentOfChatModel,
        isComponentCallbackEnabled: false,
        componentImplType:          getComponentImplType(node),
    }
    
    return &graphNode{
        cr:       cr,
        instance: node,
        opts:     options,
    }, options
}

2. AppendChatTemplate - 添加聊天模板

// AppendChatTemplate 添加聊天模板节点到链中
func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O] {
    gNode, options := toChatTemplateNode(node, opts...)
    c.addNode(gNode, options)
    return c
}

// toChatTemplateNode 实现细节
func toChatTemplateNode(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) (*graphNode, *graphAddNodeOpts) {
    options := getGraphAddNodeOpts(opts...)
    
    cr := runnableLambda(
        // Invoke 实现:格式化模板
        func(ctx context.Context, input map[string]any, opts ...prompt.Option) ([]*schema.Message, error) {
            return node.Format(ctx, input, opts...)
        },
        // Stream 实现:将格式化结果转换为流
        func(ctx context.Context, input map[string]any, opts ...prompt.Option) (*schema.StreamReader[[]*schema.Message], error) {
            messages, err := node.Format(ctx, input, opts...)
            if err != nil {
                return nil, err
            }
            return schema.StreamReaderFromArray([][]*schema.Message{messages}), nil
        },
        nil, nil,
        true,
    )
    
    cr.meta = &executorMeta{
        component:                  ComponentOfChatTemplate,
        isComponentCallbackEnabled: false,
        componentImplType:          getComponentImplType(node),
    }
    
    return &graphNode{
        cr:       cr,
        instance: node,
        opts:     options,
    }, options
}

3. AppendLambda - 添加自定义Lambda函数

// AppendLambda 添加Lambda节点到链中
// Lambda 是用户自定义逻辑的包装器
func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O] {
    gNode, options := toLambdaNode(node, opts...)
    c.addNode(gNode, options)
    return c
}

// toLambdaNode 实现
func toLambdaNode(node *Lambda, opts ...GraphAddNodeOpt) (*graphNode, *graphAddNodeOpts) {
    options := getGraphAddNodeOpts(opts...)
    
    return &graphNode{
        cr:       node.executor, // 直接使用 Lambda 的执行器
        instance: node,
        opts:     options,
    }, options
}

并行和分支支持

1. AppendParallel - 添加并行节点

// AppendParallel 添加并行结构到链中
// 多个节点将并发执行,结果合并后传递给下一个节点
func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O] {
    if p == nil {
        c.reportError(fmt.Errorf("append parallel invalid, parallel is nil"))
        return c
    }
    
    if len(p.nodes) <= 1 {
        c.reportError(fmt.Errorf("append parallel invalid, not enough nodes, count = %d", len(p.nodes)))
        return c
    }
    
    // 确定起始节点
    var startNode string
    if len(c.preNodeKeys) == 0 {
        startNode = START
    } else if len(c.preNodeKeys) == 1 {
        startNode = c.preNodeKeys[0]
    } else {
        c.reportError(fmt.Errorf("append parallel invalid, multiple previous nodes: %v", c.preNodeKeys))
        return c
    }
    
    // 为每个并行节点创建边
    prefix := c.nextNodeKey()
    var nodeKeys []string
    
    for i, node := range p.nodes {
        nodeKey := fmt.Sprintf("%s_parallel_%d", prefix, i)
        
        // 添加节点到图中
        if err := c.gg.addNode(nodeKey, node.First, node.Second); err != nil {
            c.reportError(fmt.Errorf("add parallel node failed: %w", err))
            return c
        }
        
        // 添加从起始节点到并行节点的边
        if err := c.gg.AddEdge(startNode, nodeKey); err != nil {
            c.reportError(fmt.Errorf("add parallel edge failed: %w", err))
            return c
        }
        
        nodeKeys = append(nodeKeys, nodeKey)
    }
    
    // 更新前置节点列表
    c.preNodeKeys = nodeKeys
    return c
}

2. AppendBranch - 添加条件分支

// AppendBranch 添加条件分支到链中
// 根据条件函数的返回值选择执行路径
func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O] {
    if b == nil {
        c.reportError(fmt.Errorf("append branch invalid, branch is nil"))
        return c
    }
    
    // 验证分支配置
    if len(b.key2BranchNode) <= 1 {
        c.reportError(fmt.Errorf("append branch invalid, need at least 2 branches"))
        return c
    }
    
    // 确定起始节点
    var startNode string
    if len(c.preNodeKeys) == 1 {
        startNode = c.preNodeKeys[0]
    } else {
        c.reportError(fmt.Errorf("append branch invalid, multiple previous nodes: %v", c.preNodeKeys))
        return c
    }
    
    // 为每个分支创建节点
    prefix := c.nextNodeKey()
    key2NodeKey := make(map[string]string)
    
    for key, node := range b.key2BranchNode {
        nodeKey := fmt.Sprintf("%s_branch_%s", prefix, key)
        
        if err := c.gg.addNode(nodeKey, node.First, node.Second); err != nil {
            c.reportError(fmt.Errorf("add branch node failed: %w", err))
            return c
        }
        
        key2NodeKey[key] = nodeKey
    }
    
    // 创建分支逻辑
    gBranch := *b.internalBranch
    gBranch.invoke = func(ctx context.Context, in any) ([]string, error) {
        ends, err := b.internalBranch.invoke(ctx, in)
        if err != nil {
            return nil, err
        }
        
        // 将分支键转换为节点键
        nodeKeyEnds := make([]string, 0, len(ends))
        for _, end := range ends {
            if nodeKey, ok := key2NodeKey[end]; ok {
                nodeKeyEnds = append(nodeKeyEnds, nodeKey)
            } else {
                return nil, fmt.Errorf("branch returns unknown end node: %s", end)
            }
        }
        
        return nodeKeyEnds, nil
    }
    
    // 添加分支到图中
    if err := c.gg.AddBranch(startNode, &gBranch); err != nil {
        c.reportError(fmt.Errorf("chain append branch failed: %w", err))
        return c
    }
    
    // 更新前置节点列表
    c.preNodeKeys = gmap.Values(key2NodeKey)
    return c
}

编译和执行

// Compile 编译链为可执行的 Runnable
func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error) {
    // 添加结束节点(如果需要)
    if err := c.addEndIfNeeded(); err != nil {
        return nil, err
    }
    
    // 委托给底层图进行编译
    return c.gg.Compile(ctx, opts...)
}

// addEndIfNeeded 添加结束边
func (c *Chain[I, O]) addEndIfNeeded() error {
    if c.hasEnd {
        return nil
    }
    
    if c.err != nil {
        return c.err
    }
    
    if len(c.preNodeKeys) == 0 {
        return fmt.Errorf("no nodes in chain")
    }
    
    // 为所有前置节点添加到 END 的边
    for _, nodeKey := range c.preNodeKeys {
        if err := c.gg.AddEdge(nodeKey, END); err != nil {
            return err
        }
    }
    
    c.hasEnd = true
    return nil
}

📊 Graph 图式编排API

Graph 核心结构

// graph 是图式编排的核心结构
type graph struct {
    nodes        map[string]*graphNode     // 节点映射
    controlEdges map[string][]string      // 控制边(执行依赖)
    dataEdges    map[string][]string      // 数据边(数据流)
    branches     map[string][]*GraphBranch // 分支映射
    startNodes   []string                 // 起始节点
    endNodes     []string                 // 结束节点
    
    stateType      reflect.Type           // 状态类型
    stateGenerator func(ctx context.Context) any // 状态生成器
    
    expectedInputType, expectedOutputType reflect.Type // 期望的输入输出类型
    
    *genericHelper                        // 泛型辅助器
    
    fieldMappingRecords map[string][]*FieldMapping // 字段映射记录
    buildError error                     // 构建错误
    cmp component                        // 组件类型
    compiled bool                        // 是否已编译
    
    // 处理器映射
    handlerOnEdges   map[string]map[string][]handlerPair // 边处理器
    handlerPreNode   map[string][]handlerPair            // 节点前处理器
    handlerPreBranch map[string][][]handlerPair          // 分支前处理器
}

Graph API方法详解

1. AddChatModelNode - 添加聊天模型节点

// AddChatModelNode 添加聊天模型节点到图中
func (g *graph) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error {
    gNode, options := toChatModelNode(node, opts...)
    return g.addNode(key, gNode, options)
}

// addNode 添加节点的核心逻辑
func (g *graph) addNode(key string, node *graphNode, options *graphAddNodeOpts) error {
    if g.buildError != nil {
        return g.buildError
    }
    
    if g.compiled {
        return ErrGraphCompiled
    }
    
    // 检查节点键是否为保留字
    if key == END || key == START {
        return fmt.Errorf("node '%s' is reserved", key)
    }
    
    // 检查节点是否已存在
    if _, ok := g.nodes[key]; ok {
        return fmt.Errorf("node '%s' already exists", key)
    }
    
    // 检查状态需求
    if options.needState && g.stateGenerator == nil {
        return fmt.Errorf("node '%s' needs state but graph state is not enabled", key)
    }
    
    // 验证处理器类型
    if options.processor != nil {
        if err := g.validateProcessor(key, node, options.processor); err != nil {
            return err
        }
    }
    
    g.nodes[key] = node
    return nil
}

2. AddEdge - 添加边

// AddEdge 添加数据和控制边
func (g *graph) AddEdge(startNode, endNode string) error {
    return g.addEdgeWithMappings(startNode, endNode, false, false)
}

// AddEdgeWithMapping 添加带字段映射的边
func (g *graph) AddEdgeWithMapping(startNode, endNode string, mappings ...*FieldMapping) error {
    return g.addEdgeWithMappings(startNode, endNode, false, false, mappings...)
}

// addEdgeWithMappings 添加边的核心实现
func (g *graph) addEdgeWithMappings(startNode, endNode string, noControl bool, noData bool, mappings ...*FieldMapping) error {
    if g.buildError != nil {
        return g.buildError
    }
    
    if g.compiled {
        return ErrGraphCompiled
    }
    
    // 验证边的有效性
    if startNode == END {
        return errors.New("END cannot be a start node")
    }
    if endNode == START {
        return errors.New("START cannot be an end node")
    }
    
    // 检查节点是否存在
    if _, ok := g.nodes[startNode]; !ok && startNode != START {
        return fmt.Errorf("start node '%s' not found", startNode)
    }
    if _, ok := g.nodes[endNode]; !ok && endNode != END {
        return fmt.Errorf("end node '%s' not found", endNode)
    }
    
    // 添加控制边
    if !noControl {
        g.controlEdges[startNode] = append(g.controlEdges[startNode], endNode)
        if startNode == START {
            g.startNodes = append(g.startNodes, endNode)
        }
        if endNode == END {
            g.endNodes = append(g.endNodes, startNode)
        }
    }
    
    // 添加数据边
    if !noData {
        g.addToValidateMap(startNode, endNode, mappings)
        if err := g.updateToValidateMap(); err != nil {
            return err
        }
        g.dataEdges[startNode] = append(g.dataEdges[startNode], endNode)
    }
    
    return nil
}

3. AddBranch - 添加分支

// AddBranch 添加分支到图中
func (g *graph) AddBranch(startNode string, branch *GraphBranch) error {
    return g.addBranch(startNode, branch, false)
}

// addBranch 分支添加的核心实现
func (g *graph) addBranch(startNode string, branch *GraphBranch, skipData bool) error {
    if g.buildError != nil {
        return g.buildError
    }
    
    if g.compiled {
        return ErrGraphCompiled
    }
    
    // 验证起始节点
    if startNode == END {
        return errors.New("END cannot be a start node")
    }
    
    if _, ok := g.nodes[startNode]; !ok && startNode != START {
        return fmt.Errorf("branch start node '%s' not found", startNode)
    }
    
    // 初始化分支处理器
    if _, ok := g.handlerPreBranch[startNode]; !ok {
        g.handlerPreBranch[startNode] = [][]handlerPair{}
    }
    branch.idx = len(g.handlerPreBranch[startNode])
    
    // 更新透传节点类型
    if startNode != START && g.nodes[startNode].executorMeta.component == ComponentOfPassthrough {
        g.nodes[startNode].cr.inputType = branch.inputType
        g.nodes[startNode].cr.outputType = branch.inputType
        g.nodes[startNode].cr.genericHelper = branch.genericHelper.forPredecessorPassthrough()
    }
    
    // 检查分支条件类型
    result := checkAssignable(g.getNodeOutputType(startNode), branch.inputType)
    if result == assignableTypeMustNot {
        return fmt.Errorf("branch input type mismatch")
    } else if result == assignableTypeMay {
        g.handlerPreBranch[startNode] = append(g.handlerPreBranch[startNode], []handlerPair{branch.inputConverter})
    } else {
        g.handlerPreBranch[startNode] = append(g.handlerPreBranch[startNode], []handlerPair{})
    }
    
    // 处理分支结束节点
    if !skipData {
        for endNode := range branch.endNodes {
            if _, ok := g.nodes[endNode]; !ok && endNode != END {
                return fmt.Errorf("branch end node '%s' not found", endNode)
            }
            
            g.addToValidateMap(startNode, endNode, nil)
            if err := g.updateToValidateMap(); err != nil {
                return err
            }
            
            if startNode == START {
                g.startNodes = append(g.startNodes, endNode)
            }
            if endNode == END {
                g.endNodes = append(g.endNodes, startNode)
            }
        }
    }
    
    g.branches[startNode] = append(g.branches[startNode], branch)
    return nil
}

图编译过程

// compile 编译图为可执行的 Runnable
func (g *graph) compile(ctx context.Context, opt *graphCompileOptions) (*composableRunnable, error) {
    if g.buildError != nil {
        return nil, g.buildError
    }
    
    // 确定运行类型
    runType := runTypePregel
    cb := pregelChannelBuilder
    
    if isChain(g.cmp) || isWorkflow(g.cmp) {
        if opt != nil && opt.nodeTriggerMode != "" {
            return nil, fmt.Errorf("%s doesn't support node trigger mode", g.cmp)
        }
    }
    
    if (opt != nil && opt.nodeTriggerMode == AllPredecessor) || isWorkflow(g.cmp) {
        runType = runTypeDAG
        cb = dagChannelBuilder
    }
    
    // 确定执行模式
    eager := false
    if isWorkflow(g.cmp) || runType == runTypeDAG {
        eager = true
    }
    if opt != nil && opt.eagerDisabled {
        eager = false
    }
    
    // 验证图结构
    if len(g.startNodes) == 0 {
        return nil, errors.New("no start nodes")
    }
    if len(g.endNodes) == 0 {
        return nil, errors.New("no end nodes")
    }
    
    // 编译子图
    key2SubGraphs := g.beforeChildGraphsCompile(opt)
    chanSubscribeTo := make(map[string]*chanCall)
    
    for name, node := range g.nodes {
        node.beforeChildGraphCompile(name, key2SubGraphs)
        
        // 编译节点
        r, err := node.compileIfNeeded(ctx)
        if err != nil {
            return nil, err
        }
        
        // 创建通道调用
        chCall := &chanCall{
            action:   r,
            writeTo:  g.dataEdges[name],
            controls: g.controlEdges[name],
            
            preProcessor:  node.nodeInfo.preProcessor,
            postProcessor: node.nodeInfo.postProcessor,
        }
        
        // 处理分支
        branches := g.branches[name]
        if len(branches) > 0 {
            chCall.writeToBranches = append(chCall.writeToBranches, branches...)
        }
        
        chanSubscribeTo[name] = chCall
    }
    
    // 构建依赖关系
    dataPredecessors := make(map[string][]string)
    controlPredecessors := make(map[string][]string)
    
    for start, ends := range g.controlEdges {
        for _, end := range ends {
            controlPredecessors[end] = append(controlPredecessors[end], start)
        }
    }
    
    for start, ends := range g.dataEdges {
        for _, end := range ends {
            dataPredecessors[end] = append(dataPredecessors[end], start)
        }
    }
    
    // 创建运行器
    r := &runner{
        chanSubscribeTo:     chanSubscribeTo,
        controlPredecessors: controlPredecessors,
        dataPredecessors:    dataPredecessors,
        
        inputChannels: &chanCall{
            writeTo:         g.dataEdges[START],
            controls:        g.controlEdges[START],
            writeToBranches: g.branches[START],
        },
        
        eager:       eager,
        chanBuilder: cb,
        
        inputType:     g.inputType(),
        outputType:    g.outputType(),
        genericHelper: g.genericHelper,
        
        preBranchHandlerManager: &preBranchHandlerManager{h: g.handlerPreBranch},
        preNodeHandlerManager:   &preNodeHandlerManager{h: g.handlerPreNode},
        edgeHandlerManager:      &edgeHandlerManager{h: g.handlerOnEdges},
    }
    
    // 构建后继关系
    successors := make(map[string][]string)
    for ch := range r.chanSubscribeTo {
        successors[ch] = getSuccessors(r.chanSubscribeTo[ch])
    }
    r.successors = successors
    
    // 设置状态管理
    if g.stateGenerator != nil {
        r.runCtx = func(ctx context.Context) context.Context {
            return context.WithValue(ctx, stateKey{}, &internalState{
                state: g.stateGenerator(ctx),
            })
        }
    }
    
    // DAG 验证
    if runType == runTypeDAG {
        if err := validateDAG(r.chanSubscribeTo, controlPredecessors); err != nil {
            return nil, err
        }
        r.dag = true
    }
    
    g.compiled = true
    g.onCompileFinish(ctx, opt, key2SubGraphs)
    
    return r.toComposableRunnable(), nil
}

🔧 Lambda 函数API

Lambda 类型定义

// Lambda 函数的四种类型定义
type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)
type Stream[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error)
type Collect[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error)
type Transform[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)

// Lambda 包装器
type Lambda struct {
    executor *composableRunnable // 执行器
}

Lambda 创建函数

1. InvokableLambda - 创建可调用Lambda

// InvokableLambda 创建只支持 Invoke 的 Lambda
func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda {
    // 包装为带选项的函数
    f := func(ctx context.Context, input I, opts_ ...unreachableOption) (output O, err error) {
        return i(ctx, input)
    }
    
    return anyLambda(f, nil, nil, nil, opts...)
}

// 使用示例
func ExampleInvokableLambda() {
    // 创建字符串处理 Lambda
    processor := compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
        return strings.ToUpper(input), nil
    })
    
    // 添加到链中
    chain := compose.NewChain[string, string]().
        AppendLambda("processor", processor)
    
    runnable, err := chain.Compile(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    
    result, err := runnable.Invoke(context.Background(), "hello world")
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Println(result) // 输出: HELLO WORLD
}

2. StreamableLambda - 创建流式Lambda

// StreamableLambda 创建支持流式输出的 Lambda
func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda {
    f := func(ctx context.Context, input I, opts_ ...unreachableOption) (*schema.StreamReader[O], err error) {
        return s(ctx, input)
    }
    
    return anyLambda(nil, f, nil, nil, opts...)
}

// 使用示例
func ExampleStreamableLambda() {
    // 创建流式文本生成 Lambda
    generator := compose.StreamableLambda(func(ctx context.Context, input string) (*schema.StreamReader[string], error) {
        words := strings.Fields(input)
        
        // 创建流式输出
        sr, sw := schema.Pipe[string](len(words))
        
        go func() {
            defer sw.Close()
            for _, word := range words {
                if err := sw.Send(word, nil); err != nil {
                    return
                }
                time.Sleep(100 * time.Millisecond) // 模拟延迟
            }
        }()
        
        return sr, nil
    })
    
    chain := compose.NewChain[string, string]().
        AppendLambda("generator", generator)
    
    runnable, err := chain.Compile(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    
    // 流式执行
    stream, err := runnable.Stream(context.Background(), "hello world from eino")
    if err != nil {
        log.Fatal(err)
    }
    defer stream.Close()
    
    for {
        word, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Word: %s\n", word)
    }
}

3. AnyLambda - 创建多功能Lambda

// AnyLambda 创建支持多种执行模式的 Lambda
func AnyLambda[I, O, TOption any](
    i Invoke[I, O, TOption], 
    s Stream[I, O, TOption],
    c Collect[I, O, TOption], 
    t Transform[I, O, TOption], 
    opts ...LambdaOpt) (*Lambda, error) {
    
    if i == nil && s == nil && c == nil && t == nil {
        return nil, fmt.Errorf("at least one lambda type must be provided")
    }
    
    return anyLambda(i, s, c, t, opts...), nil
}

// 使用示例
func ExampleAnyLambda() {
    // 创建支持多种模式的文本处理 Lambda
    lambda, err := compose.AnyLambda(
        // Invoke 实现
        func(ctx context.Context, input string, opts ...string) (string, error) {
            result := strings.ToUpper(input)
            for _, opt := range opts {
                result = strings.ReplaceAll(result, " ", opt)
            }
            return result, nil
        },
        // Stream 实现
        func(ctx context.Context, input string, opts ...string) (*schema.StreamReader[string], error) {
            words := strings.Fields(strings.ToUpper(input))
            return schema.StreamReaderFromArray(words), nil
        },
        // Collect 实现
        func(ctx context.Context, input *schema.StreamReader[string], opts ...string) (string, error) {
            var words []string
            for {
                word, err := input.Recv()
                if err == io.EOF {
                    break
                }
                if err != nil {
                    return "", err
                }
                words = append(words, word)
            }
            return strings.Join(words, " "), nil
        },
        // Transform 实现
        func(ctx context.Context, input *schema.StreamReader[string], opts ...string) (*schema.StreamReader[string], error) {
            return schema.StreamReaderWithConvert(input, func(s string) (string, error) {
                return strings.ToUpper(s), nil
            }), nil
        },
    )
    
    if err != nil {
        log.Fatal(err)
    }
    
    // 可以使用任何执行模式
    chain := compose.NewChain[string, string]().
        AppendLambda("processor", lambda)
    
    runnable, err := chain.Compile(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    
    // Invoke 模式
    result, err := runnable.Invoke(context.Background(), "hello world")
    fmt.Printf("Invoke result: %s\n", result)
    
    // Stream 模式
    stream, err := runnable.Stream(context.Background(), "hello world")
    // ... 处理流数据
}

特殊Lambda函数

1. MessageParser - 消息解析Lambda

// MessageParser 创建消息解析 Lambda
func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda {
    i := func(ctx context.Context, input *schema.Message, opts_ ...unreachableOption) (output T, err error) {
        return p.Parse(ctx, input)
    }
    
    opts = append([]LambdaOpt{WithLambdaType("MessageParser")}, opts...)
    return anyLambda(i, nil, nil, nil, opts...)
}

// 使用示例
func ExampleMessageParser() {
    // 定义解析目标结构
    type Response struct {
        Answer string `json:"answer"`
        Score  int    `json:"score"`
    }
    
    // 创建 JSON 解析器
    parser := schema.NewMessageJSONParser[Response](&schema.MessageJSONParseConfig{
        ParseFrom: schema.MessageParseFromContent,
    })
    
    // 创建解析 Lambda
    parserLambda := compose.MessageParser(parser)
    
    // 构建处理链
    chain := compose.NewChain[*schema.Message, Response]().
        AppendChatModel(chatModel).      // 生成 JSON 响应
        AppendLambda("parser", parserLambda) // 解析为结构体
    
    runnable, err := chain.Compile(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    
    // 执行并获取结构化结果
    response, err := runnable.Invoke(context.Background(), &schema.Message{
        Role:    schema.User,
        Content: "请返回一个包含答案和分数的JSON",
    })
    
    fmt.Printf("Answer: %s, Score: %d\n", response.Answer, response.Score)
}

2. ToList - 转换为列表Lambda

// ToList 创建将单个元素转换为列表的 Lambda
func ToList[I any](opts ...LambdaOpt) *Lambda {
    i := func(ctx context.Context, input I, opts_ ...unreachableOption) (output []I, err error) {
        return []I{input}, nil
    }
    
    t := func(ctx context.Context, inputS *schema.StreamReader[I], opts_ ...unreachableOption) (*schema.StreamReader[[]I], err error) {
        return schema.StreamReaderWithConvert(inputS, func(item I) ([]I, error) {
            return []I{item}, nil
        }), nil
    }
    
    return anyLambda(i, nil, nil, t, opts...)
}

// 使用示例
func ExampleToList() {
    // 创建转换 Lambda
    toListLambda := compose.ToList[*schema.Message]()
    
    // 构建链:单个消息 -> 消息列表
    chain := compose.NewChain[*schema.Message, []*schema.Message]().
        AppendLambda("toList", toListLambda)
    
    runnable, err := chain.Compile(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    
    message := schema.UserMessage("Hello")
    messages, err := runnable.Invoke(context.Background(), message)
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Printf("Messages count: %d\n", len(messages)) // 输出: Messages count: 1
}

🔄 Workflow 工作流API

Workflow 核心结构

// Workflow 是图的包装器,用声明式依赖和字段映射替代 AddEdge
// 底层使用 NodeTriggerMode(AllPredecessor),不支持循环
type Workflow[I, O any] struct {
    g                *graph                           // 底层图结构
    workflowNodes    map[string]*WorkflowNode         // 工作流节点
    workflowBranches []*WorkflowBranch                // 工作流分支
    dependencies     map[string]map[string]dependencyType // 依赖关系
}

// WorkflowNode 工作流节点
type WorkflowNode struct {
    g                *graph                    // 图引用
    key              string                    // 节点键
    addInputs        []func() error            // 输入添加函数
    staticValues     map[string]any            // 静态值
    dependencySetter func(fromNodeKey string, typ dependencyType) // 依赖设置器
    mappedFieldPath  map[string]any            // 映射字段路径
}

Workflow API方法详解

1. 节点添加方法

// AddChatModelNode 添加聊天模型节点
func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode {
    _ = wf.g.AddChatModelNode(key, chatModel, opts...)
    return wf.initNode(key)
}

// AddLambdaNode 添加 Lambda 节点
func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode {
    _ = wf.g.AddLambdaNode(key, lambda, opts...)
    return wf.initNode(key)
}

// initNode 初始化工作流节点
func (wf *Workflow[I, O]) initNode(key string) *WorkflowNode {
    n := &WorkflowNode{
        g:            wf.g,
        key:          key,
        staticValues: make(map[string]any),
        dependencySetter: func(fromNodeKey string, typ dependencyType) {
            if _, ok := wf.dependencies[key]; !ok {
                wf.dependencies[key] = make(map[string]dependencyType)
            }
            wf.dependencies[key][fromNodeKey] = typ
        },
        mappedFieldPath: make(map[string]any),
    }
    wf.workflowNodes[key] = n
    return n
}

2. 依赖和数据流配置

// AddInput 创建数据和执行依赖
// 配置数据如何从前置节点流向当前节点,并确保当前节点在前置节点完成后执行
func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode {
    return n.addDependencyRelation(fromNodeKey, inputs, &workflowAddInputOpts{})
}

// AddInputWithOptions 带选项的输入添加
func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode {
    return n.addDependencyRelation(fromNodeKey, inputs, getAddInputOpts(opts))
}

// AddDependency 创建仅执行依赖(无数据传递)
func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode {
    return n.addDependencyRelation(fromNodeKey, nil, &workflowAddInputOpts{dependencyWithoutInput: true})
}

// SetStaticValue 设置静态值
func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode {
    n.staticValues[path.join()] = value
    return n
}

3. 依赖关系处理

// addDependencyRelation 添加依赖关系的核心实现
func (n *WorkflowNode) addDependencyRelation(fromNodeKey string, inputs []*FieldMapping, options *workflowAddInputOpts) *WorkflowNode {
    // 设置字段映射的源节点
    for _, input := range inputs {
        input.fromNodeKey = fromNodeKey
    }
    
    if options.noDirectDependency {
        // 创建数据映射但不建立直接执行依赖
        n.addInputs = append(n.addInputs, func() error {
            var paths []FieldPath
            for _, input := range inputs {
                paths = append(paths, input.targetPath())
            }
            
            if err := n.checkAndAddMappedPath(paths); err != nil {
                return err
            }
            
            // noControl=true, noData=false
            if err := n.g.addEdgeWithMappings(fromNodeKey, n.key, true, false, inputs...); err != nil {
                return err
            }
            
            n.dependencySetter(fromNodeKey, noDirectDependency)
            return nil
        })
    } else if options.dependencyWithoutInput {
        // 创建执行依赖但不传递数据
        n.addInputs = append(n.addInputs, func() error {
            if len(inputs) > 0 {
                return fmt.Errorf("dependency without input should not have inputs")
            }
            
            // noControl=false, noData=true
            if err := n.g.addEdgeWithMappings(fromNodeKey, n.key, false, true); err != nil {
                return err
            }
            
            n.dependencySetter(fromNodeKey, normalDependency)
            return nil
        })
    } else {
        // 创建完整的数据和执行依赖
        n.addInputs = append(n.addInputs, func() error {
            var paths []FieldPath
            for _, input := range inputs {
                paths = append(paths, input.targetPath())
            }
            
            if err := n.checkAndAddMappedPath(paths); err != nil {
                return err
            }
            
            // noControl=false, noData=false
            if err := n.g.addEdgeWithMappings(fromNodeKey, n.key, false, false, inputs...); err != nil {
                return err
            }
            
            n.dependencySetter(fromNodeKey, normalDependency)
            return nil
        })
    }
    
    return n
}

Workflow 使用示例

func ExampleWorkflow() {
    ctx := context.Background()
    
    // 创建工作流
    workflow := compose.NewWorkflow[map[string]any, *schema.Message]()
    
    // 添加节点
    templateNode := workflow.AddChatTemplateNode("template", chatTemplate)
    modelNode := workflow.AddChatModelNode("model", chatModel)
    processorNode := workflow.AddLambdaNode("processor", processorLambda)
    
    // 配置数据流和依赖
    // template -> model: 传递格式化后的消息
    modelNode.AddInput("template")
    
    // model -> processor: 传递生成的消息,并添加静态配置
    processorNode.AddInput("model", compose.MapFields("content", "text")).
                  SetStaticValue(compose.FieldPath{"config", "mode"}, "production")
    
    // 结束节点
    workflow.End().AddInput("processor")
    
    // 编译工作流
    runnable, err := workflow.Compile(ctx)
    if err != nil {
        log.Fatal(err)
    }
    
    // 执行工作流
    input := map[string]any{
        "user_query": "Hello, how are you?",
        "context":    "This is a friendly conversation",
    }
    
    result, err := runnable.Invoke(ctx, input)
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Printf("Final result: %s\n", result.Content)
}

📈 执行性能分析

执行模式对比

执行模式输入类型输出类型适用场景性能特点
Invoke单一值单一值简单请求响应延迟最低,内存占用小
Stream单一值实时输出支持增量输出,用户体验好
Collect单一值批处理内存占用随流大小增长
Transform流式转换内存占用恒定,吞吐量高

性能优化建议

1. 选择合适的执行模式

// ❌ 不推荐:对大量数据使用 Invoke
func BadExample() {
    largeData := generateLargeDataset() // 10GB 数据
    result, err := processor.Invoke(ctx, largeData) // 内存占用过高
}

// ✅ 推荐:对大量数据使用 Transform
func GoodExample() {
    largeDataStream := generateLargeDataStream() // 流式数据
    resultStream, err := processor.Transform(ctx, largeDataStream) // 恒定内存占用
}

2. 合理使用并行处理

// 创建并行处理结构
parallel := compose.NewParallel()
parallel.AddLambda("worker1", worker1Lambda)
parallel.AddLambda("worker2", worker2Lambda)
parallel.AddLambda("worker3", worker3Lambda)

chain := compose.NewChain[Input, Output]().
    AppendParallel(parallel).  // 并行执行
    AppendLambda("merger", mergerLambda) // 合并结果

3. 优化流处理

// 使用缓冲流提高性能
func CreateBufferedStream[T any](data []T, bufferSize int) *schema.StreamReader[T] {
    sr, sw := schema.Pipe[T](bufferSize) // 设置缓冲区大小
    
    go func() {
        defer sw.Close()
        for _, item := range data {
            if err := sw.Send(item, nil); err != nil {
                return
            }
        }
    }()
    
    return sr
}

上一篇: 整体架构分析 下一篇: Schema模块详解 - 深入分析消息系统和流处理机制

更新时间: 2024-12-19 | 文档版本: v1.0