1. 概述
Compose 模块是 Eino 框架的编排引擎核心,负责将各种组件组合成复杂的执行流程。该模块提供了三种主要的编排模式:Chain(链式)、Graph(图式)和 Workflow(工作流),以及统一的 Runnable 接口抽象。
Compose 模块的核心设计理念是:
- 统一抽象: 所有编排结果都编译为
Runnable
接口 - 类型安全: 利用 Go 泛型确保编译时类型检查
- 灵活编排: 支持线性、并行、分支、循环等多种执行模式
- 流式优先: 原生支持流式数据处理
- 可扩展性: 支持自定义 Lambda 函数和子图嵌套
2. 核心架构设计
2.1 整体架构图
graph TB
subgraph "编排层 (Orchestration Layer)"
Chain[Chain 链式编排]
Graph[Graph 图式编排]
Workflow[Workflow 工作流编排]
end
subgraph "编译层 (Compilation Layer)"
Compiler[Graph Compiler]
TypeChecker[Type Checker]
Optimizer[Execution Optimizer]
end
subgraph "执行层 (Execution Layer)"
Runnable[Runnable Interface]
Runner[Graph Runner]
TaskManager[Task Manager]
ChannelManager[Channel Manager]
end
subgraph "组件层 (Component Layer)"
Lambda[Lambda Functions]
Components[Eino Components]
SubGraphs[Sub Graphs]
end
Chain --> Compiler
Graph --> Compiler
Workflow --> Compiler
Compiler --> TypeChecker
Compiler --> Optimizer
TypeChecker --> Runnable
Optimizer --> Runnable
Runnable --> Runner
Runner --> TaskManager
Runner --> ChannelManager
Lambda --> Components
Components --> SubGraphs
style Chain fill:#e8f5e8
style Graph fill:#fff3e0
style Workflow fill:#f3e5f5
style Runnable fill:#e3f2fd
图 2-1: Compose 模块整体架构图
图介绍: 该架构图展示了 Compose 模块的四层架构:
- 编排层: 提供三种不同复杂度的编排方式,满足不同场景需求
- 编译层: 负责将编排定义转换为可执行的运行时结构,包括类型检查和执行优化
- 执行层: 统一的执行抽象和高效的并发执行引擎
- 组件层: 支持各种类型的可执行单元,包括 Lambda 函数、Eino 组件和子图
2.2 核心类型定义
// compose/types.go
type component = components.Component
// 内置组件类型
const (
ComponentOfUnknown component = "Unknown"
ComponentOfGraph component = "Graph"
ComponentOfWorkflow component = "Workflow"
ComponentOfChain component = "Chain"
ComponentOfPassthrough component = "Passthrough"
ComponentOfToolsNode component = "ToolsNode"
ComponentOfLambda component = "Lambda"
)
// NodeTriggerMode 控制图节点的触发模式
type NodeTriggerMode string
const (
// AnyPredecessor 当任何前驱节点完成时触发
// 适用于 Pregel 模式,支持循环和迭代
AnyPredecessor NodeTriggerMode = "any_predecessor"
// AllPredecessor 当所有前驱节点完成时才触发
// 适用于 DAG 模式,确保严格的依赖顺序
AllPredecessor NodeTriggerMode = "all_predecessor"
)
2.3 统一接口抽象
// compose/types_composable.go
// AnyGraph 可组合图的统一接口
// 由 Graph、Chain、Workflow 实现
type AnyGraph interface {
getGenericHelper() *genericHelper
compile(ctx context.Context, options *graphCompileOptions) (*composableRunnable, error)
inputType() reflect.Type
outputType() reflect.Type
component() component
}
3. Runnable 接口详解
3.1 接口定义与设计理念
// compose/runnable.go
// Runnable 是可执行对象的接口。Graph、Chain 可以编译为 Runnable。
// runnable 是 eino 的核心概念,我们对四种数据流模式做降级兼容,
// 可以自动连接只实现一种或多种方法的组件。
// Runnable 是框架的核心抽象,定义了四种数据流模式
// 详细的接口定义请参考 [核心API深度分析](/posts/eino-03-core-api-analysis/)
type Runnable[I, O any] interface {
// 省略具体方法定义 - 请参考核心API文档
}
设计理念:
- 四种数据流模式: 支持同步/异步、单值/流式的所有组合
- 自动适配: 组件只需实现一种模式,框架自动提供其他模式的适配
- 类型安全: 通过泛型确保输入输出类型的一致性
- 统一抽象: 所有编排结果都实现相同的接口
3.2 数据流模式详解
graph LR
subgraph "输入类型"
SingleInput[单值输入 I]
StreamInput[流式输入 StreamReader<I>]
end
subgraph "输出类型"
SingleOutput[单值输出 O]
StreamOutput[流式输出 StreamReader<O>]
end
subgraph "四种模式"
Invoke[Invoke: I → O]
Stream[Stream: I → StreamReader<O>]
Collect[Collect: StreamReader<I> → O]
Transform[Transform: StreamReader<I> → StreamReader<O>]
end
SingleInput --> Invoke
SingleInput --> Stream
StreamInput --> Collect
StreamInput --> Transform
Invoke --> SingleOutput
Stream --> StreamOutput
Collect --> SingleOutput
Transform --> StreamOutput
style Invoke fill:#e8f5e8
style Stream fill:#fff3e0
style Collect fill:#f3e5f5
style Transform fill:#e3f2fd
图 3-1: Runnable 四种数据流模式
3.3 自动适配机制
// compose/runnable.go 内部实现
// composableRunnable 是 Runnable 的具体实现
type composableRunnable struct {
invoke func(context.Context, any, ...Option) (any, error)
stream func(context.Context, any, ...Option) (*schema.StreamReader[any], error)
collect func(context.Context, *schema.StreamReader[any], ...Option) (any, error)
transform func(context.Context, *schema.StreamReader[any], ...Option) (*schema.StreamReader[any], error)
inputType reflect.Type
outputType reflect.Type
genericHelper *genericHelper
nodeInfo *nodeInfo
meta *executorMeta
}
// 自动适配示例:通过 Stream 实现 Invoke
func (cr *composableRunnable) invokeByStream(ctx context.Context, input any, opts ...Option) (any, error) {
stream, err := cr.stream(ctx, input, opts...)
if err != nil {
return nil, err
}
defer stream.Close()
var result any
for {
chunk, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
result = chunk // 保留最后一个值
}
return result, nil
}
// 自动适配示例:通过 Invoke 实现 Stream
func (cr *composableRunnable) streamByInvoke(ctx context.Context, input any, opts ...Option) (*schema.StreamReader[any], error) {
result, err := cr.invoke(ctx, input, opts...)
if err != nil {
return nil, err
}
// 将单个结果包装为只有一个元素的流
return schema.StreamReaderFromArray([]any{result}), nil
}
4. Chain 链式编排
4.1 设计理念与特点
Chain 是最简单的编排模式,适用于线性顺序执行的场景:
- 线性执行: 组件按添加顺序依次执行
- 构建器模式: 支持链式调用,代码简洁直观
- 自动连接: 自动处理组件间的数据流连接
- 类型推导: 自动推导和验证组件间的类型兼容性
4.2 核心结构
// compose/chain.go
// Chain 是组件链
// Chain 节点可以是并行/分支/序列组件
// Chain 设计为构建器模式使用(使用前应该 Compile())
// 接口是 `Chain 风格`,可以像这样使用:`chain.AppendXX(...).AppendXX(...)`
type Chain[I, O any] struct {
gg *Graph[I, O] // 内部使用 Graph 实现
preNodeKeys []string // 前一个节点的键列表
err error // 构建过程中的错误
}
// NewChain 创建具有输入/输出类型的链
func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O] {
ch := &Chain[I, O]{
gg: NewGraph[I, O](opts...),
}
ch.gg.cmp = ComponentOfChain
return ch
}
4.3 组件添加方法
// 聊天模型组件
func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O] {
gNode, options := toChatModelNode(node, opts...)
c.addNode(gNode, options)
return c
}
// Lambda 函数组件
func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O] {
gNode, options := toLambdaNode(node, opts...)
c.addNode(gNode, options)
return c
}
// 检索器组件
func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O] {
gNode, options := toRetrieverNode(node, opts...)
c.addNode(gNode, options)
return c
}
// 并行结构
func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O] {
gNode, options := toParallelNode(p)
c.addNode(gNode, options)
return c
}
// 分支结构
func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O] {
gNode, options := toBranchNode(b)
c.addNode(gNode, options)
return c
}
4.4 自动连接机制
// compose/chain.go
func (c *Chain[I, O]) addNode(node *graphNode, options *graphAddNodeOpts) {
if c.err != nil {
return
}
if c.gg.compiled {
c.reportError(ErrChainCompiled)
return
}
if node == nil {
c.reportError(fmt.Errorf("chain add node invalid, node is nil"))
return
}
// 生成节点键
nodeKey := options.nodeOptions.nodeKey
defaultNodeKey := c.nextNodeKey()
if nodeKey == "" {
nodeKey = defaultNodeKey
}
// 添加节点到底层图
err := c.gg.addNode(nodeKey, node, options)
if err != nil {
c.reportError(err)
return
}
// 自动连接前驱节点
if len(c.preNodeKeys) == 0 {
c.preNodeKeys = append(c.preNodeKeys, START)
}
for _, preNodeKey := range c.preNodeKeys {
e := c.gg.AddEdge(preNodeKey, nodeKey)
if e != nil {
c.reportError(e)
return
}
}
// 更新前驱节点列表
c.preNodeKeys = []string{nodeKey}
}
4.5 Chain 使用示例
package main
import (
"context"
"fmt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func chainExample() {
ctx := context.Background()
// 创建处理器 Lambda
preprocessor := compose.InvokableLambda(func(ctx context.Context, input string) (map[string]any, error) {
return map[string]any{
"query": input,
"timestamp": time.Now(),
}, nil
})
// 创建格式化器 Lambda
formatter := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (*schema.Message, error) {
query := input["query"].(string)
return &schema.Message{
Role: schema.Assistant,
Content: fmt.Sprintf("处理查询: %s", query),
}, nil
})
// 构建链
chain := compose.NewChain[string, *schema.Message]().
AppendLambda("preprocessor", preprocessor).
AppendLambda("formatter", formatter)
// 编译
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 执行
result, err := runnable.Invoke(ctx, "Hello Eino")
if err != nil {
panic(err)
}
fmt.Printf("结果: %s\n", result.Content)
}
// 并行处理示例
func parallelChainExample() {
ctx := context.Background()
// 创建并行处理器
parallel := compose.NewParallel().
AppendLambda("processor1", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
return "处理器1: " + input, nil
})).
AppendLambda("processor2", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
return "处理器2: " + input, nil
}))
// 创建合并器
merger := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
results := make([]string, 0)
for _, v := range input {
if str, ok := v.(string); ok {
results = append(results, str)
}
}
return strings.Join(results, " | "), nil
})
// 构建链
chain := compose.NewChain[string, string]().
AppendParallel(parallel).
AppendLambda("merger", merger)
// 编译并执行
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
result, err := runnable.Invoke(ctx, "测试数据")
if err != nil {
panic(err)
}
fmt.Printf("并行处理结果: %s\n", result)
}
5. Graph 图式编排
5.1 设计理念与特点
Graph 提供了最灵活的编排能力,支持复杂的数据流和控制流:
- 任意拓扑: 支持任意有向图结构,包括分支、合并、循环
- 双重边类型: 区分数据流边和控制流边
- 字段映射: 支持细粒度的数据字段映射
- 条件分支: 支持基于条件的动态路由
- 两种执行模式: Pregel 模式(支持循环)和 DAG 模式(严格依赖)
5.2 核心结构
// compose/graph.go
type graph struct {
nodes map[string]*graphNode // 节点映射
dataEdges map[string][]string // 数据流边
controlEdges map[string][]string // 控制流边
branches map[string][]*GraphBranch // 条件分支
startNodes []string // 起始节点
endNodes []string // 结束节点
// 类型验证相关
toValidateMap map[string][]struct {
endNode string
mappings []*FieldMapping
}
fieldMappingRecords map[string][]*FieldMapping
// 编译状态
compiled bool
buildError error
// 类型信息
expectedInputType reflect.Type
expectedOutputType reflect.Type
genericHelper *genericHelper
// 组件类型
cmp component
// 状态管理
stateType reflect.Type
stateGenerator func() any
// 处理器
handlerOnEdges map[string]map[string][]handlerPair
handlerPreNode map[string][]handlerPair
handlerPreBranch map[string][][]handlerPair
}
5.3 节点管理
// 添加聊天模型节点
func (g *Graph[I, O]) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error {
gNode, options := toChatModelNode(node, opts...)
return g.g.addNode(key, gNode, options)
}
// 添加 Lambda 节点
func (g *Graph[I, O]) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error {
gNode, options := toLambdaNode(node, opts...)
return g.g.addNode(key, gNode, options)
}
// 添加子图节点
func (g *Graph[I, O]) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error {
gNode, options := toGraphNode(node, opts...)
return g.g.addNode(key, gNode, options)
}
// 添加直通节点(用于连接多个分支)
func (g *Graph[I, O]) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error {
gNode, options := toPassthroughNode(opts...)
return g.g.addNode(key, gNode, options)
}
5.4 边管理
// 添加数据流和控制流边
func (g *Graph[I, O]) AddEdge(startNode, endNode string, mappings ...*FieldMapping) error {
return g.g.addEdgeWithMappings(startNode, endNode, false, false, mappings...)
}
// 添加仅控制流边(不传递数据)
func (g *Graph[I, O]) AddControlEdge(startNode, endNode string) error {
return g.g.addEdgeWithMappings(startNode, endNode, false, true)
}
// 添加仅数据流边(不控制执行顺序)
func (g *Graph[I, O]) AddDataEdge(startNode, endNode string, mappings ...*FieldMapping) error {
return g.g.addEdgeWithMappings(startNode, endNode, true, false, mappings...)
}
// 添加条件分支
func (g *Graph[I, O]) AddBranch(startNode string, branch *GraphBranch) error {
return g.g.addBranch(startNode, branch, false)
}
5.5 字段映射机制
// compose/field_mapping.go
// FieldMapping 定义字段映射关系
type FieldMapping struct {
from string // 源字段路径
to string // 目标字段路径
}
// FieldPath 字段路径类型
type FieldPath []string
// 创建字段映射
func NewFieldMapping(from, to string) *FieldMapping {
return &FieldMapping{from: from, to: to}
}
// 从字段路径创建映射
func FromFieldPath(from FieldPath) *FieldMapping {
return &FieldMapping{from: joinFieldPath(from)}
}
// 到字段路径创建映射
func ToFieldPath(to FieldPath) *FieldMapping {
return &FieldMapping{to: joinFieldPath(to)}
}
// 字段映射应用示例
func applyFieldMappings(input map[string]any, mappings []*FieldMapping) (map[string]any, error) {
result := make(map[string]any)
for _, mapping := range mappings {
// 从输入中提取值
value, err := extractValue(input, mapping.from)
if err != nil {
return nil, err
}
// 设置到目标路径
err = setValue(result, mapping.to, value)
if err != nil {
return nil, err
}
}
return result, nil
}
5.6 条件分支
// compose/branch.go
// GraphBranch 图分支定义
type GraphBranch struct {
condition func(ctx context.Context, input any) (string, error) // 分支条件函数
endNodes map[string]bool // 目标节点集合
noDataFlow bool // 是否传递数据
}
// 创建条件分支
func NewGraphBranch(condition func(ctx context.Context, input any) (string, error)) *GraphBranch {
return &GraphBranch{
condition: condition,
endNodes: make(map[string]bool),
}
}
// 添加分支目标
func (gb *GraphBranch) AddTarget(nodeKey string) *GraphBranch {
gb.endNodes[nodeKey] = true
return gb
}
// 设置不传递数据
func (gb *GraphBranch) WithNoDataFlow() *GraphBranch {
gb.noDataFlow = true
return gb
}
// 分支执行逻辑
func (gb *GraphBranch) execute(ctx context.Context, input any) ([]string, error) {
target, err := gb.condition(ctx, input)
if err != nil {
return nil, err
}
if gb.endNodes[target] {
return []string{target}, nil
}
return nil, fmt.Errorf("branch target %s not found", target)
}
5.7 Graph 使用示例
package main
import (
"context"
"fmt"
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func graphExample() {
ctx := context.Background()
// 创建图
graph := compose.NewGraph[string, string]()
// 添加预处理节点
err := graph.AddLambdaNode("preprocessor", compose.InvokableLambda(func(ctx context.Context, input string) (map[string]any, error) {
return map[string]any{
"text": strings.ToLower(input),
"length": len(input),
}, nil
}))
if err != nil {
panic(err)
}
// 添加条件分支
branch := compose.NewGraphBranch(func(ctx context.Context, input any) (string, error) {
data := input.(map[string]any)
length := data["length"].(int)
if length > 10 {
return "long_processor", nil
}
return "short_processor", nil
}).AddTarget("long_processor").AddTarget("short_processor")
err = graph.AddBranch("preprocessor", branch)
if err != nil {
panic(err)
}
// 添加长文本处理器
err = graph.AddLambdaNode("long_processor", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
text := input["text"].(string)
return fmt.Sprintf("长文本处理: %s", text), nil
}))
if err != nil {
panic(err)
}
// 添加短文本处理器
err = graph.AddLambdaNode("short_processor", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
text := input["text"].(string)
return fmt.Sprintf("短文本处理: %s", text), nil
}))
if err != nil {
panic(err)
}
// 添加合并节点
err = graph.AddPassthroughNode("merger")
if err != nil {
panic(err)
}
// 连接边
err = graph.AddEdge("long_processor", "merger")
if err != nil {
panic(err)
}
err = graph.AddEdge("short_processor", "merger")
if err != nil {
panic(err)
}
// 编译并执行
runnable, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
// 测试短文本
result1, err := runnable.Invoke(ctx, "Hello")
if err != nil {
panic(err)
}
fmt.Printf("短文本结果: %s\n", result1)
// 测试长文本
result2, err := runnable.Invoke(ctx, "This is a very long text for testing")
if err != nil {
panic(err)
}
fmt.Printf("长文本结果: %s\n", result2)
}
// 字段映射示例
func fieldMappingExample() {
ctx := context.Background()
graph := compose.NewGraph[map[string]any, map[string]any]()
// 添加数据处理节点
err := graph.AddLambdaNode("processor", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
name := input["user_name"].(string)
age := input["user_age"].(int)
return map[string]any{
"processed_name": strings.ToUpper(name),
"processed_age": age * 2,
}, nil
}))
if err != nil {
panic(err)
}
// 添加格式化节点,使用字段映射
err = graph.AddLambdaNode("formatter", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
return map[string]any{
"result": fmt.Sprintf("Name: %s, Age: %v", input["name"], input["age"]),
}, nil
}))
if err != nil {
panic(err)
}
// 添加带字段映射的边
err = graph.AddEdge("processor", "formatter",
compose.NewFieldMapping("processed_name", "name"),
compose.NewFieldMapping("processed_age", "age"),
)
if err != nil {
panic(err)
}
// 编译并执行
runnable, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
input := map[string]any{
"user_name": "alice",
"user_age": 25,
}
result, err := runnable.Invoke(ctx, input)
if err != nil {
panic(err)
}
fmt.Printf("字段映射结果: %v\n", result)
}
6. Workflow 工作流编排
6.1 设计理念与特点
Workflow 是 Graph 的高级封装,提供更声明式的编排方式:
- 声明式依赖: 通过
AddInput
和AddDependency
声明节点关系 - 静态值注入: 支持为节点设置静态输入值
- DAG 模式: 强制使用 DAG 模式,不支持循环
- 类型安全: 编译时验证依赖关系和数据流
- 简化 API: 相比 Graph 提供更简洁的 API
6.2 核心结构
// compose/workflow.go
// WorkflowNode 工作流节点
type WorkflowNode struct {
g *graph // 底层图引用
key string // 节点键
addInputs []func() error // 待添加的输入函数列表
staticValues map[string]any // 静态值映射
dependencySetter func(string, dependencyType) // 依赖设置器
mappedFieldPath map[string]any // 映射字段路径
}
// Workflow 是 graph 的包装器,用声明依赖和字段映射替代 AddEdge
// 底层使用 NodeTriggerMode(AllPredecessor),因此不支持循环
type Workflow[I, O any] struct {
g *graph // 底层图
workflowNodes map[string]*WorkflowNode // 工作流节点映射
workflowBranches []*WorkflowBranch // 工作流分支
dependencies map[string]map[string]dependencyType // 依赖关系映射
}
// 依赖类型
type dependencyType int
const (
normalDependency dependencyType = iota // 普通依赖(数据+控制)
noDirectDependency // 无直接依赖(仅控制)
branchDependency // 分支依赖
)
6.3 节点创建与配置
// 创建聊天模型节点
func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode {
_ = wf.g.AddChatModelNode(key, chatModel, opts...)
return wf.initNode(key)
}
// 创建 Lambda 节点
func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode {
_ = wf.g.AddLambdaNode(key, lambda, opts...)
return wf.initNode(key)
}
// 初始化工作流节点
func (wf *Workflow[I, O]) initNode(key string) *WorkflowNode {
n := &WorkflowNode{
g: wf.g,
key: key,
staticValues: make(map[string]any),
dependencySetter: func(fromNodeKey string, typ dependencyType) {
if _, ok := wf.dependencies[key]; !ok {
wf.dependencies[key] = make(map[string]dependencyType)
}
wf.dependencies[key][fromNodeKey] = typ
},
mappedFieldPath: make(map[string]any),
}
wf.workflowNodes[key] = n
return n
}
6.4 依赖管理
// 添加输入依赖(数据流 + 控制流)
func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode {
return n.addDependencyRelation(fromNodeKey, inputs, &workflowAddInputOpts{})
}
// 添加输入依赖(带选项)
func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode {
options := &workflowAddInputOpts{}
for _, opt := range opts {
opt(options)
}
return n.addDependencyRelation(fromNodeKey, inputs, options)
}
// 添加控制依赖(仅控制流)
func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode {
return n.addDependencyRelation(fromNodeKey, nil, &workflowAddInputOpts{dependencyWithoutInput: true})
}
// 设置静态值
func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode {
pathStr := joinFieldPath(path)
n.staticValues[pathStr] = value
return n
}
// 内部依赖关系添加逻辑
func (n *WorkflowNode) addDependencyRelation(fromNodeKey string, inputs []*FieldMapping, opts *workflowAddInputOpts) *WorkflowNode {
n.addInputs = append(n.addInputs, func() error {
// 根据选项确定边类型
noControl := opts.noDirectDependency
noData := opts.dependencyWithoutInput || len(inputs) == 0
// 添加边到底层图
err := n.g.addEdgeWithMappings(fromNodeKey, n.key, noControl, noData, inputs...)
if err != nil {
return err
}
// 记录依赖类型
if opts.noDirectDependency {
n.dependencySetter(fromNodeKey, noDirectDependency)
} else if opts.dependencyWithoutInput {
n.dependencySetter(fromNodeKey, normalDependency)
} else {
n.dependencySetter(fromNodeKey, normalDependency)
}
// 验证和记录字段映射
if len(inputs) > 0 {
paths := make([]FieldPath, len(inputs))
for i, input := range inputs {
paths[i] = splitFieldPath(input.to)
}
return n.checkAndAddMappedPath(paths)
}
return nil
})
return n
}
6.5 编译过程
func (wf *Workflow[I, O]) compile(ctx context.Context, options *graphCompileOptions) (*composableRunnable, error) {
if wf.g.buildError != nil {
return nil, wf.g.buildError
}
// 处理工作流分支
for _, wb := range wf.workflowBranches {
for endNode := range wb.endNodes {
if endNode == END {
if _, ok := wf.dependencies[END]; !ok {
wf.dependencies[END] = make(map[string]dependencyType)
}
wf.dependencies[END][wb.fromNodeKey] = branchDependency
} else {
n := wf.workflowNodes[endNode]
n.dependencySetter(wb.fromNodeKey, branchDependency)
}
}
_ = wf.g.addBranch(wb.fromNodeKey, wb.GraphBranch, true)
}
// 应用所有输入依赖
for _, n := range wf.workflowNodes {
for _, addInput := range n.addInputs {
if err := addInput(); err != nil {
return nil, err
}
}
n.addInputs = nil
}
// 处理静态值
for _, n := range wf.workflowNodes {
if len(n.staticValues) > 0 {
value := make(map[string]any, len(n.staticValues))
var paths []FieldPath
for path, v := range n.staticValues {
value[path] = v
paths = append(paths, splitFieldPath(path))
}
if err := n.checkAndAddMappedPath(paths); err != nil {
return nil, err
}
// 创建静态值注入处理器
pair := handlerPair{
invoke: func(in any) (any, error) {
values := []any{in, value}
return mergeValues(values, nil)
},
transform: func(in streamReader) streamReader {
sr := schema.StreamReaderFromArray([]map[string]any{value})
newS, err := mergeValues([]any{in, packStreamReader(sr)}, nil)
if err != nil {
errSR, errSW := schema.Pipe[map[string]any](1)
errSW.Send(nil, err)
errSW.Close()
return packStreamReader(errSR)
}
return newS.(streamReader)
},
}
// 记录字段映射
for i := range paths {
wf.g.fieldMappingRecords[n.key] = append(wf.g.fieldMappingRecords[n.key], ToFieldPath(paths[i]))
}
wf.g.handlerPreNode[n.key] = []handlerPair{pair}
}
}
// 委托给底层图编译
return wf.g.compile(ctx, options)
}
6.6 Workflow 使用示例
package main
import (
"context"
"fmt"
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func workflowExample() {
ctx := context.Background()
// 创建工作流
workflow := compose.NewWorkflow[map[string]any, map[string]any]()
// 添加数据预处理节点
preprocessor := workflow.AddLambdaNode("preprocessor", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
text := input["text"].(string)
return map[string]any{
"cleaned_text": strings.TrimSpace(text),
"word_count": len(strings.Fields(text)),
}, nil
}))
// 添加文本分析节点
analyzer := workflow.AddLambdaNode("analyzer", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
text := input["text"].(string)
wordCount := input["count"].(int)
return map[string]any{
"analysis": fmt.Sprintf("文本长度: %d 字符, 单词数: %d", len(text), wordCount),
"category": func() string {
if wordCount > 10 {
return "长文本"
}
return "短文本"
}(),
}, nil
})).
AddInput("preprocessor",
compose.NewFieldMapping("cleaned_text", "text"),
compose.NewFieldMapping("word_count", "count"),
).
SetStaticValue(compose.FieldPath{"metadata", "version"}, "1.0")
// 添加结果格式化节点
formatter := workflow.AddLambdaNode("formatter", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
analysis := input["analysis"].(string)
category := input["category"].(string)
version := input["version"].(string)
return map[string]any{
"result": fmt.Sprintf("[%s] %s (版本: %s)", category, analysis, version),
}, nil
})).
AddInput("analyzer",
compose.NewFieldMapping("analysis", "analysis"),
compose.NewFieldMapping("category", "category"),
compose.NewFieldMapping("metadata.version", "version"),
)
// 设置输出节点
workflow.End().AddInput("formatter")
// 编译并执行
runnable, err := workflow.Compile(ctx)
if err != nil {
panic(err)
}
input := map[string]any{
"text": " This is a sample text for workflow testing with multiple words ",
}
result, err := runnable.Invoke(ctx, input)
if err != nil {
panic(err)
}
fmt.Printf("工作流结果: %v\n", result)
}
// 复杂依赖示例
func complexWorkflowExample() {
ctx := context.Background()
workflow := compose.NewWorkflow[string, string]()
// 输入处理
inputProcessor := workflow.AddLambdaNode("input_processor", compose.InvokableLambda(func(ctx context.Context, input string) (map[string]any, error) {
return map[string]any{
"original": input,
"length": len(input),
}, nil
}))
// 并行处理分支 1: 文本转换
textTransformer := workflow.AddLambdaNode("text_transformer", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
text := input["text"].(string)
return map[string]any{
"transformed": strings.ToUpper(text),
}, nil
})).AddInput("input_processor", compose.NewFieldMapping("original", "text"))
// 并行处理分支 2: 长度分析
lengthAnalyzer := workflow.AddLambdaNode("length_analyzer", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
length := input["len"].(int)
return map[string]any{
"analysis": fmt.Sprintf("文本长度: %d", length),
}, nil
})).AddInput("input_processor", compose.NewFieldMapping("length", "len"))
// 结果合并(等待两个分支完成)
merger := workflow.AddLambdaNode("merger", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
transformed := input["transformed_text"].(string)
analysis := input["length_analysis"].(string)
return fmt.Sprintf("%s | %s", transformed, analysis), nil
})).
AddInput("text_transformer", compose.NewFieldMapping("transformed", "transformed_text")).
AddInput("length_analyzer", compose.NewFieldMapping("analysis", "length_analysis"))
// 设置输出
workflow.End().AddInput("merger")
// 编译并执行
runnable, err := workflow.Compile(ctx)
if err != nil {
panic(err)
}
result, err := runnable.Invoke(ctx, "hello world")
if err != nil {
panic(err)
}
fmt.Printf("复杂工作流结果: %s\n", result)
}
7. 编译与执行机制
7.1 编译过程概览
sequenceDiagram
participant User
participant Graph
participant Compiler
participant TypeChecker
participant Optimizer
participant Runtime
User->>Graph: 调用 Compile()
Graph->>Compiler: 开始编译
Compiler->>TypeChecker: 类型验证
TypeChecker->>TypeChecker: 检查节点类型兼容性
TypeChecker->>TypeChecker: 验证字段映射
TypeChecker-->>Compiler: 类型检查结果
Compiler->>Optimizer: 执行优化
Optimizer->>Optimizer: 确定执行模式 (Pregel/DAG)
Optimizer->>Optimizer: 生成执行计划
Optimizer-->>Compiler: 优化后的执行计划
Compiler->>Runtime: 创建运行时
Runtime->>Runtime: 初始化通道管理器
Runtime->>Runtime: 创建任务管理器
Runtime-->>Compiler: 运行时实例
Compiler-->>Graph: 返回 composableRunnable
Graph-->>User: 返回 Runnable
图 7-1: 编译过程序列图
7.2 类型检查机制
// compose/graph.go
func (g *graph) compile(ctx context.Context, opt *graphCompileOptions) (*composableRunnable, error) {
if g.buildError != nil {
return nil, g.buildError
}
// 确定运行类型
runType := runTypePregel
cb := pregelChannelBuilder
if isChain(g.cmp) || isWorkflow(g.cmp) {
if opt != nil && opt.nodeTriggerMode != "" {
return nil, errors.New(fmt.Sprintf("%s doesn't support node trigger mode option", g.cmp))
}
}
if (opt != nil && opt.nodeTriggerMode == AllPredecessor) || isWorkflow(g.cmp) {
runType = runTypeDAG
cb = dagChannelBuilder
}
// 确定执行模式
eager := false
if isWorkflow(g.cmp) || runType == runTypeDAG {
eager = true
}
if opt != nil && opt.eagerDisabled {
eager = false
}
// 验证起始和结束节点
if len(g.startNodes) == 0 {
return nil, errors.New("start node not set")
}
if len(g.endNodes) == 0 {
return nil, errors.New("end node not set")
}
// 类型推导验证
for _, v := range g.toValidateMap {
if len(v) > 0 {
return nil, fmt.Errorf("some node's input or output types cannot be inferred: %v", g.toValidateMap)
}
}
// 字段映射验证
for key := range g.fieldMappingRecords {
toMap := make(map[string]bool)
for _, mapping := range g.fieldMappingRecords[key] {
if _, ok := toMap[mapping.to]; ok {
return nil, fmt.Errorf("duplicate mapping target field: %s of node[%s]", mapping.to, key)
}
toMap[mapping.to] = true
}
// 添加字段映射转换器
g.handlerPreNode[key] = append(g.handlerPreNode[key], g.getNodeGenericHelper(key).inputFieldMappingConverter)
}
// 编译子图和组件
key2SubGraphs := g.beforeChildGraphsCompile(opt)
chanSubscribeTo := make(map[string]*chanCall)
for name, node := range g.nodes {
r, err := node.compileIfNeeded(ctx)
if err != nil {
return nil, err
}
chCall := &chanCall{
action: r,
writeTo: g.dataEdges[name],
controls: g.controlEdges[name],
preProcessor: node.nodeInfo.preProcessor,
postProcessor: node.nodeInfo.postProcessor,
}
// 处理分支
branches := g.branches[name]
if len(branches) > 0 {
chCall.writeToBranches = append(chCall.writeToBranches, branches...)
}
chanSubscribeTo[name] = chCall
}
// 构建前驱关系
dataPredecessors := make(map[string][]string)
controlPredecessors := make(map[string][]string)
// ... 构建前驱关系的详细逻辑
// 创建运行时
return g.createRuntime(chanSubscribeTo, dataPredecessors, controlPredecessors, runType, eager, cb)
}
7.3 执行引擎
7.3.1 任务管理器
// compose/graph_manager.go
type taskManager struct {
runWrapper func(context.Context, *composableRunnable, any, ...Option) (any, error)
runningTasks map[string]*task // 正在运行的任务
done chan *task // 完成的任务通道
num int // 任务计数
needAll bool // 是否需要等待所有任务
cancelCh chan struct{} // 取消通道
opts []Option // 执行选项
}
type task struct {
nodeKey string // 节点键
call *chanCall // 调用信息
input any // 输入数据
output any // 输出数据
err error // 错误信息
ctx context.Context // 上下文
option []Option // 选项
}
// 执行任务
func (t *taskManager) execute(currentTask *task) {
defer func() {
panicInfo := recover()
if panicInfo != nil {
currentTask.output = nil
currentTask.err = safe.NewPanicErr(panicInfo, debug.Stack())
}
t.done.Send(currentTask)
}()
// 初始化节点回调
ctx := initNodeCallbacks(currentTask.ctx, currentTask.nodeKey, currentTask.call.action.nodeInfo, currentTask.call.action.meta, t.opts...)
// 执行任务
currentTask.output, currentTask.err = t.runWrapper(ctx, currentTask.call.action, currentTask.input, currentTask.option...)
}
// 提交任务
func (t *taskManager) submit(tasks []*task) error {
if len(tasks) == 0 {
return nil
}
// 预处理任务
for i := 0; i < len(tasks); i++ {
currentTask := tasks[i]
err := runPreHandler(currentTask, t.runWrapper)
if err != nil {
currentTask.err = err
tasks = append(tasks[:i], tasks[i+1:]...)
i--
t.num++
t.done.Send(currentTask)
}
t.runningTasks[currentTask.nodeKey] = currentTask
}
if len(tasks) == 0 {
return nil
}
// 同步执行优化
var syncTask *task
if t.num == 0 && (len(tasks) == 1 || t.needAll) && t.cancelCh == nil {
syncTask = tasks[0]
tasks = tasks[1:]
}
// 异步执行任务
for _, currentTask := range tasks {
t.num += 1
go t.execute(currentTask)
}
// 同步执行任务
if syncTask != nil {
t.num += 1
t.execute(syncTask)
}
return nil
}
7.3.2 通道管理器
// compose/graph_run.go
type runner struct {
chanSubscribeTo map[string]*chanCall // 通道订阅映射
dataPredecessors map[string][]string // 数据前驱
controlPredecessors map[string][]string // 控制前驱
runType graphRunType // 运行类型
eager bool // 是否急切模式
channelBuilder func(*runner, map[string]FanInMergeConfig) channelManager // 通道构建器
}
// 执行调用
func (r *runner) invoke(ctx context.Context, input any, opts ...Option) (any, error) {
// 创建通道管理器
cm := r.channelBuilder(r, nil)
// 创建任务管理器
tm := &taskManager{
runWrapper: r.runWrapper,
runningTasks: make(map[string]*task),
done: make(chan *task, 100),
opts: opts,
}
// 启动输入
err := cm.startInput(ctx, input)
if err != nil {
return nil, err
}
// 执行主循环
for {
// 获取可执行任务
tasks := cm.getExecutableTasks(ctx)
if len(tasks) == 0 {
break
}
// 提交任务执行
err := tm.submit(tasks)
if err != nil {
return nil, err
}
// 等待任务完成
completedTasks, canceled := tm.wait()
if canceled {
return nil, ctx.Err()
}
// 处理完成的任务
for _, task := range completedTasks {
err := cm.handleTaskResult(task)
if err != nil {
return nil, err
}
}
// 检查是否完成
if cm.isComplete() {
break
}
}
// 获取最终结果
return cm.getFinalResult()
}
7.4 两种执行模式
7.4.1 Pregel 模式
Pregel 模式支持循环和迭代,适用于需要多轮处理的场景:
// Pregel 模式特点:
// 1. 支持循环边
// 2. 节点可以被多次触发
// 3. 使用 AnyPredecessor 触发模式
// 4. 适用于迭代算法和状态机
type pregelChannelManager struct {
channels map[string]chan any // 节点通道
activeNodes map[string]bool // 活跃节点
iteration int // 迭代轮次
maxIteration int // 最大迭代次数
}
func (pcm *pregelChannelManager) getExecutableTasks(ctx context.Context) []*task {
tasks := make([]*task, 0)
for nodeKey, ch := range pcm.channels {
select {
case input := <-ch:
if pcm.activeNodes[nodeKey] {
task := &task{
nodeKey: nodeKey,
input: input,
ctx: ctx,
}
tasks = append(tasks, task)
}
default:
// 通道为空,跳过
}
}
return tasks
}
7.4.2 DAG 模式
DAG 模式确保严格的依赖顺序,不支持循环:
// DAG 模式特点:
// 1. 严格的拓扑排序
// 2. 节点只能被触发一次
// 3. 使用 AllPredecessor 触发模式
// 4. 适用于数据管道和工作流
type dagChannelManager struct {
nodeStates map[string]nodeState // 节点状态
pendingData map[string][]any // 待处理数据
completedSet map[string]bool // 已完成节点集合
}
type nodeState int
const (
nodeWaiting nodeState = iota // 等待前驱
nodeReady // 准备执行
nodeRunning // 正在执行
nodeCompleted // 已完成
)
func (dcm *dagChannelManager) getExecutableTasks(ctx context.Context) []*task {
tasks := make([]*task, 0)
for nodeKey, state := range dcm.nodeStates {
if state == nodeReady {
// 检查所有前驱是否完成
allPredecessorsCompleted := true
for _, pred := range dcm.controlPredecessors[nodeKey] {
if !dcm.completedSet[pred] {
allPredecessorsCompleted = false
break
}
}
if allPredecessorsCompleted {
// 合并所有前驱的数据
input := dcm.mergeInputData(nodeKey)
task := &task{
nodeKey: nodeKey,
input: input,
ctx: ctx,
}
tasks = append(tasks, task)
// 更新状态
dcm.nodeStates[nodeKey] = nodeRunning
}
}
}
return tasks
}
8. Lambda 函数系统
8.1 Lambda 类型定义
// compose/types_lambda.go
// 四种基本 Lambda 类型
type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)
type Stream[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error)
type Collect[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error)
type Transform[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)
// 无选项版本
type InvokeWOOpt[I, O any] func(ctx context.Context, input I) (output O, err error)
type StreamWOOpt[I, O any] func(ctx context.Context, input I) (output *schema.StreamReader[O], err error)
type CollectWOOpt[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output O, err error)
type TransformWOOpt[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output *schema.StreamReader[O], err error)
// Lambda 结构体
type Lambda struct {
invoke any // Invoke 函数
stream any // Stream 函数
collect any // Collect 函数
transform any // Transform 函数
lambdaType string // Lambda 类型标识
}
8.2 Lambda 构造函数
// 创建 Invoke Lambda
func InvokableLambda[I, O any](fn InvokeWOOpt[I, O], opts ...LambdaOption) *Lambda {
lambda := &Lambda{
invoke: fn,
}
for _, opt := range opts {
opt(lambda)
}
return lambda
}
// 创建 Stream Lambda
func StreamableLambda[I, O any](fn StreamWOOpt[I, O], opts ...LambdaOption) *Lambda {
lambda := &Lambda{
stream: fn,
}
for _, opt := range opts {
opt(lambda)
}
return lambda
}
// 创建 Collect Lambda
func CollectableLambda[I, O any](fn CollectWOOpt[I, O], opts ...LambdaOption) *Lambda {
lambda := &Lambda{
collect: fn,
}
for _, opt := range opts {
opt(lambda)
}
return lambda
}
// 创建 Transform Lambda
func TransformableLambda[I, O any](fn TransformWOOpt[I, O], opts ...LambdaOption) *Lambda {
lambda := &Lambda{
transform: fn,
}
for _, opt := range opts {
opt(lambda)
}
return lambda
}
// Lambda 选项
type LambdaOption func(*Lambda)
func WithLambdaType(lambdaType string) LambdaOption {
return func(l *Lambda) {
l.lambdaType = lambdaType
}
}
8.3 Lambda 使用示例
package main
import (
"context"
"fmt"
"strings"
"time"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func lambdaExamples() {
ctx := context.Background()
// 1. 简单数据处理 Lambda
textProcessor := compose.InvokableLambda(func(ctx context.Context, input string) (map[string]any, error) {
return map[string]any{
"original": input,
"upper": strings.ToUpper(input),
"lower": strings.ToLower(input),
"word_count": len(strings.Fields(input)),
}, nil
}, compose.WithLambdaType("TextProcessor"))
// 2. 流式数据生成 Lambda
streamGenerator := compose.StreamableLambda(func(ctx context.Context, input string) (*schema.StreamReader[string], error) {
sr, sw := schema.Pipe[string](10)
go func() {
defer sw.Close()
words := strings.Fields(input)
for i, word := range words {
select {
case <-ctx.Done():
return
default:
}
processed := fmt.Sprintf("第%d个词: %s", i+1, word)
if sw.Send(processed, nil) {
return // 流已关闭
}
time.Sleep(100 * time.Millisecond) // 模拟处理延迟
}
}()
return sr, nil
}, compose.WithLambdaType("StreamGenerator"))
// 3. 流式数据收集 Lambda
streamCollector := compose.CollectableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (string, error) {
var results []string
for {
item, err := input.Recv()
if err == io.EOF {
break
}
if err != nil {
return "", err
}
results = append(results, item)
}
return strings.Join(results, " | "), nil
}, compose.WithLambdaType("StreamCollector"))
// 4. 流式数据转换 Lambda
streamTransformer := compose.TransformableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (*schema.StreamReader[string], error) {
sr, sw := schema.Pipe[string](10)
go func() {
defer sw.Close()
for {
item, err := input.Recv()
if err == io.EOF {
break
}
if err != nil {
sw.Send("", err)
return
}
transformed := fmt.Sprintf("转换: %s", strings.ToUpper(item))
if sw.Send(transformed, nil) {
return
}
}
}()
return sr, nil
}, compose.WithLambdaType("StreamTransformer"))
// 构建处理链
chain := compose.NewChain[string, string]().
AppendLambda("processor", textProcessor).
AppendLambda("generator", streamGenerator).
AppendLambda("transformer", streamTransformer).
AppendLambda("collector", streamCollector)
// 编译并执行
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
result, err := runnable.Invoke(ctx, "Hello World Lambda Functions")
if err != nil {
panic(err)
}
fmt.Printf("Lambda 链处理结果: %s\n", result)
}
// 条件处理 Lambda
func conditionalLambdaExample() {
ctx := context.Background()
// 条件路由 Lambda
router := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
text := input["text"].(string)
length := len(text)
if length > 20 {
return "long_text_processor"
} else if length > 10 {
return "medium_text_processor"
}
return "short_text_processor"
})
// 不同长度的处理器
longProcessor := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
text := input["text"].(string)
return fmt.Sprintf("长文本处理: %s (截取前50字符)", text[:min(50, len(text))]), nil
})
mediumProcessor := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
text := input["text"].(string)
return fmt.Sprintf("中等文本处理: %s", strings.ToUpper(text)), nil
})
shortProcessor := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
text := input["text"].(string)
return fmt.Sprintf("短文本处理: %s", strings.Repeat(text, 2)), nil
})
// 构建图
graph := compose.NewGraph[map[string]any, string]()
// 添加节点
graph.AddLambdaNode("router", router)
graph.AddLambdaNode("long_text_processor", longProcessor)
graph.AddLambdaNode("medium_text_processor", mediumProcessor)
graph.AddLambdaNode("short_text_processor", shortProcessor)
// 添加条件分支
branch := compose.NewGraphBranch(func(ctx context.Context, input any) (string, error) {
return input.(string), nil // 直接使用路由结果
}).
AddTarget("long_text_processor").
AddTarget("medium_text_processor").
AddTarget("short_text_processor")
graph.AddBranch("router", branch)
// 编译并测试
runnable, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
// 测试不同长度的文本
testCases := []map[string]any{
{"text": "短"},
{"text": "这是一个中等长度的文本"},
{"text": "这是一个非常长的文本,用于测试长文本处理逻辑,包含很多字符和内容"},
}
for i, testCase := range testCases {
result, err := runnable.Invoke(ctx, testCase)
if err != nil {
panic(err)
}
fmt.Printf("测试案例 %d 结果: %s\n", i+1, result)
}
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
9. 高级特性
9.1 状态管理
// 状态管理示例
func stateManagementExample() {
ctx := context.Background()
// 定义状态类型
type AppState struct {
Counter int `json:"counter"`
History []string `json:"history"`
Data map[string]any `json:"data"`
}
// 创建带状态的图
graph := compose.NewGraph[string, map[string]any](
compose.WithState(func() any {
return &AppState{
Counter: 0,
History: make([]string, 0),
Data: make(map[string]any),
}
}),
)
// 状态更新 Lambda
stateUpdater := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
// 获取状态
state := compose.GetState(ctx).(*AppState)
// 更新状态
state.Counter++
state.History = append(state.History, input["message"].(string))
state.Data["last_update"] = time.Now()
return map[string]any{
"counter": state.Counter,
"message": fmt.Sprintf("处理第 %d 条消息: %s", state.Counter, input["message"]),
}, nil
})
graph.AddLambdaNode("updater", stateUpdater)
// 编译并测试
runnable, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
// 多次调用,观察状态变化
for i := 1; i <= 3; i++ {
input := map[string]any{
"message": fmt.Sprintf("消息 %d", i),
}
result, err := runnable.Invoke(ctx, input)
if err != nil {
panic(err)
}
fmt.Printf("调用 %d 结果: %v\n", i, result)
}
}
9.2 错误处理与重试
// 错误处理示例
func errorHandlingExample() {
ctx := context.Background()
// 可能失败的处理器
unreliableProcessor := compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
// 模拟随机失败
if rand.Float32() < 0.3 {
return "", fmt.Errorf("随机处理失败: %s", input)
}
return fmt.Sprintf("成功处理: %s", input), nil
})
// 错误恢复处理器
errorRecovery := compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
return fmt.Sprintf("错误恢复处理: %s", input), nil
})
// 重试包装器
retryWrapper := func(maxRetries int, processor *compose.Lambda) *compose.Lambda {
return compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
var lastErr error
for i := 0; i < maxRetries; i++ {
// 编译处理器
runnable, err := compose.NewChain[string, string]().
AppendLambda("processor", processor).
Compile(ctx)
if err != nil {
return "", err
}
// 尝试执行
result, err := runnable.Invoke(ctx, input)
if err == nil {
return result, nil
}
lastErr = err
fmt.Printf("重试 %d/%d 失败: %v\n", i+1, maxRetries, err)
// 等待后重试
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
}
return "", fmt.Errorf("重试 %d 次后仍然失败: %w", maxRetries, lastErr)
})
}
// 构建带重试的链
chain := compose.NewChain[string, string]().
AppendLambda("retry_processor", retryWrapper(3, unreliableProcessor))
// 编译并测试
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 测试多次
for i := 1; i <= 5; i++ {
input := fmt.Sprintf("测试数据 %d", i)
result, err := runnable.Invoke(ctx, input)
if err != nil {
fmt.Printf("最终失败 %d: %v\n", i, err)
} else {
fmt.Printf("成功 %d: %s\n", i, result)
}
}
}
9.3 性能监控
// 性能监控示例
func performanceMonitoringExample() {
ctx := context.Background()
// 性能监控包装器
monitoringWrapper := func(name string, processor *compose.Lambda) *compose.Lambda {
return compose.InvokableLambda(func(ctx context.Context, input any) (any, error) {
startTime := time.Now()
// 编译并执行原始处理器
runnable, err := compose.NewChain[any, any]().
AppendLambda("processor", processor).
Compile(ctx)
if err != nil {
return nil, err
}
result, err := runnable.Invoke(ctx, input)
// 记录性能指标
duration := time.Since(startTime)
fmt.Printf("[监控] %s 执行时间: %v, 成功: %t\n", name, duration, err == nil)
return result, err
})
}
// 创建被监控的处理器
slowProcessor := compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
// 模拟慢处理
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
return fmt.Sprintf("慢处理结果: %s", input), nil
})
fastProcessor := compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
// 模拟快处理
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
return fmt.Sprintf("快处理结果: %s", input), nil
})
// 构建监控链
chain := compose.NewChain[string, string]().
AppendLambda("slow_monitor", monitoringWrapper("SlowProcessor", slowProcessor)).
AppendLambda("fast_monitor", monitoringWrapper("FastProcessor", fastProcessor))
// 编译并测试
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 执行测试
for i := 1; i <= 3; i++ {
input := fmt.Sprintf("测试 %d", i)
fmt.Printf("\n=== 执行 %d ===\n", i)
result, err := runnable.Invoke(ctx, input)
if err != nil {
fmt.Printf("执行失败: %v\n", err)
} else {
fmt.Printf("最终结果: %s\n", result)
}
}
}
10. 总结
Compose 模块是 Eino 框架的编排引擎核心,提供了强大而灵活的组件编排能力。其主要特点包括:
10.1 设计优势
- 统一抽象: 通过
Runnable
接口统一所有编排结果,简化使用复杂度 - 类型安全: 利用 Go 泛型在编译时确保类型一致性和安全性
- 多种模式: 提供 Chain、Graph、Workflow 三种编排模式,满足不同复杂度需求
- 流式优先: 原生支持四种数据流模式,适应各种数据处理场景
- 高性能执行: 基于通道的并发执行引擎,支持 Pregel 和 DAG 两种执行模式
10.2 核心组件
- Chain: 线性编排,适用于简单的顺序处理流程
- Graph: 图式编排,支持复杂的分支、合并、循环逻辑
- Workflow: 声明式编排,强调依赖关系和字段映射的清晰表达
- Lambda: 自定义函数组件,提供灵活的业务逻辑扩展能力
- Runnable: 统一执行接口,支持四种数据流模式的自动适配
10.3 技术特性
- 编译优化: 智能的类型推导、字段映射验证和执行计划优化
- 并发执行: 基于任务管理器和通道管理器的高效并发执行
- 错误处理: 完善的错误传播和恢复机制
- 状态管理: 支持有状态的图执行,适用于复杂的业务场景
- 可扩展性: 支持自定义组件、Lambda 函数和子图嵌套
通过 Compose 模块,开发者可以构建从简单的数据处理管道到复杂的多模态 AI 应用的各种编排逻辑,充分发挥 Eino 框架的编排能力。
上一篇: Components模块详解 | 下一篇: ADK模块详解
更新时间: 2024-12-19 | 文档版本: v1.0