1. 概述
Callbacks 模块是 Eino 框架的回调系统,提供了在组件执行的不同阶段注入自定义逻辑的能力。该模块特别适用于实现治理功能,如日志记录、监控、指标收集、性能分析和错误处理等横切关注点。
Callbacks 模块的核心设计理念包括:
- 切面编程: 通过回调机制实现横切关注点的分离
- 生命周期管理: 在组件执行的各个阶段提供钩子
- 类型安全: 为不同组件类型提供类型安全的回调接口
- 灵活配置: 支持全局和局部回调处理器的配置
- 流式支持: 原生支持流式输入输出的回调处理
2. 核心架构设计
2.1 整体架构图
graph TB
subgraph "回调接口层 (Callback Interface Layer)"
Handler[Handler Interface]
TimingChecker[TimingChecker Interface]
RunInfo[RunInfo Structure]
end
subgraph "回调时机层 (Callback Timing Layer)"
OnStart[OnStart]
OnEnd[OnEnd]
OnError[OnError]
OnStartWithStreamInput[OnStartWithStreamInput]
OnEndWithStreamOutput[OnEndWithStreamOutput]
end
subgraph "构建器层 (Builder Layer)"
HandlerBuilder[HandlerBuilder]
HandlerHelper[HandlerHelper]
ComponentHandlers[Component-Specific Handlers]
end
subgraph "管理层 (Management Layer)"
GlobalHandlers[Global Handlers]
LocalHandlers[Local Handlers]
CallbackManager[Callback Manager]
end
subgraph "组件集成层 (Component Integration Layer)"
ModelCallbacks[Model Callbacks]
EmbeddingCallbacks[Embedding Callbacks]
RetrieverCallbacks[Retriever Callbacks]
ToolCallbacks[Tool Callbacks]
GraphCallbacks[Graph Callbacks]
end
Handler --> OnStart
Handler --> OnEnd
Handler --> OnError
Handler --> OnStartWithStreamInput
Handler --> OnEndWithStreamOutput
HandlerBuilder --> Handler
HandlerHelper --> Handler
ComponentHandlers --> Handler
GlobalHandlers --> CallbackManager
LocalHandlers --> CallbackManager
CallbackManager --> ModelCallbacks
CallbackManager --> EmbeddingCallbacks
CallbackManager --> RetrieverCallbacks
CallbackManager --> ToolCallbacks
CallbackManager --> GraphCallbacks
TimingChecker --> Handler
RunInfo --> Handler
style Handler fill:#e8f5e8
style OnStart fill:#fff3e0
style HandlerBuilder fill:#f3e5f5
style CallbackManager fill:#e3f2fd
图 2-1: Callbacks 模块整体架构图
图介绍: 该架构图展示了 Callbacks 模块的五层架构:
- 回调接口层: 定义了回调处理器的核心接口和运行时信息结构
- 回调时机层: 定义了组件执行生命周期中的五个关键时机
- 构建器层: 提供了多种方式来构建回调处理器,包括通用构建器和组件特定构建器
- 管理层: 负责回调处理器的注册、管理和执行调度
- 组件集成层: 为各种组件类型提供具体的回调集成实现
2.2 核心接口定义
// callbacks/interface.go
// RunInfo 包含正在运行的组件信息
type RunInfo struct {
ComponentType string // 组件类型
ComponentName string // 组件名称
NodeKey string // 节点键
Extra map[string]any // 额外信息
}
// CallbackInput 回调输入
// 输入类型由组件定义
// 使用类型断言或转换函数将输入转换为正确的类型
type CallbackInput interface{}
// CallbackOutput 回调输出
type CallbackOutput interface{}
// Handler 回调处理器接口
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
}
// TimingChecker 检查处理器是否需要特定回调时机
// 建议回调处理器实现此接口,但不是强制的
// 如果使用 HandlerHelper 或 HandlerBuilder 创建回调处理器,则自动实现此接口
type TimingChecker interface {
NeedTiming(timing CallbackTiming) bool
}
// CallbackTiming 回调时机枚举
type CallbackTiming int
const (
TimingOnStart CallbackTiming = iota // 组件开始执行
TimingOnEnd // 组件执行结束
TimingOnError // 组件执行出错
TimingOnStartWithStreamInput // 组件开始执行(流式输入)
TimingOnEndWithStreamOutput // 组件执行结束(流式输出)
)
3. 回调处理器构建
3.1 HandlerBuilder 方式
HandlerBuilder
提供了一种通用的方式来构建回调处理器:
// callbacks/handler_builder.go
// HandlerBuilder 回调处理器构建器
type HandlerBuilder struct {
onStartFn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
onEndFn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
onErrorFn func(ctx context.Context, info *RunInfo, err error) context.Context
onStartWithStreamInputFn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
onEndWithStreamOutputFn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
}
// NewHandlerBuilder 创建新的处理器构建器
func NewHandlerBuilder() *HandlerBuilder {
return &HandlerBuilder{}
}
// OnStart 设置开始执行回调
func (hb *HandlerBuilder) OnStart(fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder {
hb.onStartFn = fn
return hb
}
// OnEnd 设置执行结束回调
func (hb *HandlerBuilder) OnEnd(fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder {
hb.onEndFn = fn
return hb
}
// OnError 设置执行错误回调
func (hb *HandlerBuilder) OnError(fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder {
hb.onErrorFn = fn
return hb
}
// OnStartWithStreamInput 设置流式输入开始回调
func (hb *HandlerBuilder) OnStartWithStreamInput(fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder {
hb.onStartWithStreamInputFn = fn
return hb
}
// OnEndWithStreamOutput 设置流式输出结束回调
func (hb *HandlerBuilder) OnEndWithStreamOutput(fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder {
hb.onEndWithStreamOutputFn = fn
return hb
}
// Build 构建回调处理器
func (hb *HandlerBuilder) Build() Handler {
return &handlerImpl{
HandlerBuilder: *hb,
}
}
// handlerImpl 处理器实现
type handlerImpl struct {
HandlerBuilder
}
func (hi *handlerImpl) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
if hi.onStartFn != nil {
return hi.onStartFn(ctx, info, input)
}
return ctx
}
func (hi *handlerImpl) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
if hi.onEndFn != nil {
return hi.onEndFn(ctx, info, output)
}
return ctx
}
func (hi *handlerImpl) OnError(ctx context.Context, info *RunInfo, err error) context.Context {
if hi.onErrorFn != nil {
return hi.onErrorFn(ctx, info, err)
}
return ctx
}
func (hi *handlerImpl) OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
if hi.onStartWithStreamInputFn != nil {
return hi.onStartWithStreamInputFn(ctx, info, input)
}
return ctx
}
func (hi *handlerImpl) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
if hi.onEndWithStreamOutputFn != nil {
return hi.onEndWithStreamOutputFn(ctx, info, output)
}
return ctx
}
// 实现 TimingChecker 接口
func (hi *handlerImpl) NeedTiming(timing CallbackTiming) bool {
switch timing {
case TimingOnStart:
return hi.onStartFn != nil
case TimingOnEnd:
return hi.onEndFn != nil
case TimingOnError:
return hi.onErrorFn != nil
case TimingOnStartWithStreamInput:
return hi.onStartWithStreamInputFn != nil
case TimingOnEndWithStreamOutput:
return hi.onEndWithStreamOutputFn != nil
default:
return false
}
}
3.2 HandlerBuilder 使用示例
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func handlerBuilderExample() {
// 创建通用回调处理器
handler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("[%s] 组件开始执行: %s", info.ComponentType, info.ComponentName)
// 根据组件类型处理不同的输入
switch info.ComponentType {
case "ChatModel":
if modelInput := model.ConvCallbackInput(input); modelInput != nil {
log.Printf("模型输入消息数: %d", len(modelInput.Messages))
}
case "Embedding":
if embeddingInput := embedding.ConvCallbackInput(input); embeddingInput != nil {
log.Printf("嵌入输入文本数: %d", len(embeddingInput.Texts))
}
}
// 在上下文中存储开始时间
return context.WithValue(ctx, "start_time", time.Now())
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 计算执行时间
if startTime, ok := ctx.Value("start_time").(time.Time); ok {
duration := time.Since(startTime)
log.Printf("[%s] 组件执行完成: %s, 耗时: %v", info.ComponentType, info.ComponentName, duration)
}
// 根据组件类型处理不同的输出
switch info.ComponentType {
case "ChatModel":
if modelOutput := model.ConvCallbackOutput(output); modelOutput != nil {
log.Printf("模型输出内容长度: %d", len(modelOutput.Message.Content))
if modelOutput.TokenUsage != nil {
log.Printf("Token使用: 输入=%d, 输出=%d, 总计=%d",
modelOutput.TokenUsage.PromptTokens,
modelOutput.TokenUsage.CompletionTokens,
modelOutput.TokenUsage.TotalTokens)
}
}
}
return ctx
}).
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
log.Printf("[%s] 组件执行错误: %s, 错误: %v", info.ComponentType, info.ComponentName, err)
// 记录错误指标
recordErrorMetric(info.ComponentType, info.ComponentName, err)
return ctx
}).
OnStartWithStreamInput(func(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
log.Printf("[%s] 组件开始流式输入处理: %s", info.ComponentType, info.ComponentName)
return ctx
}).
OnEndWithStreamOutput(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
log.Printf("[%s] 组件流式输出完成: %s", info.ComponentType, info.ComponentName)
return ctx
}).
Build()
// 使用处理器
useCallbackHandler(handler)
}
func recordErrorMetric(componentType, componentName string, err error) {
// 实现错误指标记录逻辑
fmt.Printf("记录错误指标: %s.%s -> %v\n", componentType, componentName, err)
}
func useCallbackHandler(handler callbacks.Handler) {
ctx := context.Background()
// 创建模拟聊天模型
chatModel := &MockChatModel{}
// 创建链并使用回调
chain := compose.NewChain[[]schema.Message, *schema.Message]().
AppendChatModel("chat_model", chatModel)
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 执行时使用回调
input := []*schema.Message{
schema.UserMessage("Hello, how are you?"),
}
result, err := runnable.Invoke(ctx, input, compose.WithCallbacks(handler))
if err != nil {
log.Printf("执行失败: %v", err)
} else {
log.Printf("执行结果: %s", result.Content)
}
}
3.3 组件特定回调处理器
每个组件类型都可以定义自己的回调处理器结构,提供类型安全的回调接口:
// 模型组件回调处理器示例
type ModelCallbackHandler struct {
OnStart func(ctx context.Context, info *callbacks.RunInfo, input *model.CallbackInput) context.Context
OnEnd func(ctx context.Context, info *callbacks.RunInfo, output *model.CallbackOutput) context.Context
OnError func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context
}
// 实现 Handler 接口
func (mch *ModelCallbackHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
if mch.OnStart != nil {
if modelInput := model.ConvCallbackInput(input); modelInput != nil {
return mch.OnStart(ctx, info, modelInput)
}
}
return ctx
}
func (mch *ModelCallbackHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if mch.OnEnd != nil {
if modelOutput := model.ConvCallbackOutput(output); modelOutput != nil {
return mch.OnEnd(ctx, info, modelOutput)
}
}
return ctx
}
func (mch *ModelCallbackHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
if mch.OnError != nil {
return mch.OnError(ctx, info, err)
}
return ctx
}
// 其他方法使用默认实现
func (mch *ModelCallbackHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
return ctx
}
func (mch *ModelCallbackHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
return ctx
}
// 实现 TimingChecker 接口
func (mch *ModelCallbackHandler) NeedTiming(timing callbacks.CallbackTiming) bool {
switch timing {
case callbacks.TimingOnStart:
return mch.OnStart != nil
case callbacks.TimingOnEnd:
return mch.OnEnd != nil
case callbacks.TimingOnError:
return mch.OnError != nil
default:
return false
}
}
4. 回调注入机制
4.1 切面注入
Callbacks 模块提供了便捷的切面注入函数,允许组件开发者在关键点注入回调:
// callbacks/aspect_inject.go
// OnStart 为特定上下文调用 OnStart 逻辑,确保在进程开始时
// 以相反顺序(相对于添加顺序)执行所有注册的处理器
func OnStart[T any](ctx context.Context, input T) context.Context {
ctx, _ = callbacks.On(ctx, input, callbacks.OnStartHandle[T], TimingOnStart, true)
return ctx
}
// OnEnd 为特定上下文调用 OnEnd 逻辑,允许在进程结束时进行适当的清理和最终化
// 处理器以正常顺序(相对于添加顺序)执行
func OnEnd[T any](ctx context.Context, output T) context.Context {
ctx, _ = callbacks.On(ctx, output, callbacks.OnEndHandle[T], TimingOnEnd, false)
return ctx
}
// OnError 为特定上下文调用 OnError 逻辑
func OnError(ctx context.Context, err error) context.Context {
ctx, _ = callbacks.On(ctx, err, callbacks.OnErrorHandle, TimingOnError, false)
return ctx
}
// OnStartWithStreamInput 为流式输入调用 OnStart 逻辑
func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) context.Context {
ctx, _ = callbacks.On(ctx, input, callbacks.OnStartWithStreamInputHandle[T], TimingOnStartWithStreamInput, true)
return ctx
}
// OnEndWithStreamOutput 为流式输出调用 OnEnd 逻辑
func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) context.Context {
ctx, _ = callbacks.On(ctx, output, callbacks.OnEndWithStreamOutputHandle[T], TimingOnEndWithStreamOutput, false)
return ctx
}
4.2 组件中的回调使用示例
// 组件开发者如何在自定义组件中使用回调
type CustomChatModel struct {
baseModel model.BaseChatModel
}
func (c *CustomChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
// 错误处理
defer func() {
if err != nil {
callbacks.OnError(ctx, err)
}
}()
// 开始回调
ctx = callbacks.OnStart(ctx, &model.CallbackInput{
Messages: input,
Config: extractConfig(opts...),
Extra: nil,
})
// 执行实际逻辑
resp, err = c.baseModel.Generate(ctx, input, opts...)
if err != nil {
return nil, err
}
// 结束回调
ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
Message: resp,
TokenUsage: extractTokenUsage(resp),
Extra: nil,
})
return resp, nil
}
func (c *CustomChatModel) Stream(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.StreamReader[*schema.Message], error) {
// 流式输入开始回调
inputStream := schema.StreamReaderFromArray([]*model.CallbackInput{
{
Messages: input,
Config: extractConfig(opts...),
Extra: nil,
},
})
ctx = callbacks.OnStartWithStreamInput(ctx, inputStream)
// 执行流式生成
outputStream, err := c.baseModel.Stream(ctx, input, opts...)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
// 包装输出流以添加结束回调
wrappedStream := wrapStreamWithCallback(ctx, outputStream)
return wrappedStream, nil
}
func wrapStreamWithCallback(ctx context.Context, originalStream *schema.StreamReader[*schema.Message]) *schema.StreamReader[*schema.Message] {
sr, sw := schema.Pipe[*schema.Message](10)
go func() {
defer sw.Close()
defer func() {
// 流式输出结束回调
outputCallbackStream := schema.StreamReaderFromArray([]*model.CallbackOutput{
{
Message: nil, // 流式输出没有单个消息
Extra: map[string]any{"stream_completed": true},
},
})
callbacks.OnEndWithStreamOutput(ctx, outputCallbackStream)
}()
for {
msg, err := originalStream.Recv()
if err == io.EOF {
break
}
if err != nil {
callbacks.OnError(ctx, err)
sw.Send(nil, err)
return
}
if sw.Send(msg, nil) {
return // 流已关闭
}
}
}()
return sr
}
5. 全局与局部回调管理
5.1 全局回调处理器
// 全局回调处理器管理
var globalHandlers []callbacks.Handler
// AppendGlobalHandlers 添加全局回调处理器
// 全局回调处理器将在所有节点的用户特定处理器之前执行
// 注意:此函数不是线程安全的,应仅在进程初始化期间调用
func AppendGlobalHandlers(handlers ...callbacks.Handler) {
globalHandlers = append(globalHandlers, handlers...)
}
// 全局回调处理器示例
func setupGlobalCallbacks() {
// 性能监控处理器
performanceHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 记录开始时间
return context.WithValue(ctx, "perf_start_time", time.Now())
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 记录性能指标
if startTime, ok := ctx.Value("perf_start_time").(time.Time); ok {
duration := time.Since(startTime)
recordPerformanceMetric(info.ComponentType, info.ComponentName, duration)
}
return ctx
}).
Build()
// 日志处理器
loggingHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("开始执行组件: %s.%s", info.ComponentType, info.ComponentName)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("完成执行组件: %s.%s", info.ComponentType, info.ComponentName)
return ctx
}).
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
log.Printf("组件执行错误: %s.%s, 错误: %v", info.ComponentType, info.ComponentName, err)
return ctx
}).
Build()
// 错误统计处理器
errorStatsHandler := callbacks.NewHandlerBuilder().
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
incrementErrorCounter(info.ComponentType, info.ComponentName, err)
return ctx
}).
Build()
// 添加全局处理器
callbacks.AppendGlobalHandlers(performanceHandler, loggingHandler, errorStatsHandler)
}
func recordPerformanceMetric(componentType, componentName string, duration time.Duration) {
// 实现性能指标记录
fmt.Printf("性能指标: %s.%s 执行时间 %v\n", componentType, componentName, duration)
}
func incrementErrorCounter(componentType, componentName string, err error) {
// 实现错误计数
fmt.Printf("错误统计: %s.%s 错误计数 +1, 错误类型: %T\n", componentType, componentName, err)
}
5.2 局部回调处理器
// 局部回调处理器示例
func localCallbackExample() {
ctx := context.Background()
// 创建特定于任务的回调处理器
taskSpecificHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 任务特定的开始逻辑
log.Printf("任务开始: %s", info.NodeKey)
return context.WithValue(ctx, "task_id", generateTaskID())
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 任务特定的结束逻辑
if taskID, ok := ctx.Value("task_id").(string); ok {
log.Printf("任务完成: %s, 任务ID: %s", info.NodeKey, taskID)
}
return ctx
}).
Build()
// 创建调试回调处理器
debugHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("DEBUG: 输入数据: %+v", input)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("DEBUG: 输出数据: %+v", output)
return ctx
}).
Build()
// 创建链
chain := compose.NewChain[string, string]().
AppendLambda("processor", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
return "processed: " + input, nil
}))
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 使用局部回调处理器
result, err := runnable.Invoke(ctx, "test input",
compose.WithCallbacks(taskSpecificHandler, debugHandler))
if err != nil {
log.Printf("执行失败: %v", err)
} else {
log.Printf("执行结果: %s", result)
}
}
func generateTaskID() string {
return fmt.Sprintf("task_%d", time.Now().UnixNano())
}
6. 流式回调处理
6.1 流式输入回调
// 流式输入回调处理示例
func streamInputCallbackExample() {
handler := callbacks.NewHandlerBuilder().
OnStartWithStreamInput(func(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
log.Printf("开始处理流式输入: %s.%s", info.ComponentType, info.ComponentName)
// 在后台监控输入流
go func() {
defer input.Close()
itemCount := 0
for {
item, err := input.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("流式输入错误: %v", err)
break
}
itemCount++
log.Printf("接收到输入项 %d: %T", itemCount, item)
}
log.Printf("流式输入完成,总计 %d 项", itemCount)
}()
return ctx
}).
Build()
// 使用流式输入回调
useStreamInputCallback(handler)
}
func useStreamInputCallback(handler callbacks.Handler) {
ctx := context.Background()
// 创建流式处理链
chain := compose.NewChain[*schema.StreamReader[string], string]().
AppendLambda("stream_collector", compose.CollectableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (string, error) {
var items []string
for {
item, err := input.Recv()
if err == io.EOF {
break
}
if err != nil {
return "", err
}
items = append(items, item)
}
return strings.Join(items, ", "), nil
}))
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 创建输入流
inputStream := schema.StreamReaderFromArray([]string{"item1", "item2", "item3"})
// 执行时使用回调
result, err := runnable.Collect(ctx, inputStream, compose.WithCallbacks(handler))
if err != nil {
log.Printf("执行失败: %v", err)
} else {
log.Printf("收集结果: %s", result)
}
}
6.2 流式输出回调
// 流式输出回调处理示例
func streamOutputCallbackExample() {
handler := callbacks.NewHandlerBuilder().
OnEndWithStreamOutput(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
log.Printf("开始处理流式输出: %s.%s", info.ComponentType, info.ComponentName)
// 在后台监控输出流
go func() {
defer output.Close()
itemCount := 0
for {
item, err := output.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("流式输出错误: %v", err)
break
}
itemCount++
log.Printf("输出项 %d: %T", itemCount, item)
}
log.Printf("流式输出完成,总计 %d 项", itemCount)
}()
return ctx
}).
Build()
// 使用流式输出回调
useStreamOutputCallback(handler)
}
func useStreamOutputCallback(handler callbacks.Handler) {
ctx := context.Background()
// 创建流式生成链
chain := compose.NewChain[string, *schema.StreamReader[string]]().
AppendLambda("stream_generator", compose.StreamableLambda(func(ctx context.Context, input string) (*schema.StreamReader[string], error) {
sr, sw := schema.Pipe[string](10)
go func() {
defer sw.Close()
words := strings.Fields(input)
for i, word := range words {
processed := fmt.Sprintf("item_%d_%s", i+1, word)
if sw.Send(processed, nil) {
return
}
time.Sleep(100 * time.Millisecond)
}
}()
return sr, nil
}))
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 执行时使用回调
outputStream, err := runnable.Stream(ctx, "hello world test", compose.WithCallbacks(handler))
if err != nil {
log.Printf("执行失败: %v", err)
return
}
defer outputStream.Close()
// 消费输出流
for {
item, err := outputStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("接收错误: %v", err)
break
}
log.Printf("接收到输出: %s", item)
}
}
7. 高级回调模式
7.1 条件回调
// 条件回调处理器
type ConditionalHandler struct {
condition func(ctx context.Context, info *callbacks.RunInfo) bool
handler callbacks.Handler
}
func NewConditionalHandler(condition func(ctx context.Context, info *callbacks.RunInfo) bool, handler callbacks.Handler) *ConditionalHandler {
return &ConditionalHandler{
condition: condition,
handler: handler,
}
}
func (ch *ConditionalHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
if ch.condition(ctx, info) {
return ch.handler.OnStart(ctx, info, input)
}
return ctx
}
func (ch *ConditionalHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if ch.condition(ctx, info) {
return ch.handler.OnEnd(ctx, info, output)
}
return ctx
}
func (ch *ConditionalHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
if ch.condition(ctx, info) {
return ch.handler.OnError(ctx, info, err)
}
return ctx
}
func (ch *ConditionalHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
if ch.condition(ctx, info) {
return ch.handler.OnStartWithStreamInput(ctx, info, input)
}
return ctx
}
func (ch *ConditionalHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
if ch.condition(ctx, info) {
return ch.handler.OnEndWithStreamOutput(ctx, info, output)
}
return ctx
}
func (ch *ConditionalHandler) NeedTiming(timing callbacks.CallbackTiming) bool {
if checker, ok := ch.handler.(callbacks.TimingChecker); ok {
return checker.NeedTiming(timing)
}
return true // 保守估计
}
// 条件回调使用示例
func conditionalCallbackExample() {
// 只对模型组件生效的回调
modelOnlyHandler := NewConditionalHandler(
func(ctx context.Context, info *callbacks.RunInfo) bool {
return info.ComponentType == "ChatModel"
},
callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("模型组件开始执行: %s", info.ComponentName)
return ctx
}).
Build(),
)
// 只在调试模式下生效的回调
debugModeHandler := NewConditionalHandler(
func(ctx context.Context, info *callbacks.RunInfo) bool {
return ctx.Value("debug_mode") == true
},
callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("DEBUG: 组件 %s 开始执行,输入: %+v", info.ComponentName, input)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("DEBUG: 组件 %s 执行完成,输出: %+v", info.ComponentName, output)
return ctx
}).
Build(),
)
// 使用条件回调
ctx := context.WithValue(context.Background(), "debug_mode", true)
// ... 使用 modelOnlyHandler 和 debugModeHandler
}
7.2 回调链
// 回调链处理器
type ChainHandler struct {
handlers []callbacks.Handler
}
func NewChainHandler(handlers ...callbacks.Handler) *ChainHandler {
return &ChainHandler{handlers: handlers}
}
func (ch *ChainHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
for i := len(ch.handlers) - 1; i >= 0; i-- { // 反向执行
ctx = ch.handlers[i].OnStart(ctx, info, input)
}
return ctx
}
func (ch *ChainHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
for _, handler := range ch.handlers { // 正向执行
ctx = handler.OnEnd(ctx, info, output)
}
return ctx
}
func (ch *ChainHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
for _, handler := range ch.handlers {
ctx = handler.OnError(ctx, info, err)
}
return ctx
}
func (ch *ChainHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
for i := len(ch.handlers) - 1; i >= 0; i-- {
ctx = ch.handlers[i].OnStartWithStreamInput(ctx, info, input)
}
return ctx
}
func (ch *ChainHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
for _, handler := range ch.handlers {
ctx = handler.OnEndWithStreamOutput(ctx, info, output)
}
return ctx
}
func (ch *ChainHandler) NeedTiming(timing callbacks.CallbackTiming) bool {
for _, handler := range ch.handlers {
if checker, ok := handler.(callbacks.TimingChecker); ok {
if checker.NeedTiming(timing) {
return true
}
} else {
return true // 如果任何处理器没有实现 TimingChecker,保守估计
}
}
return false
}
// 回调链使用示例
func chainHandlerExample() {
// 创建多个处理器
loggingHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("LOG: 开始执行 %s", info.ComponentName)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("LOG: 完成执行 %s", info.ComponentName)
return ctx
}).
Build()
metricsHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 记录开始指标
recordStartMetric(info.ComponentType, info.ComponentName)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 记录结束指标
recordEndMetric(info.ComponentType, info.ComponentName)
return ctx
}).
Build()
tracingHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 开始追踪
return startTracing(ctx, info.ComponentName)
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 结束追踪
return endTracing(ctx, info.ComponentName)
}).
Build()
// 创建回调链
chainHandler := NewChainHandler(loggingHandler, metricsHandler, tracingHandler)
// 使用回调链
// ... 在组件执行中使用 chainHandler
}
func recordStartMetric(componentType, componentName string) {
fmt.Printf("METRIC: %s.%s started\n", componentType, componentName)
}
func recordEndMetric(componentType, componentName string) {
fmt.Printf("METRIC: %s.%s completed\n", componentType, componentName)
}
func startTracing(ctx context.Context, componentName string) context.Context {
traceID := fmt.Sprintf("trace_%s_%d", componentName, time.Now().UnixNano())
fmt.Printf("TRACE: Started %s\n", traceID)
return context.WithValue(ctx, "trace_id", traceID)
}
func endTracing(ctx context.Context, componentName string) context.Context {
if traceID, ok := ctx.Value("trace_id").(string); ok {
fmt.Printf("TRACE: Ended %s\n", traceID)
}
return ctx
}
7.3 异步回调
// 异步回调处理器
type AsyncHandler struct {
handler callbacks.Handler
workers int
queue chan func()
}
func NewAsyncHandler(handler callbacks.Handler, workers int) *AsyncHandler {
ah := &AsyncHandler{
handler: handler,
workers: workers,
queue: make(chan func(), 1000),
}
// 启动工作协程
for i := 0; i < workers; i++ {
go ah.worker()
}
return ah
}
func (ah *AsyncHandler) worker() {
for task := range ah.queue {
task()
}
}
func (ah *AsyncHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 同步执行关键逻辑
if ah.isCritical(callbacks.TimingOnStart) {
return ah.handler.OnStart(ctx, info, input)
}
// 异步执行非关键逻辑
select {
case ah.queue <- func() {
ah.handler.OnStart(ctx, info, input)
}:
default:
// 队列满,丢弃任务
log.Printf("警告: 异步回调队列已满,丢弃 OnStart 任务")
}
return ctx
}
func (ah *AsyncHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 同步执行关键逻辑
if ah.isCritical(callbacks.TimingOnEnd) {
return ah.handler.OnEnd(ctx, info, output)
}
// 异步执行非关键逻辑
select {
case ah.queue <- func() {
ah.handler.OnEnd(ctx, info, output)
}:
default:
log.Printf("警告: 异步回调队列已满,丢弃 OnEnd 任务")
}
return ctx
}
func (ah *AsyncHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
// 错误处理通常是关键的,同步执行
return ah.handler.OnError(ctx, info, err)
}
func (ah *AsyncHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
return ah.handler.OnStartWithStreamInput(ctx, info, input)
}
func (ah *AsyncHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
return ah.handler.OnEndWithStreamOutput(ctx, info, output)
}
func (ah *AsyncHandler) isCritical(timing callbacks.CallbackTiming) bool {
// 定义哪些回调时机是关键的,需要同步执行
switch timing {
case callbacks.TimingOnError:
return true
case callbacks.TimingOnStart:
return false // 开始回调通常不关键
case callbacks.TimingOnEnd:
return false // 结束回调通常不关键
default:
return true // 保守估计
}
}
func (ah *AsyncHandler) NeedTiming(timing callbacks.CallbackTiming) bool {
if checker, ok := ah.handler.(callbacks.TimingChecker); ok {
return checker.NeedTiming(timing)
}
return true
}
func (ah *AsyncHandler) Close() {
close(ah.queue)
}
// 异步回调使用示例
func asyncCallbackExample() {
// 创建基础处理器
baseHandler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 模拟耗时的日志记录
time.Sleep(50 * time.Millisecond)
log.Printf("详细日志: 组件 %s 开始执行", info.ComponentName)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 模拟耗时的指标记录
time.Sleep(30 * time.Millisecond)
log.Printf("详细指标: 组件 %s 执行完成", info.ComponentName)
return ctx
}).
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
// 错误处理保持同步
log.Printf("错误: 组件 %s 执行失败: %v", info.ComponentName, err)
return ctx
}).
Build()
// 创建异步处理器
asyncHandler := NewAsyncHandler(baseHandler, 3) // 3个工作协程
defer asyncHandler.Close()
// 使用异步回调
// ... 在组件执行中使用 asyncHandler
}
8. 监控与指标收集
8.1 性能监控回调
// 性能监控回调处理器
type PerformanceMonitorHandler struct {
metrics map[string]*ComponentMetrics
mu sync.RWMutex
}
type ComponentMetrics struct {
CallCount int64
TotalDuration time.Duration
ErrorCount int64
LastError error
LastErrorTime time.Time
}
func NewPerformanceMonitorHandler() *PerformanceMonitorHandler {
return &PerformanceMonitorHandler{
metrics: make(map[string]*ComponentMetrics),
}
}
func (pmh *PerformanceMonitorHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 记录开始时间
return context.WithValue(ctx, "perf_start_time", time.Now())
}
func (pmh *PerformanceMonitorHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 计算执行时间
if startTime, ok := ctx.Value("perf_start_time").(time.Time); ok {
duration := time.Since(startTime)
pmh.recordMetrics(info.ComponentType, info.ComponentName, duration, nil)
}
return ctx
}
func (pmh *PerformanceMonitorHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
// 记录错误
pmh.recordMetrics(info.ComponentType, info.ComponentName, 0, err)
return ctx
}
func (pmh *PerformanceMonitorHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
return context.WithValue(ctx, "perf_start_time", time.Now())
}
func (pmh *PerformanceMonitorHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
if startTime, ok := ctx.Value("perf_start_time").(time.Time); ok {
duration := time.Since(startTime)
pmh.recordMetrics(info.ComponentType, info.ComponentName, duration, nil)
}
return ctx
}
func (pmh *PerformanceMonitorHandler) recordMetrics(componentType, componentName string, duration time.Duration, err error) {
key := fmt.Sprintf("%s.%s", componentType, componentName)
pmh.mu.Lock()
defer pmh.mu.Unlock()
metrics, exists := pmh.metrics[key]
if !exists {
metrics = &ComponentMetrics{}
pmh.metrics[key] = metrics
}
metrics.CallCount++
if duration > 0 {
metrics.TotalDuration += duration
}
if err != nil {
metrics.ErrorCount++
metrics.LastError = err
metrics.LastErrorTime = time.Now()
}
}
func (pmh *PerformanceMonitorHandler) GetMetrics() map[string]*ComponentMetrics {
pmh.mu.RLock()
defer pmh.mu.RUnlock()
result := make(map[string]*ComponentMetrics)
for k, v := range pmh.metrics {
result[k] = &ComponentMetrics{
CallCount: v.CallCount,
TotalDuration: v.TotalDuration,
ErrorCount: v.ErrorCount,
LastError: v.LastError,
LastErrorTime: v.LastErrorTime,
}
}
return result
}
func (pmh *PerformanceMonitorHandler) PrintReport() {
metrics := pmh.GetMetrics()
fmt.Println("=== 性能监控报告 ===")
for component, metric := range metrics {
avgDuration := time.Duration(0)
if metric.CallCount > 0 {
avgDuration = metric.TotalDuration / time.Duration(metric.CallCount)
}
fmt.Printf("组件: %s\n", component)
fmt.Printf(" 调用次数: %d\n", metric.CallCount)
fmt.Printf(" 总耗时: %v\n", metric.TotalDuration)
fmt.Printf(" 平均耗时: %v\n", avgDuration)
fmt.Printf(" 错误次数: %d\n", metric.ErrorCount)
if metric.ErrorCount > 0 {
fmt.Printf(" 最后错误: %v (%v)\n", metric.LastError, metric.LastErrorTime.Format("2006-01-02 15:04:05"))
}
fmt.Println()
}
}
func (pmh *PerformanceMonitorHandler) NeedTiming(timing callbacks.CallbackTiming) bool {
switch timing {
case callbacks.TimingOnStart, callbacks.TimingOnEnd, callbacks.TimingOnError:
return true
case callbacks.TimingOnStartWithStreamInput, callbacks.TimingOnEndWithStreamOutput:
return true
default:
return false
}
}
8.2 使用性能监控
func performanceMonitoringExample() {
// 创建性能监控处理器
perfMonitor := NewPerformanceMonitorHandler()
// 添加到全局处理器
callbacks.AppendGlobalHandlers(perfMonitor)
// 创建测试链
ctx := context.Background()
chain := compose.NewChain[string, string]().
AppendLambda("fast_processor", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
time.Sleep(10 * time.Millisecond) // 模拟快速处理
return "fast: " + input, nil
})).
AppendLambda("slow_processor", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
time.Sleep(100 * time.Millisecond) // 模拟慢速处理
return "slow: " + input, nil
})).
AppendLambda("error_processor", compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
if strings.Contains(input, "error") {
return "", fmt.Errorf("模拟错误: %s", input)
}
return "ok: " + input, nil
}))
runnable, err := chain.Compile(ctx)
if err != nil {
panic(err)
}
// 执行多次测试
testInputs := []string{
"test1",
"test2",
"error_case",
"test3",
"another_error",
"test4",
}
for _, input := range testInputs {
result, err := runnable.Invoke(ctx, input)
if err != nil {
log.Printf("执行失败: %v", err)
} else {
log.Printf("执行成功: %s", result)
}
}
// 打印性能报告
perfMonitor.PrintReport()
}
9. 最佳实践与使用建议
9.1 回调设计原则
- 轻量级: 回调处理器应该尽可能轻量,避免阻塞主要执行流程
- 幂等性: 回调处理器应该是幂等的,多次调用不应产生副作用
- 错误隔离: 回调处理器中的错误不应影响主要业务逻辑
- 资源管理: 正确管理回调中使用的资源,避免泄露
- 性能考虑: 对于性能敏感的场景,考虑使用异步回调
9.2 常见使用模式
// 1. 日志记录模式
func createLoggingHandler() callbacks.Handler {
return callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("[%s] 开始执行: %s", info.ComponentType, info.ComponentName)
return ctx
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("[%s] 执行完成: %s", info.ComponentType, info.ComponentName)
return ctx
}).
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
log.Printf("[%s] 执行错误: %s, 错误: %v", info.ComponentType, info.ComponentName, err)
return ctx
}).
Build()
}
// 2. 指标收集模式
func createMetricsHandler(metricsCollector MetricsCollector) callbacks.Handler {
return callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
metricsCollector.IncrementCounter(fmt.Sprintf("%s.%s.calls", info.ComponentType, info.ComponentName))
return context.WithValue(ctx, "metrics_start_time", time.Now())
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if startTime, ok := ctx.Value("metrics_start_time").(time.Time); ok {
duration := time.Since(startTime)
metricsCollector.RecordDuration(fmt.Sprintf("%s.%s.duration", info.ComponentType, info.ComponentName), duration)
}
return ctx
}).
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
metricsCollector.IncrementCounter(fmt.Sprintf("%s.%s.errors", info.ComponentType, info.ComponentName))
return ctx
}).
Build()
}
// 3. 追踪模式
func createTracingHandler() callbacks.Handler {
return callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
span := startSpan(ctx, fmt.Sprintf("%s.%s", info.ComponentType, info.ComponentName))
return context.WithValue(ctx, "tracing_span", span)
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if span, ok := ctx.Value("tracing_span").(Span); ok {
span.SetStatus(StatusOK)
span.End()
}
return ctx
}).
OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
if span, ok := ctx.Value("tracing_span").(Span); ok {
span.SetStatus(StatusError)
span.RecordError(err)
span.End()
}
return ctx
}).
Build()
}
// 4. 缓存模式
func createCacheHandler(cache Cache) callbacks.Handler {
return callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 尝试从缓存获取结果
cacheKey := generateCacheKey(info, input)
if cachedResult, found := cache.Get(cacheKey); found {
return context.WithValue(ctx, "cached_result", cachedResult)
}
return context.WithValue(ctx, "cache_key", cacheKey)
}).
OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
// 将结果存入缓存
if cacheKey, ok := ctx.Value("cache_key").(string); ok {
cache.Set(cacheKey, output, 5*time.Minute)
}
return ctx
}).
Build()
}
// 辅助接口定义
type MetricsCollector interface {
IncrementCounter(name string)
RecordDuration(name string, duration time.Duration)
}
type Span interface {
SetStatus(status Status)
RecordError(err error)
End()
}
type Status int
const (
StatusOK Status = iota
StatusError
)
type Cache interface {
Get(key string) (interface{}, bool)
Set(key string, value interface{}, ttl time.Duration)
}
func startSpan(ctx context.Context, name string) Span {
// 实现追踪 span 创建逻辑
return &mockSpan{name: name}
}
func generateCacheKey(info *callbacks.RunInfo, input callbacks.CallbackInput) string {
// 实现缓存键生成逻辑
return fmt.Sprintf("%s.%s.%x", info.ComponentType, info.ComponentName, hash(input))
}
func hash(input interface{}) uint64 {
// 实现输入哈希逻辑
return 0
}
// 模拟实现
type mockSpan struct {
name string
}
func (s *mockSpan) SetStatus(status Status) {}
func (s *mockSpan) RecordError(err error) {}
func (s *mockSpan) End() {}
9.3 性能优化建议
- 使用 TimingChecker: 实现
TimingChecker
接口以避免不必要的回调调用 - 异步处理: 对于非关键路径的回调,考虑使用异步处理
- 批量操作: 对于指标收集等操作,考虑批量处理以减少开销
- 条件回调: 使用条件回调避免在不需要时执行昂贵的操作
- 资源池化: 对于频繁创建的对象,使用对象池减少 GC 压力
10. 总结
Callbacks 模块是 Eino 框架的重要组成部分,提供了强大的横切关注点处理能力。其主要特点包括:
10.1 设计优势
- 切面编程: 通过回调机制实现关注点分离,保持业务逻辑的纯净性
- 生命周期完整: 覆盖组件执行的所有关键时机,提供全面的监控能力
- 类型安全: 为不同组件类型提供类型安全的回调接口
- 灵活配置: 支持全局和局部回调处理器,满足不同场景需求
- 流式支持: 原生支持流式输入输出的回调处理
10.2 核心功能
- HandlerBuilder: 通用的回调处理器构建器,支持所有回调时机
- 组件特定处理器: 为各种组件类型提供类型安全的回调接口
- 全局处理器管理: 支持全局回调处理器的注册和管理
- 切面注入: 便捷的切面注入函数,简化组件开发者的使用
- 高级模式: 支持条件回调、回调链、异步回调等高级使用模式
10.3 应用场景
- 监控与指标: 收集组件执行的性能指标和业务指标
- 日志记录: 统一的日志记录和审计追踪
- 错误处理: 集中的错误处理和告警机制
- 缓存管理: 透明的缓存层实现
- 追踪分析: 分布式追踪和性能分析
- 治理功能: 限流、熔断、重试等治理功能的实现
通过 Callbacks 模块,开发者可以在不修改业务逻辑的情况下,为 Eino 应用添加丰富的治理和监控功能,大大提升了系统的可观测性和可维护性。
上一篇: ADK模块详解 | 下一篇: 关键数据结构与继承关系
更新时间: 2024-12-19 | 文档版本: v1.0