📖 文档概述
本文档深入分析 Eino 框架的 Schema 模块,包括消息系统、流处理机制、工具定义、文档结构等核心数据模式的设计与实现。
🏗️ Schema模块架构
模块结构图
graph TB
subgraph "Schema 核心模块"
M[Message 消息系统]
S[Stream 流处理]
T[Tool 工具定义]
D[Document 文档结构]
P[Parser 解析器]
end
subgraph "Message 子系统"
MT[MessageTemplate 消息模板]
MR[MessageRole 消息角色]
MC[MessageContent 消息内容]
MM[MessageMeta 消息元数据]
end
subgraph "Stream 子系统"
SR[StreamReader 流读取器]
SW[StreamWriter 流写入器]
SC[StreamConcatenator 流拼接器]
SM[StreamMerger 流合并器]
end
subgraph "Tool 子系统"
TI[ToolInfo 工具信息]
TP[ToolParams 工具参数]
TC[ToolChoice 工具选择]
TD[DataType 数据类型]
end
M --> MT
M --> MR
M --> MC
M --> MM
S --> SR
S --> SW
S --> SC
S --> SM
T --> TI
T --> TP
T --> TC
T --> TD
style M fill:#e8f5e8
style S fill:#fff3e0
style T fill:#f3e5f5
style D fill:#e3f2fd
💬 Message 消息系统
核心数据结构
1. Message 结构定义
// Message 是 Eino 框架中的核心消息结构
// 支持多种角色、多媒体内容、工具调用等功能
type Message struct {
// Role 消息角色:assistant、user、system、tool
Role RoleType `json:"role"`
// Content 文本内容,基础的消息内容
Content string `json:"content"`
// MultiContent 多媒体内容,如果不为空则优先使用此字段
// 支持文本、图片、音频、视频、文件等多种类型
MultiContent []ChatMessagePart `json:"multi_content,omitempty"`
// Name 消息发送者名称,可选
Name string `json:"name,omitempty"`
// ToolCalls 工具调用列表,仅用于 Assistant 消息
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
// ToolCallID 工具调用ID,仅用于 Tool 消息
ToolCallID string `json:"tool_call_id,omitempty"`
// ToolName 工具名称,仅用于 Tool 消息
ToolName string `json:"tool_name,omitempty"`
// ResponseMeta 响应元数据,包含完成原因、令牌使用情况等
ResponseMeta *ResponseMeta `json:"response_meta,omitempty"`
// ReasoningContent 模型的思考过程,用于推理内容
ReasoningContent string `json:"reasoning_content,omitempty"`
// Extra 自定义扩展信息,用于模型实现的定制化需求
Extra map[string]any `json:"extra,omitempty"`
}
2. 消息角色类型
// RoleType 定义消息的角色类型
type RoleType string
const (
// Assistant 助手角色,表示由 ChatModel 返回的消息
Assistant RoleType = "assistant"
// User 用户角色,表示用户输入的消息
User RoleType = "user"
// System 系统角色,表示系统提示消息
System RoleType = "system"
// Tool 工具角色,表示工具调用的输出结果
Tool RoleType = "tool"
)
// 便捷的消息创建函数
func SystemMessage(content string) *Message {
return &Message{
Role: System,
Content: content,
}
}
func UserMessage(content string) *Message {
return &Message{
Role: User,
Content: content,
}
}
func AssistantMessage(content string, toolCalls []ToolCall) *Message {
return &Message{
Role: Assistant,
Content: content,
ToolCalls: toolCalls,
}
}
func ToolMessage(content string, toolCallID string, opts ...ToolMessageOption) *Message {
o := &toolMessageOptions{}
for _, opt := range opts {
opt(o)
}
return &Message{
Role: Tool,
Content: content,
ToolCallID: toolCallID,
ToolName: o.toolName,
}
}
3. 多媒体内容支持
// ChatMessagePart 聊天消息的组成部分
type ChatMessagePart struct {
// Type 内容类型:text、image_url、audio_url、video_url、file_url
Type ChatMessagePartType `json:"type,omitempty"`
// Text 文本内容,当 Type 为 "text" 时使用
Text string `json:"text,omitempty"`
// ImageURL 图片URL,当 Type 为 "image_url" 时使用
ImageURL *ChatMessageImageURL `json:"image_url,omitempty"`
// AudioURL 音频URL,当 Type 为 "audio_url" 时使用
AudioURL *ChatMessageAudioURL `json:"audio_url,omitempty"`
// VideoURL 视频URL,当 Type 为 "video_url" 时使用
VideoURL *ChatMessageVideoURL `json:"video_url,omitempty"`
// FileURL 文件URL,当 Type 为 "file_url" 时使用
FileURL *ChatMessageFileURL `json:"file_url,omitempty"`
}
// ChatMessageImageURL 图片URL结构
type ChatMessageImageURL struct {
// URL 可以是传统URL或符合RFC-2397的内联数据URL
URL string `json:"url,omitempty"`
URI string `json:"uri,omitempty"`
// Detail 图片质量:high、low、auto
Detail ImageURLDetail `json:"detail,omitempty"`
// MIMEType MIME类型,如 "image/png"
MIMEType string `json:"mime_type,omitempty"`
// Extra 扩展信息
Extra map[string]any `json:"extra,omitempty"`
}
消息模板系统
1. MessagesTemplate 接口
// MessagesTemplate 消息模板接口
// 用于将模板渲染为消息列表
type MessagesTemplate interface {
Format(ctx context.Context, vs map[string]any, formatType FormatType) ([]*Message, error)
}
// FormatType 格式化类型
type FormatType uint8
const (
// FString Python风格的字符串格式化
FString FormatType = 0
// GoTemplate Go标准库的模板格式
GoTemplate FormatType = 1
// Jinja2 Jinja2模板格式
Jinja2 FormatType = 2
)
2. 消息模板实现
// Message 实现 MessagesTemplate 接口
func (m *Message) Format(_ context.Context, vs map[string]any, formatType FormatType) ([]*Message, error) {
// 格式化主要内容
c, err := formatContent(m.Content, vs, formatType)
if err != nil {
return nil, err
}
// 创建副本并更新内容
copied := *m
copied.Content = c
// 格式化多媒体内容中的文本部分
if len(m.MultiContent) != 0 {
copied.MultiContent = make([]ChatMessagePart, len(m.MultiContent))
copy(copied.MultiContent, m.MultiContent)
for i, mc := range copied.MultiContent {
if len(mc.Text) > 0 {
nmc, err := formatContent(mc.Text, vs, formatType)
if err != nil {
return nil, err
}
copied.MultiContent[i].Text = nmc
}
}
}
return []*Message{&copied}, nil
}
// formatContent 根据不同格式类型格式化内容
func formatContent(content string, vs map[string]any, formatType FormatType) (string, error) {
switch formatType {
case FString:
// 使用 Python 风格格式化
return pyfmt.Fmt(content, vs)
case GoTemplate:
// 使用 Go 模板
parsedTmpl, err := template.New("template").
Option("missingkey=error").
Parse(content)
if err != nil {
return "", err
}
sb := new(strings.Builder)
err = parsedTmpl.Execute(sb, vs)
if err != nil {
return "", err
}
return sb.String(), nil
case Jinja2:
// 使用 Jinja2 模板
env, err := getJinjaEnv()
if err != nil {
return "", err
}
tpl, err := env.FromString(content)
if err != nil {
return "", err
}
out, err := tpl.Execute(vs)
if err != nil {
return "", err
}
return out, nil
default:
return "", fmt.Errorf("unknown format type: %v", formatType)
}
}
3. MessagesPlaceholder 占位符
// MessagesPlaceholder 消息占位符
// 用于在模板中引用参数中的消息列表
type messagesPlaceholder struct {
key string // 参数键名
optional bool // 是否可选
}
func MessagesPlaceholder(key string, optional bool) MessagesTemplate {
return &messagesPlaceholder{
key: key,
optional: optional,
}
}
// Format 返回指定键的消息列表
func (p *messagesPlaceholder) Format(_ context.Context, vs map[string]any, _ FormatType) ([]*Message, error) {
v, ok := vs[p.key]
if !ok {
if p.optional {
return []*Message{}, nil
}
return nil, fmt.Errorf("message placeholder format: %s not found", p.key)
}
msgs, ok := v.([]*Message)
if !ok {
return nil, fmt.Errorf("only messages can be used to format message placeholder, key: %v, actual type: %v",
p.key, reflect.TypeOf(v))
}
return msgs, nil
}
消息拼接机制
1. ConcatMessages 函数
// ConcatMessages 拼接相同角色和名称的消息
// 用于将流式消息合并为单一消息
func ConcatMessages(msgs []*Message) (*Message, error) {
var (
contents []string // 内容片段
contentLen int // 内容总长度
reasoningContents []string // 推理内容片段
reasoningContentLen int // 推理内容总长度
toolCalls []ToolCall // 工具调用列表
ret = Message{} // 结果消息
extraList = make([]map[string]any, 0, len(msgs)) // 扩展信息列表
)
for idx, msg := range msgs {
if msg == nil {
return nil, fmt.Errorf("unexpected nil chunk in message stream, index: %d", idx)
}
// 验证角色一致性
if msg.Role != "" {
if ret.Role == "" {
ret.Role = msg.Role
} else if ret.Role != msg.Role {
return nil, fmt.Errorf("cannot concat messages with different roles: '%s' '%s'",
ret.Role, msg.Role)
}
}
// 验证名称一致性
if msg.Name != "" {
if ret.Name == "" {
ret.Name = msg.Name
} else if ret.Name != msg.Name {
return nil, fmt.Errorf("cannot concat messages with different names: '%s' '%s'",
ret.Name, msg.Name)
}
}
// 收集内容
if msg.Content != "" {
contents = append(contents, msg.Content)
contentLen += len(msg.Content)
}
// 收集推理内容
if msg.ReasoningContent != "" {
reasoningContents = append(reasoningContents, msg.ReasoningContent)
reasoningContentLen += len(msg.ReasoningContent)
}
// 收集工具调用
if len(msg.ToolCalls) > 0 {
toolCalls = append(toolCalls, msg.ToolCalls...)
}
// 收集扩展信息
if len(msg.Extra) > 0 {
extraList = append(extraList, msg.Extra)
}
// 处理响应元数据
if msg.ResponseMeta != nil && ret.ResponseMeta == nil {
ret.ResponseMeta = &ResponseMeta{}
}
if msg.ResponseMeta != nil && ret.ResponseMeta != nil {
// 保留最后一个有效的完成原因
if msg.ResponseMeta.FinishReason != "" {
ret.ResponseMeta.FinishReason = msg.ResponseMeta.FinishReason
}
// 合并令牌使用情况(取最大值)
if msg.ResponseMeta.Usage != nil {
if ret.ResponseMeta.Usage == nil {
ret.ResponseMeta.Usage = &TokenUsage{}
}
if msg.ResponseMeta.Usage.PromptTokens > ret.ResponseMeta.Usage.PromptTokens {
ret.ResponseMeta.Usage.PromptTokens = msg.ResponseMeta.Usage.PromptTokens
}
if msg.ResponseMeta.Usage.CompletionTokens > ret.ResponseMeta.Usage.CompletionTokens {
ret.ResponseMeta.Usage.CompletionTokens = msg.ResponseMeta.Usage.CompletionTokens
}
if msg.ResponseMeta.Usage.TotalTokens > ret.ResponseMeta.Usage.TotalTokens {
ret.ResponseMeta.Usage.TotalTokens = msg.ResponseMeta.Usage.TotalTokens
}
}
// 合并日志概率
if msg.ResponseMeta.LogProbs != nil {
if ret.ResponseMeta.LogProbs == nil {
ret.ResponseMeta.LogProbs = &LogProbs{}
}
ret.ResponseMeta.LogProbs.Content = append(ret.ResponseMeta.LogProbs.Content,
msg.ResponseMeta.LogProbs.Content...)
}
}
}
// 拼接内容
if len(contents) > 0 {
var sb strings.Builder
sb.Grow(contentLen)
for _, content := range contents {
sb.WriteString(content)
}
ret.Content = sb.String()
}
// 拼接推理内容
if len(reasoningContents) > 0 {
var sb strings.Builder
sb.Grow(reasoningContentLen)
for _, rc := range reasoningContents {
sb.WriteString(rc)
}
ret.ReasoningContent = sb.String()
}
// 拼接工具调用
if len(toolCalls) > 0 {
merged, err := concatToolCalls(toolCalls)
if err != nil {
return nil, err
}
ret.ToolCalls = merged
}
// 合并扩展信息
if len(extraList) > 0 {
extra, err := concatExtra(extraList)
if err != nil {
return nil, fmt.Errorf("failed to concat message's extra: %w", err)
}
if len(extra) > 0 {
ret.Extra = extra
}
}
return &ret, nil
}
2. 工具调用拼接
// concatToolCalls 拼接工具调用
// 处理流式工具调用的合并逻辑
func concatToolCalls(chunks []ToolCall) ([]ToolCall, error) {
var merged []ToolCall
m := make(map[int][]int) // 索引到块列表的映射
// 按索引分组
for i := range chunks {
index := chunks[i].Index
if index == nil {
// 没有索引的直接添加
merged = append(merged, chunks[i])
} else {
// 有索引的按索引分组
m[*index] = append(m[*index], i)
}
}
var args strings.Builder
// 处理每个索引组
for k, v := range m {
index := k
toolCall := ToolCall{Index: &index}
if len(v) > 0 {
toolCall = chunks[v[0]]
}
args.Reset()
toolID, toolType, toolName := "", "", ""
// 合并同一索引的所有块
for _, n := range v {
chunk := chunks[n]
// 验证工具ID一致性
if chunk.ID != "" {
if toolID == "" {
toolID = chunk.ID
} else if toolID != chunk.ID {
return nil, fmt.Errorf("cannot concat ToolCalls with different tool id: '%s' '%s'",
toolID, chunk.ID)
}
}
// 验证工具类型一致性
if chunk.Type != "" {
if toolType == "" {
toolType = chunk.Type
} else if toolType != chunk.Type {
return nil, fmt.Errorf("cannot concat ToolCalls with different tool type: '%s' '%s'",
toolType, chunk.Type)
}
}
// 验证工具名称一致性
if chunk.Function.Name != "" {
if toolName == "" {
toolName = chunk.Function.Name
} else if toolName != chunk.Function.Name {
return nil, fmt.Errorf("cannot concat ToolCalls with different tool name: '%s' '%s'",
toolName, chunk.Function.Name)
}
}
// 拼接参数
if chunk.Function.Arguments != "" {
args.WriteString(chunk.Function.Arguments)
}
}
// 设置合并后的工具调用
toolCall.ID = toolID
toolCall.Type = toolType
toolCall.Function.Name = toolName
toolCall.Function.Arguments = args.String()
merged = append(merged, toolCall)
}
// 按索引排序
if len(merged) > 1 {
sort.SliceStable(merged, func(i, j int) bool {
iVal, jVal := merged[i].Index, merged[j].Index
if iVal == nil && jVal == nil {
return false
} else if iVal == nil && jVal != nil {
return true
} else if iVal != nil && jVal == nil {
return false
}
return *iVal < *jVal
})
}
return merged, nil
}
🌊 Stream 流处理系统
流处理架构
graph TB
subgraph "流处理核心"
P[Pipe 管道]
SR[StreamReader 读取器]
SW[StreamWriter 写入器]
end
subgraph "流操作"
C[Copy 复制]
M[Merge 合并]
T[Transform 转换]
CC[Concat 拼接]
end
subgraph "流类型"
S[Stream 基础流]
A[Array 数组流]
MS[MultiStream 多流]
CS[ConvertStream 转换流]
CHS[ChildStream 子流]
end
P --> SR
P --> SW
SR --> C
SR --> M
SR --> T
SR --> CC
SR --> S
SR --> A
SR --> MS
SR --> CS
SR --> CHS
style P fill:#e8f5e8
style SR fill:#fff3e0
style C fill:#f3e5f5
style S fill:#e3f2fd
核心流接口
1. StreamReader 流读取器
// StreamReader 流读取器,支持多种底层实现
type StreamReader[T any] struct {
typ readerType // 读取器类型
st *stream[T] // 基础流
ar *arrayReader[T] // 数组读取器
msr *multiStreamReader[T] // 多流读取器
srw *streamReaderWithConvert[T] // 转换流读取器
csr *childStreamReader[T] // 子流读取器
}
// Recv 接收流中的下一个数据项
// 返回 io.EOF 表示流结束
func (sr *StreamReader[T]) Recv() (T, error) {
switch sr.typ {
case readerTypeStream:
return sr.st.recv()
case readerTypeArray:
return sr.ar.recv()
case readerTypeMultiStream:
return sr.msr.recv()
case readerTypeWithConvert:
return sr.srw.recv()
case readerTypeChild:
return sr.csr.recv()
default:
panic("impossible")
}
}
// Close 安全关闭流读取器
// 应该只调用一次,多次调用可能无法正常工作
func (sr *StreamReader[T]) Close() {
switch sr.typ {
case readerTypeStream:
sr.st.closeRecv()
case readerTypeArray:
// 数组读取器无需清理
case readerTypeMultiStream:
sr.msr.close()
case readerTypeWithConvert:
sr.srw.close()
case readerTypeChild:
sr.csr.close()
default:
panic("impossible")
}
}
2. StreamWriter 流写入器
// StreamWriter 流写入器
type StreamWriter[T any] struct {
stm *stream[T] // 底层流
}
// Send 发送数据到流中
// 返回 true 表示流已关闭
func (sw *StreamWriter[T]) Send(chunk T, err error) (closed bool) {
return sw.stm.send(chunk, err)
}
// Close 通知接收者流发送已完成
// 接收者将从 StreamReader.Recv() 收到 io.EOF 错误
func (sw *StreamWriter[T]) Close() {
sw.stm.closeSend()
}
3. Pipe 管道创建
// Pipe 创建指定容量的流管道
// 返回 StreamReader 和 StreamWriter 用于读写
func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T]) {
stm := newStream[T](cap)
return stm.asReader(), &StreamWriter[T]{stm: stm}
}
// 使用示例
func ExamplePipe() {
sr, sw := schema.Pipe[string](3)
// 发送数据的协程
go func() {
defer sw.Close()
for i := 0; i < 10; i++ {
sw.Send(fmt.Sprintf("item_%d", i), nil)
}
}()
// 接收数据
defer sr.Close()
for {
chunk, err := sr.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Println(chunk)
}
}
流操作功能
1. 流复制 (Copy)
// Copy 创建多个独立的流读取器副本
// 原始流在复制后将不可用
func (sr *StreamReader[T]) Copy(n int) []*StreamReader[T] {
if n < 2 {
return []*StreamReader[T]{sr}
}
if sr.typ == readerTypeArray {
// 数组流的复制比较简单
ret := make([]*StreamReader[T], n)
for i, ar := range sr.ar.copy(n) {
ret[i] = &StreamReader[T]{typ: readerTypeArray, ar: ar}
}
return ret
}
// 其他类型流的复制
return copyStreamReaders[T](sr, n)
}
// copyStreamReaders 实现复杂流的复制逻辑
func copyStreamReaders[T any](sr *StreamReader[T], n int) []*StreamReader[T] {
cpsr := &parentStreamReader[T]{
sr: sr,
subStreamList: make([]*cpStreamElement[T], n),
closedNum: 0,
}
// 初始化子流列表
elem := &cpStreamElement[T]{}
for i := range cpsr.subStreamList {
cpsr.subStreamList[i] = elem
}
// 创建子流读取器
ret := make([]*StreamReader[T], n)
for i := range ret {
ret[i] = &StreamReader[T]{
csr: &childStreamReader[T]{
parent: cpsr,
index: i,
},
typ: readerTypeChild,
}
}
return ret
}
2. 流合并 (Merge)
// MergeStreamReaders 将多个流合并为一个
func MergeStreamReaders[T any](srs []*StreamReader[T]) *StreamReader[T] {
if len(srs) < 1 {
return nil
}
if len(srs) < 2 {
return srs[0]
}
var arr []T
var ss []*stream[T]
// 分类处理不同类型的流
for _, sr := range srs {
switch sr.typ {
case readerTypeStream:
ss = append(ss, sr.st)
case readerTypeArray:
arr = append(arr, sr.ar.arr[sr.ar.index:]...)
case readerTypeMultiStream:
ss = append(ss, sr.msr.nonClosedStreams()...)
case readerTypeWithConvert:
ss = append(ss, sr.srw.toStream())
case readerTypeChild:
ss = append(ss, sr.csr.toStream())
default:
panic("impossible")
}
}
// 如果只有数组数据,返回数组流
if len(ss) == 0 {
return &StreamReader[T]{
typ: readerTypeArray,
ar: &arrayReader[T]{
arr: arr,
index: 0,
},
}
}
// 如果有数组数据,转换为流
if len(arr) != 0 {
s := arrToStream(arr)
ss = append(ss, s)
}
// 返回多流读取器
return &StreamReader[T]{
typ: readerTypeMultiStream,
msr: newMultiStreamReader(ss),
}
}
3. 命名流合并
// MergeNamedStreamReaders 合并命名流,保留流名称信息
// 当源流结束时,返回包含源流名称的 SourceEOF 错误
func MergeNamedStreamReaders[T any](srs map[string]*StreamReader[T]) *StreamReader[T] {
if len(srs) < 1 {
return nil
}
ss := make([]*StreamReader[T], len(srs))
names := make([]string, len(srs))
i := 0
for name, sr := range srs {
ss[i] = sr
names[i] = name
i++
}
return InternalMergeNamedStreamReaders(ss, names)
}
// SourceEOF 表示来自特定源流的EOF错误
type SourceEOF struct {
sourceName string
}
func (e *SourceEOF) Error() string {
return fmt.Sprintf("EOF from source stream: %s", e.sourceName)
}
// GetSourceName 从 SourceEOF 错误中提取源流名称
func GetSourceName(err error) (string, bool) {
var sErr *SourceEOF
if errors.As(err, &sErr) {
return sErr.sourceName, true
}
return "", false
}
4. 流转换 (Transform)
// StreamReaderWithConvert 将流转换为另一种类型的流
func StreamReaderWithConvert[T, D any](sr *StreamReader[T], convert func(T) (D, error)) *StreamReader[D] {
c := func(a any) (D, error) {
return convert(a.(T))
}
return newStreamReaderWithConvert(sr, c)
}
// streamReaderWithConvert 转换流的实现
type streamReaderWithConvert[T any] struct {
sr iStreamReader // 源流
convert func(any) (T, error) // 转换函数
}
func (srw *streamReaderWithConvert[T]) recv() (T, error) {
for {
out, err := srw.sr.recvAny()
if err != nil {
var t T
return t, err
}
t, err := srw.convert(out)
if err == nil {
return t, nil
}
// ErrNoValue 用于跳过某些值
if !errors.Is(err, ErrNoValue) {
return t, err
}
}
}
// 使用示例
func ExampleStreamConvert() {
// 创建整数流
intReader := schema.StreamReaderFromArray([]int{1, 2, 3, 4, 5})
// 转换为字符串流
stringReader := schema.StreamReaderWithConvert(intReader, func(i int) (string, error) {
if i%2 == 0 {
return fmt.Sprintf("even_%d", i), nil
}
return "", schema.ErrNoValue // 跳过奇数
})
defer stringReader.Close()
for {
s, err := stringReader.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Println(s) // 输出: even_2, even_4
}
}
流生命周期管理
1. 自动关闭机制
// SetAutomaticClose 设置流在不再可达时自动关闭
// 不是并发安全的
func (sr *StreamReader[T]) SetAutomaticClose() {
switch sr.typ {
case readerTypeStream:
if !sr.st.automaticClose {
sr.st.automaticClose = true
var flag uint32
sr.st.closedFlag = &flag
runtime.SetFinalizer(sr, func(s *StreamReader[T]) {
s.Close()
})
}
case readerTypeMultiStream:
for _, s := range sr.msr.nonClosedStreams() {
if !s.automaticClose {
s.automaticClose = true
var flag uint32
s.closedFlag = &flag
runtime.SetFinalizer(s, func(st *stream[T]) {
st.closeRecv()
})
}
}
case readerTypeChild:
parent := sr.csr.parent.sr
parent.SetAutomaticClose()
case readerTypeWithConvert:
sr.srw.sr.SetAutomaticClose()
case readerTypeArray:
// 数组流无需清理
default:
}
}
2. 流状态管理
// stream 基础流结构
type stream[T any] struct {
items chan streamItem[T] // 数据通道
closed chan struct{} // 关闭信号通道
automaticClose bool // 是否自动关闭
closedFlag *uint32 // 关闭标志(原子操作)
}
type streamItem[T any] struct {
chunk T // 数据块
err error // 错误信息
}
func (s *stream[T]) send(chunk T, err error) (closed bool) {
// 检查流是否已关闭
select {
case <-s.closed:
return true
default:
}
item := streamItem[T]{chunk, err}
select {
case <-s.closed:
return true
case s.items <- item:
return false
}
}
func (s *stream[T]) recv() (chunk T, err error) {
item, ok := <-s.items
if !ok {
item.err = io.EOF
}
return item.chunk, item.err
}
func (s *stream[T]) closeRecv() {
if s.automaticClose {
if atomic.CompareAndSwapUint32(s.closedFlag, 0, 1) {
close(s.closed)
}
return
}
close(s.closed)
}
🔧 Tool 工具定义系统
工具系统架构
graph TB
subgraph "工具定义"
TI[ToolInfo 工具信息]
TC[ToolChoice 工具选择]
TP[ToolParams 工具参数]
end
subgraph "参数系统"
PI[ParameterInfo 参数信息]
DT[DataType 数据类型]
PO[ParamsOneOf 参数联合]
end
subgraph "Schema支持"
JS[JSONSchema]
OA[OpenAPIV3]
PM[ParamMap]
end
TI --> TC
TI --> TP
TP --> PI
PI --> DT
TP --> PO
PO --> JS
PO --> OA
PO --> PM
style TI fill:#e8f5e8
style PI fill:#fff3e0
style JS fill:#f3e5f5
工具信息定义
1. ToolInfo 结构
// ToolInfo 工具信息定义
type ToolInfo struct {
// Name 工具的唯一名称,清楚地表达其用途
Name string
// Desc 描述如何/何时/为什么使用该工具
// 可以提供少量示例作为描述的一部分
Desc string
// Extra 工具的额外信息
Extra map[string]any
// ParamsOneOf 工具接受的参数定义
// 可以通过两种方式描述:
// - 使用 params: schema.NewParamsOneOfByParams(params)
// - 使用 openAPIV3: schema.NewParamsOneOfByOpenAPIV3(openAPIV3)
// 如果为 nil,表示工具不需要任何输入参数
*ParamsOneOf
}
2. ToolChoice 工具选择策略
// ToolChoice 控制模型如何调用工具
type ToolChoice string
const (
// ToolChoiceForbidden 模型不应调用任何工具
// 对应 OpenAI Chat Completion 中的 "none"
ToolChoiceForbidden ToolChoice = "forbidden"
// ToolChoiceAllowed 模型可以选择生成消息或调用一个或多个工具
// 对应 OpenAI Chat Completion 中的 "auto"
ToolChoiceAllowed ToolChoice = "allowed"
// ToolChoiceForced 模型必须调用一个或多个工具
// 对应 OpenAI Chat Completion 中的 "required"
ToolChoiceForced ToolChoice = "forced"
)
参数定义系统
1. ParameterInfo 参数信息
// ParameterInfo 参数信息定义
type ParameterInfo struct {
// Type 参数类型
Type DataType
// ElemInfo 元素类型信息,仅用于数组类型
ElemInfo *ParameterInfo
// SubParams 子参数,仅用于对象类型
SubParams map[string]*ParameterInfo
// Desc 参数描述
Desc string
// Enum 枚举值,仅用于字符串类型
Enum []string
// Required 是否必需
Required bool
}
// DataType 数据类型定义
type DataType string
const (
Object DataType = "object" // 对象类型
Number DataType = "number" // 数字类型
Integer DataType = "integer" // 整数类型
String DataType = "string" // 字符串类型
Array DataType = "array" // 数组类型
Null DataType = "null" // 空值类型
Boolean DataType = "boolean" // 布尔类型
)
2. ParamsOneOf 参数联合类型
// ParamsOneOf 参数描述的联合类型
// 用户必须指定且仅指定一种方法来描述参数
type ParamsOneOf struct {
// params 使用 NewParamsOneOfByParams 设置
params map[string]*ParameterInfo
// openAPIV3 使用 NewParamsOneOfByOpenAPIV3 设置(已废弃)
openAPIV3 *openapi3.Schema
// jsonschema 使用 NewParamsOneOfByJSONSchema 设置
jsonschema *jsonschema.Schema
}
// NewParamsOneOfByParams 通过参数映射创建 ParamsOneOf
func NewParamsOneOfByParams(params map[string]*ParameterInfo) *ParamsOneOf {
return &ParamsOneOf{
params: params,
}
}
// NewParamsOneOfByJSONSchema 通过 JSONSchema 创建 ParamsOneOf
func NewParamsOneOfByJSONSchema(s *jsonschema.Schema) *ParamsOneOf {
return &ParamsOneOf{
jsonschema: s,
}
}
3. Schema 转换功能
// ToJSONSchema 将 ParamsOneOf 转换为 JSONSchema 格式
func (p *ParamsOneOf) ToJSONSchema() (*jsonschema.Schema, error) {
if p == nil {
return nil, nil
}
if p.params != nil {
// 从参数映射转换
sc := &jsonschema.Schema{
Properties: orderedmap.New[string, *jsonschema.Schema](),
Type: string(Object),
Required: make([]string, 0, len(p.params)),
}
for k := range p.params {
v := p.params[k]
sc.Properties.Set(k, paramInfoToJSONSchema(v))
if v.Required {
sc.Required = append(sc.Required, k)
}
}
return sc, nil
}
if p.openAPIV3 != nil {
// 从 OpenAPIV3 转换
js, err := openapiV3ToJSONSchema(p.openAPIV3)
if err != nil {
return nil, fmt.Errorf("convert OpenAPIV3 to JSONSchema failed: %w", err)
}
return js, nil
}
return p.jsonschema, nil
}
// paramInfoToJSONSchema 将 ParameterInfo 转换为 JSONSchema
func paramInfoToJSONSchema(paramInfo *ParameterInfo) *jsonschema.Schema {
js := &jsonschema.Schema{
Type: string(paramInfo.Type),
Description: paramInfo.Desc,
}
// 处理枚举值
if len(paramInfo.Enum) > 0 {
js.Enum = make([]any, len(paramInfo.Enum))
for i, enum := range paramInfo.Enum {
js.Enum[i] = enum
}
}
// 处理数组元素类型
if paramInfo.ElemInfo != nil {
js.Items = paramInfoToJSONSchema(paramInfo.ElemInfo)
}
// 处理对象子参数
if len(paramInfo.SubParams) > 0 {
required := make([]string, 0, len(paramInfo.SubParams))
js.Properties = orderedmap.New[string, *jsonschema.Schema]()
for k, v := range paramInfo.SubParams {
item := paramInfoToJSONSchema(v)
js.Properties.Set(k, item)
if v.Required {
required = append(required, k)
}
}
js.Required = required
}
return js
}
工具使用示例
1. 简单工具定义
func ExampleSimpleTool() {
// 定义天气查询工具
weatherTool := &schema.ToolInfo{
Name: "get_weather",
Desc: "获取指定城市的当前天气信息",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"city": {
Type: schema.String,
Desc: "城市名称",
Required: true,
},
"unit": {
Type: schema.String,
Desc: "温度单位",
Enum: []string{"celsius", "fahrenheit"},
Required: false,
},
}),
}
// 转换为 JSONSchema
jsonSchema, err := weatherTool.ParamsOneOf.ToJSONSchema()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Tool Schema: %+v\n", jsonSchema)
}
2. 复杂工具定义
func ExampleComplexTool() {
// 定义数据库查询工具
dbQueryTool := &schema.ToolInfo{
Name: "database_query",
Desc: "执行数据库查询操作,支持SELECT、INSERT、UPDATE、DELETE",
Extra: map[string]any{
"timeout": 30,
"retry": 3,
},
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"operation": {
Type: schema.String,
Desc: "数据库操作类型",
Enum: []string{"SELECT", "INSERT", "UPDATE", "DELETE"},
Required: true,
},
"table": {
Type: schema.String,
Desc: "目标表名",
Required: true,
},
"conditions": {
Type: schema.Object,
Desc: "查询条件",
SubParams: map[string]*schema.ParameterInfo{
"where": {
Type: schema.String,
Desc: "WHERE子句",
Required: false,
},
"limit": {
Type: schema.Integer,
Desc: "结果数量限制",
Required: false,
},
},
Required: false,
},
"data": {
Type: schema.Array,
Desc: "要插入或更新的数据",
ElemInfo: &schema.ParameterInfo{
Type: schema.Object,
Desc: "数据行",
},
Required: false,
},
}),
}
// 使用工具
fmt.Printf("Tool: %s\n", dbQueryTool.Name)
fmt.Printf("Description: %s\n", dbQueryTool.Desc)
}
📄 Document 文档系统
文档结构定义
// Document 带有元数据的文本片段
type Document struct {
// ID 文档的唯一标识符
ID string `json:"id"`
// Content 文档内容
Content string `json:"content"`
// MetaData 文档元数据,可用于存储额外信息
MetaData map[string]any `json:"meta_data"`
}
// String 返回文档内容
func (d *Document) String() string {
return d.Content
}
文档元数据管理
1. 预定义元数据键
const (
docMetaDataKeySubIndexes = "_sub_indexes" // 子索引
docMetaDataKeyScore = "_score" // 相关性分数
docMetaDataKeyExtraInfo = "_extra_info" // 额外信息
docMetaDataKeyDSL = "_dsl" // DSL查询
docMetaDataKeyDenseVector = "_dense_vector" // 密集向量
docMetaDataKeySparseVector = "_sparse_vector" // 稀疏向量
)
2. 文档操作方法
// WithSubIndexes 设置文档的子索引
func (d *Document) WithSubIndexes(indexes []string) *Document {
if d.MetaData == nil {
d.MetaData = make(map[string]any)
}
d.MetaData[docMetaDataKeySubIndexes] = indexes
return d
}
// SubIndexes 获取文档的子索引
func (d *Document) SubIndexes() []string {
if d.MetaData == nil {
return nil
}
indexes, ok := d.MetaData[docMetaDataKeySubIndexes].([]string)
if ok {
return indexes
}
return nil
}
// WithScore 设置文档的相关性分数
func (d *Document) WithScore(score float64) *Document {
if d.MetaData == nil {
d.MetaData = make(map[string]any)
}
d.MetaData[docMetaDataKeyScore] = score
return d
}
// Score 获取文档的相关性分数
func (d *Document) Score() float64 {
if d.MetaData == nil {
return 0
}
score, ok := d.MetaData[docMetaDataKeyScore].(float64)
if ok {
return score
}
return 0
}
// WithDenseVector 设置文档的密集向量
func (d *Document) WithDenseVector(vector []float64) *Document {
if d.MetaData == nil {
d.MetaData = make(map[string]any)
}
d.MetaData[docMetaDataKeyDenseVector] = vector
return d
}
// DenseVector 获取文档的密集向量
func (d *Document) DenseVector() []float64 {
if d.MetaData == nil {
return nil
}
vector, ok := d.MetaData[docMetaDataKeyDenseVector].([]float64)
if ok {
return vector
}
return nil
}
文档使用示例
func ExampleDocument() {
// 创建文档
doc := &schema.Document{
ID: "doc_001",
Content: "这是一个关于人工智能的文档内容。",
MetaData: map[string]any{
"author": "张三",
"category": "AI",
"timestamp": time.Now(),
},
}
// 设置子索引
doc.WithSubIndexes([]string{"ai", "machine_learning", "deep_learning"})
// 设置相关性分数
doc.WithScore(0.95)
// 设置向量
doc.WithDenseVector([]float64{0.1, 0.2, 0.3, 0.4, 0.5})
// 使用文档
fmt.Printf("Document ID: %s\n", doc.ID)
fmt.Printf("Content: %s\n", doc.Content)
fmt.Printf("Sub Indexes: %v\n", doc.SubIndexes())
fmt.Printf("Score: %.2f\n", doc.Score())
fmt.Printf("Vector: %v\n", doc.DenseVector())
}
📊 性能优化与最佳实践
流处理性能优化
1. 缓冲区大小选择
// 根据数据量选择合适的缓冲区大小
func OptimalBufferSize(dataSize int, itemSize int) int {
// 小数据量:使用较小缓冲区
if dataSize < 1000 {
return 10
}
// 中等数据量:使用中等缓冲区
if dataSize < 100000 {
return 100
}
// 大数据量:使用较大缓冲区
return 1000
}
// 使用示例
func ExampleOptimalBuffer() {
dataSize := 50000
bufferSize := OptimalBufferSize(dataSize, 1)
sr, sw := schema.Pipe[string](bufferSize)
// 使用优化后的缓冲区...
}
2. 流生命周期管理
// 正确的流使用模式
func ProperStreamUsage() {
sr, sw := schema.Pipe[string](100)
// 发送协程
go func() {
defer sw.Close() // 确保关闭写入器
for i := 0; i < 1000; i++ {
if sw.Send(fmt.Sprintf("item_%d", i), nil) {
break // 流已关闭,退出
}
}
}()
// 接收协程
defer sr.Close() // 确保关闭读取器
for {
item, err := sr.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Error: %v", err)
break
}
// 处理数据
processItem(item)
}
}
func processItem(item string) {
// 处理逻辑
}
3. 内存使用优化
// 使用自动关闭避免内存泄漏
func MemoryOptimizedStream() {
sr := schema.StreamReaderFromArray([]string{"a", "b", "c"})
// 设置自动关闭
sr.SetAutomaticClose()
// 即使忘记调用 Close(),GC 时也会自动清理
// 但仍建议显式调用 Close()
defer sr.Close()
// 使用流...
}
消息处理最佳实践
1. 消息模板优化
// 预编译模板提高性能
type TemplateCache struct {
templates map[string]*template.Template
mutex sync.RWMutex
}
func (tc *TemplateCache) GetTemplate(content string) (*template.Template, error) {
tc.mutex.RLock()
tmpl, exists := tc.templates[content]
tc.mutex.RUnlock()
if exists {
return tmpl, nil
}
tc.mutex.Lock()
defer tc.mutex.Unlock()
// 双重检查
if tmpl, exists := tc.templates[content]; exists {
return tmpl, nil
}
// 编译模板
tmpl, err := template.New("").Parse(content)
if err != nil {
return nil, err
}
if tc.templates == nil {
tc.templates = make(map[string]*template.Template)
}
tc.templates[content] = tmpl
return tmpl, nil
}
2. 消息拼接优化
// 高效的消息拼接
func EfficientMessageConcat(msgs []*schema.Message) (*schema.Message, error) {
if len(msgs) == 0 {
return nil, fmt.Errorf("empty message list")
}
if len(msgs) == 1 {
return msgs[0], nil // 单个消息直接返回
}
// 预计算总长度,减少内存分配
var totalContentLen int
var totalReasoningLen int
for _, msg := range msgs {
totalContentLen += len(msg.Content)
totalReasoningLen += len(msg.ReasoningContent)
}
// 使用预分配的 Builder
contentBuilder := strings.Builder{}
contentBuilder.Grow(totalContentLen)
reasoningBuilder := strings.Builder{}
reasoningBuilder.Grow(totalReasoningLen)
// 拼接内容
for _, msg := range msgs {
contentBuilder.WriteString(msg.Content)
reasoningBuilder.WriteString(msg.ReasoningContent)
}
result := *msgs[0] // 复制第一个消息
result.Content = contentBuilder.String()
result.ReasoningContent = reasoningBuilder.String()
return &result, nil
}
上一篇: 核心API深度分析 下一篇: Components模块详解 - 深入分析组件抽象和实现
更新时间: 2024-12-19 | 文档版本: v1.0