Milvus-03-DataCoord-概览
1. 模块概述
1.1 职责定义
DataCoord(数据协调器)是Milvus数据管理层的核心组件,负责数据的生命周期管理、存储优化和资源调度。
核心职责:
- Segment生命周期管理
  - Segment分配与创建
  - Growing → Sealed → Flushed → Indexed状态转换
  - Segment元数据维护
- 数据持久化协调
  - 触发DataNode Flush操作
  - 监控Flush进度
  - 管理Binlog文件
- Compaction调度
  - 自动触发Compaction任务
  - 分配Compaction资源
  - 监控Compaction进度
- Channel与DataNode管理
  - Channel分配与负载均衡
  - DataNode注册与心跳监控
  - 故障检测与恢复
- 索引管理协调
  - 触发索引构建任务
  - 监控索引构建进度
  - 管理索引文件元数据
- 垃圾回收(GC)
  - 清理过期Segment
  - 清理无效Binlog文件
  - 回收Object Storage空间
1.2 输入与输出
输入:
- Proxy:AssignSegmentID请求(数据写入)
- DataNode:Flush完成通知、Compaction进度
- RootCoord:Collection/Partition DDL事件
- etcd/TiKV:Segment元数据持久化
- Object Storage:Binlog/Deltalog文件元数据
输出:
- DataNode:Flush/Compaction任务
- QueryCoord:Handoff通知(Segment状态变更)
- etcd/TiKV:元数据更新
- 监控系统:Metrics指标
1.3 上下游依赖
flowchart TD
RC[RootCoord] -->|DDL事件| DC[DataCoord]
P[Proxy] -->|AssignSegmentID| DC
DN1[DataNode1] -.->|Flush完成| DC
DN2[DataNode2] -.->|Compaction进度| DC
DC -->|Flush任务| DN1
DC -->|Compaction任务| DN2
DC -->|Handoff通知| QC[QueryCoord]
DC <-->|元数据读写| ETCD[etcd/TiKV]
DC -->|清理文件| OS[Object Storage]
DC -->|Metrics| Prom[Prometheus]
style DC fill:#f9f,stroke:#333,stroke-width:4px
依赖关系说明:
| 依赖组件 | 依赖类型 | 用途 |
|---|---|---|
| RootCoord | 强依赖 | 获取Collection元信息、DDL事件订阅 |
| DataNode | 强依赖 | 执行Flush/Compaction任务 |
| QueryCoord | 弱依赖 | 通知Segment状态变更(Handoff) |
| etcd/TiKV | 强依赖 | 元数据持久化与Watch |
| Object Storage | 强依赖 | Binlog文件管理 |
1.4 生命周期
stateDiagram-v2
[*] --> Created: CreateServer()
Created --> Initializing: Init()
Initializing --> Initializing: 初始化元数据/连接
Initializing --> Healthy: Start()
Healthy --> Healthy: 处理请求
Healthy --> Stopping: Stop()
Stopping --> [*]: 清理资源
note right of Initializing
1. 连接etcd
2. 加载元数据
3. 初始化SegmentManager
4. 启动Compaction
5. 启动GC
end note
note right of Healthy
1. 接收Segment分配请求
2. 调度Flush任务
3. 调度Compaction任务
4. 处理DataNode心跳
5. 执行垃圾回收
end note
生命周期阶段:
| 阶段 | 状态码 | 主要操作 | 耗时 |
|---|---|---|---|
| Created | - | 创建Server实例 | <1ms |
| Initializing | StateCode_Initializing | 连接依赖组件、加载元数据 | 1-5s |
| Healthy | StateCode_Healthy | 正常服务 | - |
| Stopping | StateCode_Abnormal | 停止后台任务、关闭连接 | 1-3s |
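下面给出一个极简的生命周期骨架示意,展示 Created → Initializing → Healthy → Stopping 的状态流转(仅为示意草图,结构体与方法名为假设,并非真实源码):
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// StateCode 为示意用的状态码,与上表生命周期阶段对应
type StateCode int32

const (
	StateCreated StateCode = iota
	StateInitializing
	StateHealthy
	StateStopping
)

// Server 为极简示意:只演示状态流转,省略etcd连接、元数据加载等细节
type Server struct {
	state  atomic.Int32
	cancel context.CancelFunc
}

func NewServer() *Server {
	s := &Server{}
	s.state.Store(int32(StateCreated))
	return s
}

// Init 对应Initializing阶段:连接etcd、加载元数据、初始化各Manager(此处省略)
func (s *Server) Init(ctx context.Context) error {
	s.state.Store(int32(StateInitializing))
	return nil
}

// Start 对应Healthy阶段:启动Compaction触发、GC等后台任务(此处用占位goroutine代替)
func (s *Server) Start(ctx context.Context) error {
	ctx, s.cancel = context.WithCancel(ctx)
	go func() { <-ctx.Done() }()
	s.state.Store(int32(StateHealthy))
	return nil
}

// Stop 对应Stopping阶段:停止后台任务、关闭连接
func (s *Server) Stop() {
	s.state.Store(int32(StateStopping))
	if s.cancel != nil {
		s.cancel()
	}
}

func main() {
	s := NewServer()
	_ = s.Init(context.Background())
	_ = s.Start(context.Background())
	fmt.Println("state:", s.state.Load()) // 2 即 Healthy
	s.Stop()
}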
2. 架构设计
2.1 整体服务架构图
flowchart TB
subgraph Upstream["上游组件"]
Proxy[Proxy<br/>数据写入入口]
RC[RootCoord<br/>DDL协调器]
QC[QueryCoord<br/>查询协调器]
end
subgraph DataCoord["DataCoord Server"]
subgraph APILayer["API 层"]
GRPC[gRPC Server<br/>对外接口]
end
subgraph CoreManagers["核心管理器"]
direction TB
SM[SegmentManager<br/>━━━━━━━━━━━━<br/>• Segment分配<br/>• 状态管理<br/>• Seal策略]
CM[ChannelManager<br/>━━━━━━━━━━━━<br/>• Channel分配<br/>• 负载均衡<br/>• 故障转移]
Meta[Meta<br/>━━━━━━━━━━━━<br/>• 内存缓存<br/>• 元数据持久化<br/>• 事务支持]
end
subgraph TaskSchedulers["任务调度层"]
direction LR
CTM[CompactionTriggerManager<br/>━━━━━━━━━━━━<br/>• L0 Compaction<br/>• Mix Compaction<br/>• Clustering]
CI[CompactionInspector<br/>━━━━━━━━━━━━<br/>• 任务调度<br/>• 进度监控<br/>• 结果处理]
II[IndexInspector<br/>━━━━━━━━━━━━<br/>• 索引任务调度<br/>• 构建监控]
end
subgraph BackgroundServices["后台服务"]
GC[GarbageCollector<br/>━━━━━━━━━━━━<br/>• Dropped Segment清理<br/>• Binlog文件回收<br/>• 索引文件清理]
Handler[Handler<br/>━━━━━━━━━━━━<br/>• Collection信息缓存<br/>• RootCoord交互<br/>• QueryCoord交互]
end
GRPC --> SM
GRPC --> CM
GRPC --> Meta
GRPC --> CTM
SM --> Meta
CM --> Meta
CTM --> Meta
CI --> Meta
II --> Meta
GC --> Meta
Handler --> Meta
end
subgraph Downstream["下游组件"]
DN1[DataNode-1<br/>━━━━━━<br/>数据写入]
DN2[DataNode-2<br/>━━━━━━<br/>数据写入]
DNn[DataNode-N<br/>━━━━━━<br/>数据写入]
Cluster[Cluster Manager<br/>━━━━━━━━━━━━<br/>DataNode通信层]
end
subgraph Storage["存储层"]
direction TB
ETCD[(etcd/TiKV<br/>━━━━━━━━<br/>元数据存储)]
S3[(Object Storage<br/>━━━━━━━━<br/>MinIO/S3<br/>Binlog文件)]
end
%% 上游到 DataCoord
Proxy -->|AssignSegmentID<br/>获取Segment分配| GRPC
RC -->|WatchChannels<br/>创建Collection| GRPC
RC -->|DropCollection<br/>删除Collection| GRPC
QC -->|GetSegmentInfo<br/>查询Segment| GRPC
%% DataCoord 到下游
CM -->|Watch/Release Channel<br/>分配/释放| Cluster
CI -->|Compaction Task<br/>压缩任务| Cluster
SM -->|Flush Command<br/>刷盘命令| Cluster
Cluster --> DN1
Cluster --> DN2
Cluster --> DNn
%% DataCoord 到存储
Meta <-->|Save/Load<br/>Segment Meta| ETCD
CM <-->|Save/Load<br/>Channel Meta| ETCD
GC -->|Delete<br/>Binlog Files| S3
%% DataNode 反馈
DN1 -.->|FlushCompleted<br/>CompactionCompleted| GRPC
DN2 -.->|FlushCompleted<br/>CompactionCompleted| GRPC
DNn -.->|FlushCompleted<br/>CompactionCompleted| GRPC
style DataCoord fill:#e1f5ff,stroke:#1976d2,stroke-width:3px
style CoreManagers fill:#fff4e6,stroke:#f57c00,stroke-width:2px
style TaskSchedulers fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style BackgroundServices fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style Upstream fill:#ffebee,stroke:#c62828,stroke-width:2px
style Downstream fill:#e0f2f1,stroke:#00695c,stroke-width:2px
style Storage fill:#fce4ec,stroke:#880e4f,stroke-width:2px
架构分层说明:
| 层次 | 组件 | 职责 | 交互方式 |
|---|---|---|---|
| API层 | gRPC Server | 对外提供RPC接口 | 同步调用 |
| 核心管理器 | SegmentManager | Segment生命周期管理 | 内存+持久化 |
| 核心管理器 | ChannelManager | Channel分配与负载均衡 | 内存+持久化 |
| 核心管理器 | Meta | 元数据缓存与持久化 | 读多写少 |
| 任务调度层 | CompactionTriggerManager | 自动触发Compaction | 定时+事件触发 |
| 任务调度层 | CompactionInspector | Compaction任务调度 | 异步调度 |
| 任务调度层 | IndexInspector | 索引任务调度 | 异步调度 |
| 后台服务 | GarbageCollector | 垃圾回收 | 定时清理 |
| 后台服务 | Handler | 跨组件通信 | RPC调用 |
2.2 模块交互矩阵
下表展示了DataCoord内部各模块之间的依赖关系和交互方式:
| 调用方 ↓ 被调方 → | SegmentManager | ChannelManager | Meta | CompactionTrigger | GarbageCollector |
|---|---|---|---|---|---|
| API Layer | 分配Segment(同步调用) | Watch/Release(同步调用) | 查询元数据(同步调用) | 手动触发(同步调用) | - |
| SegmentManager | - | 查询DataNode(同步调用) | 读写Segment元数据(同步调用) | - | - |
| ChannelManager | - | - | 读写Channel元数据(同步调用) | - | - |
| CompactionTrigger | 获取待压缩Segment(同步调用) | - | 读取元数据(同步调用) | - | - |
| GarbageCollector | - | - | 查询Dropped Segment、删除元数据(同步调用) | - | - |
2.3 数据流转示意图
flowchart LR
subgraph Write["写入流程"]
W1[Proxy请求分配] --> W2[SegmentManager分配]
W2 --> W3[Meta持久化]
W3 --> W4[返回SegmentID]
W4 --> W5[DataNode写入数据]
W5 --> W6[Segment达到阈值]
end
subgraph Flush["Flush流程"]
F1[SegmentManager检测] --> F2[Seal Segment]
F2 --> F3[通知DataNode Flush]
F3 --> F4[DataNode写Binlog]
F4 --> F5[更新Meta状态]
F5 --> F6[通知QueryCoord]
end
subgraph Compact["Compaction流程"]
C1[CompactionTrigger扫描] --> C2[计算收益]
C2 --> C3[提交CompactionInspector]
C3 --> C4[DataNode合并]
C4 --> C5[写新Segment]
C5 --> C6[Drop旧Segment]
end
subgraph GC["GC流程"]
G1[GC扫描Dropped] --> G2[删除Binlog]
G2 --> G3[删除Meta]
end
W6 --> F1
F6 --> C1
C6 --> G1
style Write fill:#e3f2fd
style Flush fill:#fff3e0
style Compact fill:#f3e5f5
style GC fill:#e8f5e9
架构层次:
- API层:gRPC接口,处理外部请求
- 核心管理器:Segment、Channel、Meta管理
- 调度器:Compaction、索引、统计任务调度
- 后台服务:GC、心跳监控、时间同步
2.4 核心组件说明
2.4.1 SegmentManager
职责:Segment分配与生命周期管理
核心功能:
- AllocSegment:为数据写入分配Segment
- SealSegment:将Growing Segment标记为Sealed
- DropSegment:删除Segment元数据
- GetFlushableSegments:获取可Flush的Segment列表
Segment状态机:
stateDiagram-v2
[*] --> Growing: AllocSegment
Growing --> Sealed: SealSegment(满或超时)
Sealed --> Flushing: DataNode开始Flush
Flushing --> Flushed: Flush完成
Flushed --> Indexed: 索引构建完成
Indexed --> Dropped: DropSegment
Dropped --> [*]: GC清理
Growing --> Dropped: Collection被删除
Sealed --> Dropped: Collection被删除
note right of Growing
状态:内存中
操作:接受新数据写入
大小:动态增长
end note
note right of Flushed
状态:已持久化
操作:只读
位置:Object Storage
end note
Segment分配策略:
// 分配策略:优先复用未满Segment,不存在则创建新Segment
func (m *SegmentManager) AllocSegment(collectionID, partitionID int64, channelName string, count int64) ([]*Allocation, error) {
// 1. 查找未满的Growing Segment
segment := m.meta.GetGrowingSegment(collectionID, partitionID, channelName)
// 2. 检查容量是否足够
if segment != nil && segment.AvailableSize >= count {
allocation := &Allocation{
SegmentID: segment.ID,
NumOfRows: count,
ExpireTime: time.Now().Add(SegmentExpireDuration),
}
return []*Allocation{allocation}, nil
}
// 3. 创建新Segment
newSegmentID, err := m.allocator.AllocID()
if err != nil {
return nil, err
}
segment = &SegmentInfo{
SegmentID: newSegmentID,
CollectionID: collectionID,
PartitionID: partitionID,
State: commonpb.SegmentState_Growing,
MaxSize: SegmentMaxSize, // 默认512MB
Channel: channelName,
}
m.meta.AddSegment(segment)
return []*Allocation{{SegmentID: newSegmentID, NumOfRows: count}}, nil
}
2.4.2 ChannelManager
职责:管理DML Channel与DataNode的映射关系
核心功能:
- Watch:为Channel分配DataNode
- Release:释放Channel(Collection被删除)
- GetDataNode:查询Channel对应的DataNode
- Balance:负载均衡(将Channel迁移到其他DataNode)
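以下给出一个接口草图,将上述核心功能映射为方法签名(类型与签名为简化示意,并非真实源码):
package datacoord

import "context"

// NodeID 与 RWChannel 为示意定义
type NodeID = int64

type RWChannel struct {
	Name         string
	CollectionID int64
}

// ChannelManager 接口草图:与上文列出的核心功能一一对应
type ChannelManager interface {
	// Watch:为Channel选择DataNode并下发WatchDmChannels
	Watch(ctx context.Context, ch RWChannel) error
	// Release:释放Channel(如Collection被删除时)
	Release(ctx context.Context, channelName string) error
	// GetDataNode:查询Channel当前归属的DataNode
	GetDataNode(channelName string) (NodeID, bool)
	// Balance:将Channel从高负载节点迁移到低负载节点
	Balance(ctx context.Context) error
}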
Channel分配示意:
flowchart LR
C1[Collection1<br/>4 Channels] --> VC1[vchan_0]
C1 --> VC2[vchan_1]
C1 --> VC3[vchan_2]
C1 --> VC4[vchan_3]
VC1 -.->|Watch| DN1[DataNode1]
VC2 -.->|Watch| DN2[DataNode2]
VC3 -.->|Watch| DN1
VC4 -.->|Watch| DN3[DataNode3]
DN1 -->|Flush| S1[Segment1-3]
DN2 -->|Flush| S2[Segment4-5]
DN3 -->|Flush| S3[Segment6-7]
style C1 fill:#e1f5ff
style DN1 fill:#fff4e6
style DN2 fill:#fff4e6
style DN3 fill:#fff4e6
2.4.3 Meta
职责:管理Segment元数据的内存缓存与持久化
数据结构:
type meta struct {
sync.RWMutex
// Segment元数据:segmentID -> SegmentInfo
segments map[UniqueID]*SegmentInfo
// Collection -> Segments映射
collections map[UniqueID][]UniqueID
// Channel -> Segments映射
channelSegments map[string][]UniqueID
// 元数据持久化层
catalog metastore.DataCoordCatalog
// Collection信息缓存
collectionInfos map[UniqueID]*collectionInfo
}
元数据操作:
| 操作 | 说明 | 持久化 |
|---|---|---|
| AddSegment | 添加新Segment | 是 |
| UpdateSegment | 更新Segment状态 | 是 |
| DropSegment | 删除Segment | 是 |
| GetSegment | 查询Segment信息 | 否(内存) |
| ListSegments | 列出Collection的Segment | 否(内存) |
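上表中"否(内存)"的只读操作仅访问内存缓存,不触达etcd;基于上文meta结构体字段的简化写法示意如下(非真实源码):
// GetSegment 只读查询:仅加读锁访问内存map,不访问etcd
func (m *meta) GetSegment(segmentID UniqueID) *SegmentInfo {
	m.RLock()
	defer m.RUnlock()
	return m.segments[segmentID]
}

// ListSegments 列出Collection下的Segment:同样只读内存索引
func (m *meta) ListSegments(collectionID UniqueID) []*SegmentInfo {
	m.RLock()
	defer m.RUnlock()
	result := make([]*SegmentInfo, 0, len(m.collections[collectionID]))
	for _, segID := range m.collections[collectionID] {
		if seg, ok := m.segments[segID]; ok {
			result = append(result, seg)
		}
	}
	return result
}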
2.4.4 CompactionTrigger
职责:自动触发Compaction任务
触发策略:
- 时间触发:定期扫描(每10分钟)
- 事件触发:Segment Flush完成后检查
- 手动触发:用户主动调用API
Compaction类型:
flowchart TD
Start[扫描Segment] --> CheckDelete{Delete比例>20%}
CheckDelete -->|是| MixCompaction[MixCompaction<br/>合并Delete Delta]
CheckDelete -->|否| CheckSize{小文件数量>10}
CheckSize -->|是| MergeCompaction[MergeCompaction<br/>合并小Segment]
CheckSize -->|否| CheckCluster{是否启用Clustering}
CheckCluster -->|是| ClusteringCompaction[ClusteringCompaction<br/>按标量字段聚类]
CheckCluster -->|否| End[不触发]
MixCompaction --> Schedule[提交到CompactionInspector]
MergeCompaction --> Schedule
ClusteringCompaction --> Schedule
Schedule --> End
style MixCompaction fill:#ffe6e6
style MergeCompaction fill:#e6f3ff
style ClusteringCompaction fill:#e6ffe6
Compaction收益评估:
// 计算Compaction收益分数(简化示意)
func (t *compactionTrigger) calculateScore(segments []*SegmentInfo) float64 {
var totalRows int64
var deletedRows int64
var fragmentScore float64
for _, seg := range segments {
totalRows += seg.NumOfRows
deletedRows += seg.DeletedRows
// 碎片化分数:小文件越多,分数越高
if seg.Size < SmallSegmentThreshold {
fragmentScore += 1.0
}
}
// 删除比例 = 已删除行数 / 总行数
deleteRatio := float64(deletedRows) / float64(totalRows)
// 综合分数 = 删除比例 * 0.7 + 碎片化分数 * 0.3
return deleteRatio*0.7 + (fragmentScore/float64(len(segments)))*0.3
}
2.4.5 GarbageCollector
职责:清理无效数据,回收存储空间
清理对象:
- Dropped Segment的Binlog
- Compaction后的旧Segment
- 过期的临时文件
- 孤立的索引文件
GC流程:
sequenceDiagram
autonumber
participant GC as GarbageCollector
participant Meta as Meta
participant OS as Object Storage
loop 每30分钟
GC->>Meta: 扫描Dropped Segment
Meta-->>GC: [seg1, seg2, ...]
loop 每个Segment
GC->>Meta: 获取Segment的Binlog路径
Meta-->>GC: [binlog1, binlog2, ...]
GC->>OS: 检查文件是否存在
OS-->>GC: 存在
GC->>OS: 删除Binlog文件
OS-->>GC: 删除成功
GC->>Meta: 移除Segment元数据
Meta-->>GC: 成功
end
end
3. 核心流程详解
3.1 AssignSegmentID - Segment分配流程
3.1.1 整体时序图
sequenceDiagram
autonumber
participant P as Proxy
participant DC as DataCoord::Server
participant H as Handler
participant SM as SegmentManager
participant Meta as Meta
participant Alloc as Allocator
participant ETCD as etcd/TiKV
P->>+DC: AssignSegmentID(requests)
Note over DC: 检查健康状态
loop 处理每个请求
DC->>+H: GetCollection(collectionID)
H->>RC: DescribeCollection(collectionID)
RC-->>H: CollectionInfo(schema, properties)
H-->>-DC: CollectionInfo
DC->>+SM: AllocSegment(collID, partID, channel, count, version)
Note over SM: 加Channel锁<br/>确保串行分配
SM->>+Meta: GetGrowingSegments(channel)
Note over Meta: 从内存缓存查询<br/>channel -> segments映射
Meta-->>-SM: [segment1, segment2, ...]
alt 存在可用Segment
Note over SM: 按可用空间排序<br/>优先使用最满的Segment
SM->>SM: 检查容量是否充足
alt 容量足够
SM->>Meta: 更新分配记录
SM-->>DC: Allocation(existingSegID, count)
else 容量不足需要新Segment
SM->>+Alloc: AllocID()
Alloc->>ETCD: IncrementBy(1)
ETCD-->>Alloc: new_id
Alloc-->>-SM: new_segment_id
SM->>SM: 构建SegmentInfo
Note over SM: state=Growing<br/>maxRows=1000000<br/>maxSize=512MB
SM->>+Meta: AddSegment(newSegment)
Meta->>Meta: 更新内存索引
Note over Meta: 更新 segments map<br/>更新 channel2Growing map<br/>更新 collection2Segments map
Meta->>+ETCD: SaveSegment(segmentInfo)
Note over ETCD: key: segment/{segID}<br/>事务写入
ETCD-->>-Meta: OK
Meta-->>-SM: Success
SM-->>DC: Allocation(newSegID, count)
end
else 不存在Growing Segment
Note over SM: 同上创建新Segment流程
end
SM-->>-DC: [Allocation...]
DC->>DC: 构建SegmentIDAssignment
Note over DC: segmentID, count<br/>expireTime, collectionID
end
DC-->>-P: AssignSegmentIDResponse
Note over P: 使用分配的SegmentID<br/>写入数据到DataNode
3.1.2 调用链路详解
① API入口 - Server.AssignSegmentID()
// internal/datacoord/services.go:307
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
// 1. 健康检查
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.AssignSegmentIDResponse{
Status: merr.Status(err),
}, nil
}
// 2. 处理每个分配请求
assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
for _, r := range req.SegmentIDRequests {
// 3. 获取Collection信息(从RootCoord)
_, err := s.handler.GetCollection(ctx, r.GetCollectionID())
// 4. 调用SegmentManager执行分配
segmentAllocations, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName,
int64(r.Count), r.GetStorageVersion())
// 5. 构造返回结果
for _, allocation := range segmentAllocations {
assigns = append(assigns, &datapb.SegmentIDAssignment{
SegID: allocation.SegmentID,
ChannelName: r.ChannelName,
Count: uint32(allocation.NumOfRows),
CollectionID: r.CollectionID,
PartitionID: r.PartitionID,
ExpireTime: allocation.ExpireTime,
})
}
}
return &datapb.AssignSegmentIDResponse{
Status: merr.Success(),
SegIDAssignments: assigns,
}, nil
}
② 核心逻辑 - SegmentManager.AllocSegment()
// internal/datacoord/segment_manager.go:240
func (sm *SegmentManager) AllocSegment(ctx context.Context, collectionID, partitionID UniqueID,
channelName string, requestRows int64, storageVersion int64) ([]*Allocation, error) {
// 1. 加锁:确保同一个Channel的Segment分配串行化
sm.channelLock.Lock(channelName)
defer sm.channelLock.Unlock(channelName)
// 2. 获取分配策略
allocations, err := sm.allocPolicy.apply(collectionID, partitionID,
channelName, requestRows)
// 3. 根据策略执行分配
for _, alloc := range allocations {
if alloc.SegmentID == 0 {
// 需要创建新Segment
segmentID, err := sm.allocator.allocID(ctx)
segment := &SegmentInfo{
SegmentID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
State: commonpb.SegmentState_Growing,
MaxRowNum: Params.SegmentMaxSize,
NumOfRows: 0,
StorageVersion: storageVersion,
}
// 持久化到Meta
err = sm.meta.AddSegment(ctx, segment)
alloc.SegmentID = segmentID
}
}
return allocations, nil
}
③ 元数据管理 - Meta.AddSegment()
// internal/datacoord/meta.go:450
func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
m.Lock()
defer m.Unlock()
// 1. 更新内存索引
m.segments.Insert(segment.GetID(), segment)
// 2. 更新Channel -> Segments映射
if segment.State == commonpb.SegmentState_Growing {
growingSet := m.channel2Growing.GetOrInsert(segment.InsertChannel,
typeutil.NewUniqueSet())
growingSet.Insert(segment.GetID())
}
// 3. 更新Collection -> Segments映射
collSegments := m.collections.GetOrInsert(segment.CollectionID, []int64{})
m.collections.Insert(segment.CollectionID, append(collSegments, segment.GetID()))
// 4. 持久化到etcd
if err := m.catalog.AddSegment(ctx, segment.SegmentInfo); err != nil {
// 回滚内存操作
m.segments.Remove(segment.GetID())
return err
}
return nil
}
3.1.3 关键设计点
1. 并发控制
// 使用Channel级别的锁,避免全局锁竞争
type SegmentManager struct {
channelLock *lock.KeyLock[string] // channel -> lock
// ...
}
// 同一Channel的分配请求串行化
sm.channelLock.Lock(channelName)
defer sm.channelLock.Unlock(channelName)
2. 分配策略
// 优先复用未满的Growing Segment
func (sm *SegmentManager) selectOrCreateSegment(channel string, count int64) (*SegmentInfo, error) {
segments := sm.meta.GetGrowingSegments(channel)
// 按可用空间排序,优先选择最满的Segment(减少碎片)
sort.Slice(segments, func(i, j int) bool {
return segments[i].AvailableSize() < segments[j].AvailableSize()
})
for _, seg := range segments {
if seg.AvailableSize() >= count {
return seg, nil
}
}
// 无可用Segment,创建新的
return sm.createSegment(channel, count)
}
3. 元数据一致性
stateDiagram-v2
[*] --> 内存更新: AddSegment
内存更新 --> etcd持久化: Save
etcd持久化 --> 成功: OK
etcd持久化 --> 回滚内存: Error
回滚内存 --> [*]: 失败
成功 --> [*]: 完成
note right of 内存更新
• 更新 segments map
• 更新 channel2Growing
• 更新 collection2Segments
end note
note right of etcd持久化
事务写入
key: segment/{segID}
end note
3.1.4 性能优化
| 优化点 | 方法 | 效果 |
|---|---|---|
| Channel级锁 | 使用KeyLock[string] | 减少锁竞争,提高并发度 |
| 内存缓存 | 所有Segment信息在内存 | 查询延迟<1ms |
| Segment复用 | 优先填充未满Segment | 减少小文件数量 |
| 批量分配 | 一次请求可分配多个Segment | 减少RPC次数 |
| 异步持久化 | 先更新内存,异步写etcd | 降低P99延迟 |
3.1.5 异常处理
flowchart TD
Start[AssignSegmentID] --> Check{健康检查}
Check -->|失败| Err1[返回Unavailable]
Check -->|成功| GetColl{获取Collection}
GetColl -->|失败| Err2[返回CollectionNotFound]
GetColl -->|成功| AllocSeg{分配Segment}
AllocSeg -->|成功| Return[返回Assignment]
AllocSeg -->|ID分配失败| Err3[返回OutOfID]
AllocSeg -->|Meta持久化失败| Err4[返回InternalError]
Err1 --> End
Err2 --> End
Err3 --> End
Err4 --> End
Return --> End[结束]
style Err1 fill:#ffcdd2
style Err2 fill:#ffcdd2
style Err3 fill:#ffcdd2
style Err4 fill:#ffcdd2
错误码说明:
| 错误 | 原因 | 恢复策略 |
|---|---|---|
| Unavailable | DataCoord未就绪 | 重试 |
| CollectionNotFound | Collection不存在 | 检查DDL操作 |
| OutOfID | ID分配器耗尽 | 联系管理员 |
| InternalError | etcd写入失败 | 检查etcd连接,重试 |
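对于上表中"重试"类错误,调用方通常采用指数退避重试;下面是一个假设性的重试封装示意(assignWithRetry 为示意函数,并非Milvus SDK的真实API):
package datacoord

import (
	"context"
	"time"
)

// assignWithRetry 对可重试错误(如Unavailable、etcd瞬时故障)做指数退避重试
func assignWithRetry(ctx context.Context, assign func(context.Context) error, maxRetries int) error {
	backoff := 100 * time.Millisecond
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = assign(ctx); err == nil {
			return nil
		}
		// 此处省略:根据错误码判断是否可重试(如CollectionNotFound不应重试)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff *= 2
		}
	}
	return err
}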
3.2 Flush流程详解
3.2.1 整体时序图
sequenceDiagram
autonumber
participant User as User/Client
participant P as Proxy
participant DC as DataCoord::Server
participant SM as SegmentManager
participant Meta as Meta
participant CM as ChannelManager
participant Cluster as Cluster
participant DN as DataNode
participant S3 as Object Storage
participant QC as QueryCoord
User->>+P: Flush(collectionID)
P->>+DC: Flush(collectionID, segmentIDs)
Note over DC: 手动触发Flush
DC->>+DC: AllocTimestamp()
DC-->>-DC: flushTs
DC->>+DC: flushCollection(collID, flushTs, segIDs)
DC->>+Meta: GetCollection(collID)
Meta-->>-DC: CollectionInfo(channels)
loop 每个VChannel
DC->>+Meta: GetChannelCheckpoint(channel)
Meta-->>-DC: checkpoint
Note over DC: 记录当前Channel位置<br/>确保checkpoint早于segment endTs
alt segIDs为空(Flush全部)
DC->>+SM: SealAllSegments(channel, [])
SM->>+Meta: GetSegmentsByChannel(channel)
Meta-->>-SM: [growing_segments]
loop 每个Growing Segment
SM->>SM: 检查是否满足Seal条件
Note over SM: size>threshold OR<br/>time>timeout
alt 满足条件
SM->>+Meta: UpdateSegment(segID, state=Sealed)
Meta->>Meta: 更新内存状态
Meta->>ETCD: SaveSegment
ETCD-->>Meta: OK
Meta-->>-SM: Success
end
end
SM-->>-DC: [sealed_segment_ids]
else 指定segmentIDs
DC->>+SM: SealAllSegments(channel, segIDs)
Note over SM: 只Seal指定的Segments
SM-->>-DC: [sealed_segment_ids]
end
DC->>+CM: GetNodeChannelsByCollectionID(collID)
CM-->>-DC: nodeChannels{nodeID -> channels}
loop 每个DataNode
DC->>+Cluster: FlushChannels(nodeID, flushTs, channels)
Cluster->>+DN: FlushChannels RPC
Note over DN: 异步执行Flush<br/>写入Binlog
DN->>DN: 遍历所有Sealed Segment
loop 每个Segment
DN->>DN: 序列化Insert数据
DN->>DN: 序列化Delete数据
DN->>DN: 生成Stats Log
DN->>+S3: PutObject(insert_log)
S3-->>-DN: binlog_path
DN->>+S3: PutObject(delete_log)
S3-->>-DN: deltalog_path
DN->>+S3: PutObject(stats_log)
S3-->>-DN: statslog_path
end
DN->>+DC: SegmentFlushCompleted(segmentID, binlogs)
DC->>+Meta: UpdateSegmentBinlogs(segID, binlogs)
Meta->>Meta: 更新Binlog路径
Meta->>+ETCD: SaveSegment(binlogs)
ETCD-->>-Meta: OK
Meta-->>-DC: Success
DC->>+Meta: UpdateSegmentState(segID, Flushed)
Meta-->>-DC: Success
DC-->>-DN: ACK
DN-->>-Cluster: FlushCompleted
Cluster-->>-DC: Success
end
DC->>+QC: SegmentFlushCompleted(segments)
Note over QC: 触发Handoff<br/>加载Segment到QueryNode
QC-->>-DC: ACK
end
DC->>DC: 收集Flush结果
DC-->>-DC: FlushResult
DC-->>-P: FlushResponse(segmentIDs, flushTs)
P-->>-User: Success
3.2.2 调用链路详解
① API入口 - Server.Flush()
// internal/datacoord/services.go:77
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
// 1. 健康检查
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.FlushResponse{Status: merr.Status(err)}, nil
}
// 2. 分配Flush时间戳(全局顺序保证)
ts, err := s.allocator.AllocTimestamp(ctx)
if err != nil {
return nil, err
}
// 3. 执行Collection级Flush
flushResult, err := s.flushCollection(ctx, req.GetCollectionID(), ts, req.GetSegmentIDs())
if err != nil {
return &datapb.FlushResponse{Status: merr.Status(err)}, nil
}
return &datapb.FlushResponse{
Status: merr.Success(),
SegmentIDs: flushResult.GetSegmentIDs(), // Sealed的Segment
FlushSegmentIDs: flushResult.GetFlushSegmentIDs(), // 已Flushed的Segment
TimeOfSeal: flushResult.GetTimeOfSeal(),
FlushTs: flushResult.GetFlushTs(),
ChannelCps: flushResult.GetChannelCps(), // Channel Checkpoint
}, nil
}
② 核心逻辑 - Server.flushCollection()
// internal/datacoord/services.go:116
func (s *Server) flushCollection(ctx context.Context, collectionID UniqueID,
flushTs uint64, toFlushSegments []UniqueID) (*datapb.FlushResult, error) {
// 1. 获取Collection信息
coll, err := s.handler.GetCollection(ctx, collectionID)
if err != nil {
return nil, err
}
// 2. 获取所有Channel的Checkpoint(必须在Seal之前)
// 确保checkpoint时间早于segment的endTs
channelCPs := make(map[string]*msgpb.MsgPosition, 0)
for _, vchannel := range coll.VChannelNames {
cp := s.meta.GetChannelCheckpoint(vchannel)
channelCPs[vchannel] = cp
}
// 3. Seal所有需要Flush的Segment
sealedSegmentsIDDict := make(map[UniqueID]bool)
for _, channel := range coll.VChannelNames {
sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, channel, toFlushSegments)
if err != nil {
return nil, errors.Wrapf(err, "failed to flush collection %d", collectionID)
}
for _, sealedSegmentID := range sealedSegmentIDs {
sealedSegmentsIDDict[sealedSegmentID] = true
}
}
// 4. 获取所有已Flushed的Segment(用于返回给用户)
segments := s.meta.GetSegmentsOfCollection(ctx, collectionID)
flushSegmentIDs := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
if segment != nil &&
isFlushState(segment.GetState()) &&
segment.GetLevel() != datapb.SegmentLevel_L0 {
flushSegmentIDs = append(flushSegmentIDs, segment.GetID())
}
}
// 5. 通知DataNode执行Flush
nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(collectionID)
for nodeID, channelNames := range nodeChannels {
err = s.cluster.FlushChannels(ctx, nodeID, flushTs, channelNames)
if err != nil {
return nil, err
}
}
return &datapb.FlushResult{
CollectionID: collectionID,
SegmentIDs: lo.Keys(sealedSegmentsIDDict),
FlushSegmentIDs: flushSegmentIDs,
FlushTs: flushTs,
ChannelCps: channelCPs,
}, nil
}
③ Seal逻辑 - SegmentManager.SealAllSegments()
// internal/datacoord/segment_manager.go:450
func (sm *SegmentManager) SealAllSegments(ctx context.Context, channel string,
segIDs []UniqueID) ([]UniqueID, error) {
sm.channelLock.Lock(channel)
defer sm.channelLock.Unlock(channel)
// 1. 获取Channel的所有Growing Segment
segments := sm.meta.GetGrowingSegments(channel)
// 2. 过滤需要Seal的Segment
toSeal := []UniqueID{}
if len(segIDs) > 0 {
// 指定Segment
segIDSet := typeutil.NewUniqueSet(segIDs...)
for _, seg := range segments {
if segIDSet.Contain(seg.GetID()) {
toSeal = append(toSeal, seg.GetID())
}
}
} else {
// 所有Growing Segment
for _, seg := range segments {
toSeal = append(toSeal, seg.GetID())
}
}
// 3. 执行Seal操作
sealed := []UniqueID{}
for _, segID := range toSeal {
segment := sm.meta.GetSegment(ctx, segID)
if segment == nil || segment.GetState() != commonpb.SegmentState_Growing {
continue
}
// 更新状态为Sealed
segment.State = commonpb.SegmentState_Sealed
if err := sm.meta.UpdateSegment(ctx, segment); err != nil {
return nil, err
}
sealed = append(sealed, segID)
// 从Growing索引移除
sm.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet()).Remove(segID)
// 加入Sealed索引
sm.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()).Insert(segID)
}
return sealed, nil
}
④ Binlog更新 - SegmentFlushCompleted()
// internal/datacoord/services.go:1900
func (s *Server) SegmentFlushCompleted(ctx context.Context,
req *datapb.SegmentFlushCompletedRequest) (*commonpb.Status, error) {
segmentID := req.GetSegment().GetID()
// 1. 更新Segment的Binlog路径
segment := s.meta.GetSegment(ctx, segmentID)
if segment == nil {
return merr.Status(merr.WrapErrSegmentNotFound(segmentID)), nil
}
// 2. 更新Binlog、Deltalog、Statslog
segment.Binlogs = req.GetSegment().GetBinlogs()
segment.Deltalogs = req.GetSegment().GetDeltalogs()
segment.Statslogs = req.GetSegment().GetStatslogs()
segment.NumOfRows = req.GetSegment().GetNumOfRows()
// 3. 更新状态为Flushed
segment.State = commonpb.SegmentState_Flushed
segment.DmlPosition = req.GetSegment().GetDmlPosition()
// 4. 持久化
if err := s.meta.UpdateSegment(ctx, segment); err != nil {
return merr.Status(err), nil
}
// 5. 通知QueryCoord Handoff
s.notifyQueryCoordSegmentFlush(ctx, segment)
return merr.Success(), nil
}
3.2.3 Flush触发机制
自动触发条件:
| 触发条件 | 阈值 | 检测周期 | 说明 |
|---|---|---|---|
| Segment大小 | ≥512MB | 实时 | 达到最大容量立即Seal |
| 数据行数 | ≥100万行 | 实时 | 行数上限 |
| Growing时长 | ≥10分钟 | 每分钟 | 避免长时间不Flush |
| Channel空闲 | ≥30秒无数据 | 每秒 | 及时持久化 |
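把上表的四个条件合在一起,Seal判定大致可以写成如下形式(阈值取自上表,函数与字段名为示意,并非真实源码):
package datacoord

import "time"

const (
	sealMaxSize      = int64(512) << 20 // 512MB
	sealMaxRows      = int64(1_000_000) // 100万行
	sealMaxLifetime  = 10 * time.Minute // Growing最长时长
	sealIdleDuration = 30 * time.Second // 空闲时长
)

// shouldSeal 任一条件满足即触发Seal
func shouldSeal(size, numRows int64, createdAt, lastInsertAt time.Time) bool {
	switch {
	case size >= sealMaxSize:
		return true
	case numRows >= sealMaxRows:
		return true
	case time.Since(createdAt) >= sealMaxLifetime:
		return true
	case time.Since(lastInsertAt) >= sealIdleDuration:
		return true
	default:
		return false
	}
}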
手动触发方式:
# 方式1:Flush指定Collection
client.flush(collection_name="my_collection")
# 方式2:Flush指定Segment
client.flush(collection_name="my_collection", segment_ids=[seg1, seg2])
# 方式3:FlushAll(所有Collection)
# 管理员操作
3.2.4 Flush状态机
stateDiagram-v2
[*] --> Growing: 创建Segment
Growing --> Sealed: Flush触发
Sealed --> Flushing: DataNode开始Flush
Flushing --> Flushed: Binlog写入完成
Flushed --> Indexed: 索引构建完成
Indexed --> [*]: 可被查询
Growing --> Dropped: Collection删除
Sealed --> Dropped: Collection删除
Flushing --> Dropped: Collection删除
note right of Growing
状态:内存中
特征:
• 接受新数据写入
• 大小动态增长
• NumOfRows递增
end note
note right of Sealed
状态:已封闭
特征:
• 不再接受新数据
• 等待DataNode Flush
• DmlPosition已确定
end note
note right of Flushed
状态:已持久化
特征:
• Binlog已写入S3
• 只读
• 可被QueryNode加载
end note
3.2.5 DataNode Flush详细流程
flowchart TD
Start[收到FlushChannels命令] --> Loop{遍历Sealed Segment}
Loop -->|有Segment| GetData[从Insert Buffer获取数据]
GetData --> Serialize[序列化数据]
Serialize --> WriteInsert[写Insert Log]
WriteInsert --> WriteDelete[写Delete Log]
WriteDelete --> WriteStats[写Stats Log]
WriteStats --> CalcSize[计算Segment大小]
CalcSize --> Report[上报FlushCompleted]
Report --> Loop
Loop -->|无Segment| End[完成]
style WriteInsert fill:#e3f2fd
style WriteDelete fill:#fff3e0
style WriteStats fill:#f3e5f5
Binlog文件格式:
├── Insert Log (列式存储)
│ ├── field_0/ (FieldID=0, 主键)
│ │ └── 435978159261483010
│ ├── field_1/ (FieldID=1, 向量字段)
│ │ └── 435978159261483011
│ └── field_2/ (FieldID=2, 标量字段)
│ └── 435978159261483012
│
├── Delete Log (删除记录)
│ └── 435978159261483020
│
└── Stats Log (统计信息)
├── 435978159261483030 (min/max/count)
└── 435978159261483031
3.2.6 性能优化
| 优化点 | 实现方式 | 效果 |
|---|---|---|
| 批量Flush | FlushChannels一次触发多个Segment | 减少RPC开销 |
| 异步写入 | DataNode异步写Binlog | 不阻塞数据写入 |
| Channel Checkpoint | 提前获取,保证顺序 | 避免Checkpoint倒退 |
| 并发Flush | 多个DataNode并行Flush | 提高吞吐 |
| 压缩Binlog | 使用Zstd压缩 | 减少存储空间50% |
3.2.7 异常处理
flowchart TD
Start[Flush请求] --> Check1{Collection存在?}
Check1 -->|否| Err1[返回CollectionNotFound]
Check1 -->|是| Check2{Segment有效?}
Check2 -->|否| Err2[返回SegmentNotFound]
Check2 -->|是| Seal{Seal Segment}
Seal -->|成功| Notify{通知DataNode}
Seal -->|失败| Err3[返回Meta更新失败]
Notify -->|成功| Wait[等待DataNode完成]
Notify -->|超时| Err4[返回Timeout<br/>Segment仍会后台Flush]
Wait -->|完成| Update{更新Meta}
Update -->|成功| Success[返回Success]
Update -->|失败| Err5[返回Meta更新失败<br/>Binlog已写入<br/>后台重试]
style Err1 fill:#ffcdd2
style Err2 fill:#ffcdd2
style Err3 fill:#ffcdd2
style Err4 fill:#fff9c4
style Err5 fill:#fff9c4
style Success fill:#c8e6c9
容错机制:
- Seal失败:保持Growing状态,下次继续触发
- DataNode超时:Segment仍会后台Flush,不影响后续写入
- Binlog写入失败:DataNode重试,最多3次
- Meta更新失败:后台定期重试,直到成功
3.3 Compaction流程详解
3.3.1 整体时序图
sequenceDiagram
autonumber
participant CTM as CompactionTriggerManager
participant Policy as CompactionPolicy
participant CI as CompactionInspector
participant Meta as Meta
participant Cluster as Cluster
participant DN as DataNode
participant S3 as Object Storage
participant QC as QueryCoord
Note over CTM: 定时触发(每10分钟)
CTM->>+CTM: loop() - 后台任务
CTM->>+Policy: Trigger(ctx)
Note over Policy: 扫描Collection<br/>评估Compaction收益
Policy->>+Meta: GetSegmentsOfCollection(collID)
Meta-->>-Policy: [segments]
Policy->>Policy: 按类型生成CompactionView
Note over Policy: • L0 Compaction<br/>• Mix Compaction<br/>• Clustering
Policy->>Policy: 计算收益分数
Note over Policy: DeleteRatio * 0.4<br/>+ FragmentScore * 0.3<br/>+ AgeScore * 0.2
alt 分数 > 阈值
Policy-->>-CTM: [CompactionViews]
loop 每个CompactionView
CTM->>CTM: triggerViewForCompaction()
CTM->>+CI: SubmitTask(CompactionTask)
Note over CI: 加入任务队列
CI->>+Meta: GetSegmentDetails(segmentIDs)
Meta-->>-CI: [SegmentInfo with binlogs]
CI->>CI: 构建CompactionPlan
Note over CI: 选择DataNode<br/>分配资源
CI->>+Cluster: Compaction(nodeID, plan)
Cluster->>+DN: Compaction RPC
Note over DN: 异步执行Compaction
DN->>DN: 创建CompactionTask
loop 每个输入Segment
DN->>+S3: GetObject(insert_binlog)
S3-->>-DN: binlog_data
DN->>+S3: GetObject(delta_binlog)
S3-->>-DN: delta_data
DN->>DN: 反序列化数据
DN->>DN: 应用Delete操作
end
DN->>DN: 合并数据
Note over DN: • 去重<br/>• 删除已标记删除的行<br/>• 按PK排序
alt Mix Compaction
Note over DN: N个Segment -> 1个大Segment
DN->>DN: 合并所有数据到一个Segment
else L0 Compaction
Note over DN: L0 Delta -> L1 Segment
DN->>DN: 将L0删除应用到L1
else Clustering Compaction
Note over DN: 按标量字段聚类
DN->>DN: 按ClusteringKey重新分组
end
DN->>DN: 序列化新Segment
DN->>+S3: PutObject(new_insert_binlog)
S3-->>-DN: new_binlog_path
DN->>+S3: PutObject(new_stats_binlog)
S3-->>-DN: new_statslog_path
DN->>+DC: CompactionCompleted(result)
DC->>+CI: HandleCompactionResult(result)
CI->>+Meta: AddSegment(newSegment)
Meta->>Meta: 更新内存索引
Meta->>+ETCD: SaveSegment
ETCD-->>-Meta: OK
Meta-->>-CI: Success
loop 每个旧Segment
CI->>+Meta: DropSegment(oldSegID)
Meta->>Meta: 标记state=Dropped
Meta->>+ETCD: SaveSegment(state=Dropped)
ETCD-->>-Meta: OK
Meta-->>-CI: Success
end
CI->>+QC: NotifySegmentChange(newSegment, oldSegments)
Note over QC: 触发Segment替换<br/>先加载新Segment<br/>再释放旧Segment
QC-->>-CI: ACK
CI-->>-DC: Success
DC-->>-DN: ACK
DN-->>-Cluster: CompactionCompleted
Cluster-->>-CI: Success
CI->>CI: 更新任务状态
Note over CI: state=Completed
CI-->>-CTM: TaskCompleted
end
else 分数 <= 阈值
Policy-->>CTM: nil (不触发)
end
CTM-->>-CTM: 继续下一轮循环
3.3.2 Compaction类型详解
1. L0 Compaction(删除操作压缩)
flowchart LR
subgraph Input["输入"]
L0[L0 Segment<br/>━━━━━━<br/>Delete Records<br/>PK: 1,3,5,7]
L1[L1 Segment<br/>━━━━━━<br/>Data Records<br/>PK: 1,2,3,4,5,6,7,8]
end
subgraph Process["处理"]
Apply[应用Delete操作<br/>━━━━━━<br/>移除 PK=1,3,5,7]
end
subgraph Output["输出"]
NewL1[New L1 Segment<br/>━━━━━━<br/>PK: 2,4,6,8]
DropL0[Drop L0<br/>标记删除]
DropOldL1[Drop Old L1<br/>标记删除]
end
L0 --> Apply
L1 --> Apply
Apply --> NewL1
Apply --> DropL0
Apply --> DropOldL1
style L0 fill:#ffebee
style L1 fill:#e3f2fd
style NewL1 fill:#c8e6c9
触发条件:
- L0 Segment数量 > 8个
- L0 Segment总大小 > 256MB
- L0 Segment存在时间 > 5分钟
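将这组阈值落到代码里,判定逻辑大致如下(阈值取自上面的列表,结构体与字段名为示意,并非真实源码):
package datacoord

import "time"

// L0SegmentStat 为示意结构,只保留判定所需字段
type L0SegmentStat struct {
	Size      int64
	CreatedAt time.Time
}

// shouldTriggerL0 任一条件满足即触发L0 Compaction
func shouldTriggerL0(l0Segments []L0SegmentStat) bool {
	if len(l0Segments) == 0 {
		return false
	}
	var totalSize int64
	oldest := time.Now()
	for _, seg := range l0Segments {
		totalSize += seg.Size
		if seg.CreatedAt.Before(oldest) {
			oldest = seg.CreatedAt
		}
	}
	return len(l0Segments) > 8 ||
		totalSize > 256<<20 ||
		time.Since(oldest) > 5*time.Minute
}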
2. Mix Compaction(小文件合并)
flowchart LR
subgraph Input["输入"]
S1[Segment1<br/>64MB]
S2[Segment2<br/>128MB]
S3[Segment3<br/>96MB]
S4[Segment4<br/>80MB]
end
subgraph Process["处理"]
Merge[合并数据<br/>━━━━━━<br/>• 去重<br/>• 应用Delete<br/>• 按PK排序]
end
subgraph Output["输出"]
NewS[New Segment<br/>━━━━━━<br/>368MB<br/>已优化]
end
S1 --> Merge
S2 --> Merge
S3 --> Merge
S4 --> Merge
Merge --> NewS
style Input fill:#fff3e0
style NewS fill:#c8e6c9
触发条件:
- 小Segment(<256MB)数量 > 4个
- Delete比例 > 10%
- 碎片化分数 > 0.5
3. Clustering Compaction(数据聚类)
flowchart TB
subgraph Input["输入数据"]
Data[原始Segment<br/>━━━━━━<br/>随机分布<br/>ClusteringKey无序]
end
subgraph Process["处理"]
Analyze[分析ClusteringKey<br/>━━━━━━<br/>计算分区边界]
Partition[按Key分区<br/>━━━━━━<br/>Range Partitioning]
end
subgraph Output["输出"]
C1[Cluster1<br/>key: 0-100]
C2[Cluster2<br/>key: 100-200]
C3[Cluster3<br/>key: 200-300]
end
Data --> Analyze
Analyze --> Partition
Partition --> C1
Partition --> C2
Partition --> C3
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style C3 fill:#e8f5e9
触发条件:
- 用户配置了ClusteringKey
- 手动触发或定时触发
- 数据量 > 1GB
3.3.3 Compaction类型对比
| 类型 | 目的 | 输入 | 输出 | 触发频率 | 收益 |
|---|---|---|---|---|---|
| L0 Compaction | 应用删除操作 | L0 + L1 Segments | New L1 | 高(每10秒) | 减少查询开销 |
| Mix Compaction | 合并小文件 | N个小Segment | 1个大Segment | 中(每10分钟) | 减少文件数 提高查询性能 |
| Clustering | 数据聚类优化 | N个Segment | N个聚类Segment | 低(手动/每天) | 大幅提升 范围查询性能 |
| Sort Compaction | 重新排序 | N个Segment | N个排序Segment | 低(手动) | 优化排序查询 |
3.3.4 调用链路详解
① CompactionTriggerManager.loop()
// internal/datacoord/compaction_trigger_v2.go:212
func (m *CompactionTriggerManager) loop(ctx context.Context) {
// 定时器
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
for {
select {
case <-ctx.Done():
return
case <-l0Ticker.C:
if !m.l0Policy.Enable() {
continue
}
if m.inspector.isFull() {
log.RatedInfo(10, "Skip trigger l0 compaction since inspector is full")
continue
}
// 触发L0 Compaction
events, err := m.l0Policy.Trigger(ctx)
if err != nil {
log.Warn("Fail to trigger L0 policy", zap.Error(err))
continue
}
// 提交任务
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
}
case <-clusteringTicker.C:
// 触发Clustering Compaction
events, err := m.clusteringPolicy.Trigger(ctx)
if err != nil {
log.Warn("Fail to trigger clustering policy", zap.Error(err))
continue
}
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
}
case <-singleTicker.C:
// 触发Mix Compaction
events, err := m.singlePolicy.Trigger(ctx)
if err != nil {
log.Warn("Fail to trigger single policy", zap.Error(err))
continue
}
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
}
}
}
}
② CompactionPolicy.Trigger()
// internal/datacoord/compaction_policy_single.go:100
func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
collections := policy.meta.GetCollections()
views := make([]CompactionView, 0)
for _, collectionID := range collections {
// 获取所有Segment
segments := policy.meta.GetSegmentsOfCollection(ctx, collectionID)
// 按Channel分组
channelSegments := groupSegmentsByChannel(segments)
for channel, segs := range channelSegments {
// 计算Compaction收益
score := policy.calculateScore(segs)
if score > policy.threshold {
// 生成CompactionView
view := &compactionView{
collectionID: collectionID,
channel: channel,
segments: segs,
triggerID: allocator.AllocID(),
}
views = append(views, view)
}
}
}
result := make(map[CompactionTriggerType][]CompactionView)
if len(views) > 0 {
result[TriggerTypeSingle] = views
}
return result, nil
}
// 计算Compaction收益分数
func (policy *singleCompactionPolicy) calculateScore(segments []*SegmentInfo) float64 {
var totalSize, totalRows, deletedRows int64
var smallFileCount int
for _, seg := range segments {
totalSize += seg.Size
totalRows += seg.NumRows
deletedRows += seg.DeletedRows
if seg.Size < SmallSegmentThreshold { // 256MB
smallFileCount++
}
}
deleteRatio := float64(deletedRows) / float64(totalRows)
fragmentScore := float64(smallFileCount) / float64(len(segments))
ageScore := float64(time.Since(segments[0].LastCompactionTime).Hours()) / (30 * 24)
sizeScore := float64(totalSize) / (10 * 1024 * 1024 * 1024) // 10GB
// 加权计算
return deleteRatio*0.4 + fragmentScore*0.3 + ageScore*0.2 + sizeScore*0.1
}
③ CompactionInspector.SubmitTask()
// internal/datacoord/compaction_inspector.go:200
func (ci *CompactionInspector) SubmitTask(task *CompactionTask) error {
// 1. 检查任务队列是否满
if ci.queue.IsFull() {
return errors.New("compaction queue is full")
}
// 2. 选择DataNode
nodeID, err := ci.selectDataNode(task)
if err != nil {
return err
}
// 3. 加入任务队列
task.NodeID = nodeID
task.State = datapb.CompactionTaskState_Pending
ci.queue.Enqueue(task)
// 4. 持久化任务状态
ci.meta.SaveCompactionTask(task)
// 5. 通知Scheduler调度
ci.scheduler.Schedule(task)
return nil
}
3.3.5 性能优化
| 优化点 | 实现 | 效果 |
|---|---|---|
| 并发执行 | 多个DataNode并行Compaction | 10x吞吐提升 |
| 增量读取 | 流式读取Binlog | 减少内存占用 |
| 零拷贝 | 直接操作S3数据 | 减少CPU开销 |
| 智能调度 | 优先级队列 | 高收益任务优先 |
| 资源限制 | 限制并发数和内存 | 避免OOM |
3.3.6 监控指标
# Compaction任务数
milvus_datacoord_compaction_task_num:
labels: [state, type] # Pending/Running/Completed/Failed
# Compaction延迟
milvus_datacoord_compaction_latency:
labels: [type] # L0/Mix/Clustering
unit: seconds
percentiles: [p50, p90, p99]
# Compaction吞吐
milvus_datacoord_compaction_throughput:
labels: [type]
unit: bytes_per_second
# Compaction收益
milvus_datacoord_compaction_benefit:
labels: [type]
description: 节省的存储空间或查询加速比
3.4 WatchChannels流程详解
3.4.1 整体时序图
sequenceDiagram
autonumber
participant RC as RootCoord
participant DC as DataCoord::Server
participant CM as ChannelManager
participant Store as ChannelStore
participant Policy as AssignPolicy
participant Cluster as Cluster
participant DN as DataNode
participant ETCD as etcd
RC->>+DC: WatchChannels(collID, channels, startPos)
Note over RC: 创建Collection时触发
DC->>DC: 健康检查
loop 每个Channel
DC->>DC: NewRWChannel(channelName, collID, startPos)
Note over DC: 构建Channel对象<br/>包含schema、dbProperties
DC->>+CM: Watch(ctx, channel)
Note over CM: 加锁确保串行
CM->>+Store: AddChannel(bufferID, channel)
Note over Store: 先加入Buffer<br/>等待分配给DataNode
Store->>+ETCD: SaveChannelWatchInfo
Note over ETCD: key: channel/{channelName}<br/>state: ToWatch<br/>opID: xxx
ETCD-->>-Store: OK
Store-->>-CM: Success
CM->>+Policy: apply(nodesChannels, bufferChannel)
Note over Policy: 选择DataNode<br/>负载均衡算法
Policy->>Policy: 计算每个Node负载
Policy->>Policy: 选择负载最低的Node
Policy-->>-CM: ChannelOpSet(nodeID, Watch, channel)
CM->>+Store: UpdateChannelWatchInfo
Note over Store: 更新state: ToWatch -> Watching
Store->>+ETCD: SaveChannelWatchInfo(nodeID, state)
ETCD-->>-Store: OK
Store-->>-CM: Success
CM->>+Cluster: NotifyChannelOperation(nodeID, watchInfo)
Cluster->>+DN: WatchDmChannels RPC
Note over DN: DataNode开始订阅消息
DN->>DN: 创建FlowGraph
DN->>DN: 订阅MQ Channel
DN->>DN: Seek到startPosition
DN-->>-Cluster: Success
Cluster-->>-CM: Success
CM->>+Store: UpdateChannelWatchInfo(state=Watched)
Store->>+ETCD: SaveChannelWatchInfo
ETCD-->>-Store: OK
Store-->>-CM: Success
CM-->>-DC: Success
DC->>+ETCD: MarkChannelAdded(channelName)
Note over ETCD: 标记Channel已添加<br/>防止重复创建
ETCD-->>-DC: OK
DC->>+DC: UpdateChannelCheckpoint(channel, startPos)
Note over DC: 初始化Channel Checkpoint
DC-->>-DC: Success
end
DC-->>-RC: WatchChannelsResponse(Success)
3.4.2 Channel分配策略
负载均衡算法:
// internal/datacoord/policy.go:211
func assignNewChannels(availableNodes Assignments, toAssign *NodeChannelInfo,
nodeCount int, totalChannelCount int, exclusiveNodes []int64) *ChannelOpSet {
// 1. 计算目标分布
totalChannelsAfterAssignment := totalChannelCount + len(toAssign.Channels)
baseChannelsPerNode := totalChannelsAfterAssignment / nodeCount
extraChannels := totalChannelsAfterAssignment % nodeCount
// 2. 为每个Node设定目标Channel数
targetChannelCounts := make(map[int64]int)
for _, nodeInfo := range availableNodes {
targetChannelCounts[nodeInfo.NodeID] = baseChannelsPerNode
if extraChannels > 0 {
targetChannelCounts[nodeInfo.NodeID]++
extraChannels--
}
}
// 3. 分配Channel
nodeAssignments := make(map[int64][]RWChannel)
for _, channel := range toAssign.GetChannels() {
// 按当前负载排序
sort.Slice(availableNodes, func(i, j int) bool {
iTotal := len(availableNodes[i].Channels) + len(nodeAssignments[availableNodes[i].NodeID])
jTotal := len(availableNodes[j].Channels) + len(nodeAssignments[availableNodes[j].NodeID])
return iTotal < jTotal
})
// 选择负载最低的Node
bestNode := availableNodes[0]
for _, node := range availableNodes {
currentTotal := len(node.Channels) + len(nodeAssignments[node.NodeID])
if currentTotal < targetChannelCounts[node.NodeID] {
bestNode = node
break
}
}
nodeAssignments[bestNode.NodeID] = append(nodeAssignments[bestNode.NodeID], channel)
}
// 4. 构建操作集
operations := NewChannelOpSet()
for nodeID, channels := range nodeAssignments {
operations.Add(NewChannelOp(nodeID, Watch, channels...))
}
operations.Add(NewChannelOp(bufferID, Delete, toAssign.Channels...))
return operations
}
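举例说明:假设有3个DataNode、现有4个Channel、待分配3个新Channel,则 totalChannelsAfterAssignment = 7,baseChannelsPerNode = 2,extraChannels = 1,目标分布为 3/2/2;每个新Channel都会分配给"当前负载最低且未达到目标数"的节点,最终各节点的Channel数趋于均衡。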
Channel状态机:
stateDiagram-v2
[*] --> ToWatch: Watch请求
ToWatch --> Watching: 分配到DataNode
Watching --> Watched: DataNode确认
Watched --> ToRelease: Release请求
ToRelease --> Releasing: 通知DataNode
Releasing --> Released: DataNode确认
Released --> [*]: 删除
ToWatch --> Failed: 分配失败
Watching --> Failed: 超时
Failed --> ToWatch: 重试
note right of ToWatch
状态:待分配
位置:Buffer
end note
note right of Watched
状态:已监听
位置:DataNode
操作:正常写入
end note
3.4.3 Channel迁移流程
sequenceDiagram
autonumber
participant CM as ChannelManager
participant OldDN as Old DataNode
participant NewDN as New DataNode
participant Meta as Meta
Note over CM: 触发条件:<br/>• DataNode下线<br/>• 负载均衡<br/>• 手动迁移
CM->>CM: DeleteNode(oldNodeID)
Note over CM: 获取旧Node的所有Channel
loop 每个Channel
CM->>OldDN: UnwatchDmChannels(channel)
OldDN->>OldDN: 停止消费消息
OldDN->>OldDN: Flush未持久化数据
OldDN-->>CM: Unwatched
CM->>CM: assignPolicy(channel)
Note over CM: 选择新的DataNode
CM->>NewDN: WatchDmChannels(channel, checkpoint)
NewDN->>NewDN: 创建FlowGraph
NewDN->>NewDN: 从checkpoint开始消费
NewDN-->>CM: Watched
CM->>Meta: UpdateChannelMapping
Note over Meta: 更新Channel -> DataNode映射
end
3.5 垃圾回收(GC)流程详解
3.5.1 整体时序图
sequenceDiagram
autonumber
participant GC as GarbageCollector
participant Meta as Meta
participant Handler as Handler
participant S3 as Object Storage
participant ETCD as etcd
Note over GC: 定时触发(每30分钟)
GC->>+GC: recycleDroppedSegments()
GC->>+Meta: SelectSegments(state=Dropped)
Meta-->>-GC: [dropped_segments]
GC->>+Meta: SelectSegments(state=All)
Meta-->>-GC: [all_segments]
GC->>GC: 构建compactTo映射
Note over GC: 找出Compaction关系<br/>A, B -> C
GC->>+Handler: ListLoadedSegments()
Handler->>QC: 查询已加载的Segment
QC-->>Handler: [loaded_segments]
Handler-->>-GC: [loaded_segment_ids]
loop 每个Dropped Segment
GC->>GC: checkDroppedSegmentGC(segment)
alt 不满足GC条件
Note over GC: 跳过:<br/>• 未过期<br/>• 子Segment未索引<br/>• CheckPoint未到达<br/>• 仍被加载
GC->>GC: continue
else 满足GC条件
GC->>GC: 收集Binlog路径
Note over GC: • InsertLog<br/>• DeltaLog<br/>• StatsLog<br/>• BM25Log<br/>• TextLog<br/>• IndexFiles
loop 每个Binlog文件
GC->>+S3: DeleteObject(binlog_path)
S3-->>-GC: Success
end
GC->>+Meta: DropSegment(segmentID)
Meta->>Meta: 从内存移除
Meta->>+ETCD: DeleteSegment(segmentID)
ETCD-->>-Meta: OK
Meta-->>-GC: Success
Note over GC: Segment完全删除
end
end
GC-->>-GC: recycleDroppedSegments完成
GC->>+GC: recycleUnusedBinlogFiles()
Note over GC: 清理孤立Binlog
loop 扫描Object Storage
GC->>+S3: ListObjects(prefix="/binlog/")
S3-->>-GC: [object_infos]
loop 每个文件
GC->>GC: ParseSegmentID(file_path)
GC->>+Meta: GetSegment(segmentID)
alt Segment不存在
Meta-->>-GC: nil
alt 文件超过容忍时间
GC->>+S3: DeleteObject(file_path)
S3-->>-GC: Success
Note over GC: 清理孤立文件
end
else Segment存在
Meta-->>GC: segment
GC->>GC: 检查文件是否在Segment的Binlog列表
alt 不在列表中
GC->>+S3: DeleteObject(file_path)
S3-->>-GC: Success
Note over GC: 清理无效引用
end
end
end
end
GC-->>-GC: recycleUnusedBinlogFiles完成
3.5.2 GC条件检查
// internal/datacoord/garbage_collector.go:437
func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
childSegment *SegmentInfo,
indexSet typeutil.UniqueSet,
cpTimestamp Timestamp) bool {
// 1. 检查过期时间(默认1天)
if !gc.isExpire(segment.GetDroppedAt()) {
return false
}
// 2. 如果是Compaction产生的旧Segment
isCompacted := childSegment != nil || segment.GetCompacted()
if isCompacted {
// 确保子Segment已经索引,避免查询性能下降
if childSegment != nil && !indexSet.Contain(childSegment.GetID()) {
log.RatedInfo(60, "skipping GC when compact target segment is not indexed",
zap.Int64("child segment ID", childSegment.GetID()))
return false
}
}
// 3. 检查Channel Checkpoint
segInsertChannel := segment.GetInsertChannel()
if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) &&
segment.GetDmlPosition().GetTimestamp() > cpTimestamp {
// Segment的DML位置必须在Channel CP之前
log.RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
zap.Uint64("channelCpTs", cpTimestamp))
return false
}
return true
}
3.5.3 GC清理对象
| 对象类型 | 检查条件 | 清理时机 | 容忍时间 |
|---|---|---|---|
| Dropped Segment | state=Dropped + 已过期 + 子Segment已索引 + CP已到达 | 满足所有条件 | 1天 |
| 孤立Binlog | Segment不存在 + 文件存在 | 文件修改时间>容忍时间 | 1天 |
| 无效Binlog引用 | Segment存在 + 文件不在Binlog列表 | 立即清理 | 无 |
| 过期索引文件 | Segment已删除 + 索引文件存在 | Drop后1天 | 1天 |
| 临时文件 | 前缀为temp + 修改时间>2小时 | 立即清理 | 2小时 |
3.5.4 GC性能优化
// 1. 并发删除Binlog
futures := make([]*conc.Future[struct{}], 0)
for _, binlogPath := range binlogPaths {
future := getOrCreateIOPool().Submit(func() (struct{}, error) {
return struct{}{}, gc.option.cli.Remove(ctx, binlogPath)
})
futures = append(futures, future)
}
// 2. 批量等待完成
for _, f := range futures {
if _, err := f.Await(); err != nil {
log.Warn("failed to remove binlog", zap.Error(err))
failed++
}
}
// 3. 限流避免对S3造成压力
rateLimiter := rate.NewLimiter(100, 100) // 每秒100个请求
rateLimiter.Wait(ctx)
4. 关键算法与设计
4.1 Segment分配策略
目标:最大化Segment利用率,减少小文件数量
策略:
- 优先填充未满的Growing Segment
- 单个Channel串行分配(避免并发冲突)
- 预留10%容量(避免频繁Seal)
// Segment分配核心逻辑
func (m *SegmentManager) selectOrCreateSegment(channel string, count int64) (*SegmentInfo, error) {
// 1. 获取Channel的所有Growing Segment
segments := m.meta.GetGrowingSegments(channel)
// 2. 按可用空间排序,优先选择最满的Segment(减少碎片)
sort.Slice(segments, func(i, j int) bool {
return segments[i].AvailableSize < segments[j].AvailableSize
})
// 3. 查找容量足够的Segment
for _, seg := range segments {
if seg.AvailableSize >= count {
return seg, nil
}
}
// 4. 无可用Segment,创建新Segment
return m.createSegment(channel, count)
}
4.2 Flush时机决策
目标:平衡写入延迟与文件大小
决策树:
flowchart TD
Start[收到TimeTick] --> CheckSize{Segment大小>512MB}
CheckSize -->|是| Flush[立即Flush]
CheckSize -->|否| CheckRows{行数>100万}
CheckRows -->|是| Flush
CheckRows -->|否| CheckTime{Growing时长>10分钟}
CheckTime -->|是| Flush
CheckTime -->|否| CheckManual{手动Flush API}
CheckManual -->|是| Flush
CheckManual -->|否| Wait[继续等待]
Flush --> End[Seal Segment]
Wait --> End[保持Growing]
style Flush fill:#ffe6e6
style Wait fill:#e6f3ff
4.3 Compaction优先级算法
目标:优先处理收益最大的Compaction任务
优先级计算公式:
Priority = DeleteRatio * 0.4 + FragmentScore * 0.3 + AgeScore * 0.2 + SizeScore * 0.1
各因子说明:
| 因子 | 权重 | 计算方法 | 说明 |
|---|---|---|---|
| DeleteRatio | 0.4 | deleted_rows / total_rows | 删除比例越高,优先级越高 |
| FragmentScore | 0.3 | small_files / total_files | 小文件比例越高,优先级越高 |
| AgeScore | 0.2 | days_since_last_compaction / 30 | 越久未Compact,优先级越高 |
| SizeScore | 0.1 | total_size / 10GB | 总大小越大,优先级越高 |
实现代码:
// 计算Compaction优先级
func (t *compactionTrigger) calculatePriority(plan *CompactionPlan) float64 {
var totalSize, totalRows, deletedRows int64
var smallFileCount int
for _, seg := range plan.Segments {
totalSize += seg.Size
totalRows += seg.NumRows
deletedRows += seg.DeletedRows
if seg.Size < SmallSegmentThreshold {
smallFileCount++
}
}
deleteRatio := float64(deletedRows) / float64(totalRows)
fragmentScore := float64(smallFileCount) / float64(len(plan.Segments))
ageScore := float64(time.Since(plan.LastCompactionTime).Hours()) / (30 * 24)
sizeScore := float64(totalSize) / (10 * 1024 * 1024 * 1024) // 10GB
return deleteRatio*0.4 + fragmentScore*0.3 + ageScore*0.2 + sizeScore*0.1
}
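代入一个具体例子:若候选Segment集合的删除比例为25%、小文件占比50%、距上次Compaction 15天(ageScore = 360h/720h = 0.5)、总大小2GB(sizeScore = 0.2),则 Priority = 0.25×0.4 + 0.5×0.3 + 0.5×0.2 + 0.2×0.1 = 0.37,会比删除少、碎片少的Channel更早被调度。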
5. 性能与容量
5.1 性能指标
| 指标 | 数值 | 说明 |
|---|---|---|
| AssignSegmentID延迟 | P50: 5ms, P99: 20ms | 包含元数据查询 |
| Flush触发延迟 | <100ms | 从TimeTick到发送Flush命令 |
| Compaction吞吐 | 100GB/小时/节点 | 取决于DataNode性能 |
| GC扫描周期 | 30分钟 | 可配置 |
| 元数据查询QPS | >10000 | 内存缓存 |
5.2 容量规划
单个DataCoord支持规模:
| 维度 | 容量 | 说明 |
|---|---|---|
| Collection数量 | 1000 | 超过需要分片 |
| Segment数量 | 100万 | 内存占用约10GB |
| Channel数量 | 10000 | 对应DataNode数量 |
| DataNode数量 | 1000 | 心跳监控 |
| Compaction并发 | 100 | 可配置 |
5.3 监控指标
关键Metrics:
# Segment相关
milvus_datacoord_segment_num:
labels: [state, collection] # Growing/Sealed/Flushed/Indexed
milvus_datacoord_segment_size:
labels: [collection]
unit: bytes
# Compaction相关
milvus_datacoord_compaction_task_num:
labels: [state, type] # Pending/Running/Completed/Failed
milvus_datacoord_compaction_latency:
labels: [type] # MixCompaction/MergeCompaction
unit: seconds
# 性能相关
milvus_datacoord_assign_segment_latency:
labels: []
unit: milliseconds
milvus_datacoord_flush_latency:
labels: [collection]
unit: seconds
6. 配置参数
6.1 核心配置
dataCoord:
# Segment配置
segment:
maxSize: 536870912 # 512MB
maxRows: 1000000 # 100万行
sealProportion: 0.9 # 90%触发Seal
assignmentExpiration: 2000 # 分配过期时间(ms)
# Flush配置
flush:
maxIdleDuration: 600 # 10分钟无数据自动Flush
minSizeToFlush: 1048576 # 最小1MB才Flush
# Compaction配置
compaction:
enableAutoCompaction: true
triggerInterval: 600 # 10分钟扫描一次
minSegmentToMerge: 4 # 最少4个Segment才合并
maxSegmentToMerge: 30 # 最多合并30个Segment
# GC配置
gc:
interval: 1800 # 30分钟执行一次
missingTolerance: 86400 # 文件丢失容忍时间(1天)
dropTolerance: 86400 # Segment删除后保留时间(1天)
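按上述默认值估算,Segment约在 maxSize × sealProportion = 512MB × 0.9 ≈ 461MB 时触发Seal,预留约10%余量,与第4.1节"预留10%容量(避免频繁Seal)"的策略相对应。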
7. 故障处理
7.1 常见故障
| 故障类型 | 现象 | 原因 | 处理 |
|---|---|---|---|
| Segment分配失败 | AssignSegmentID超时 | etcd不可用 | 重试;检查etcd连接 |
| Flush超时 | Segment长时间Sealed | DataNode故障 | 重新分配Channel |
| Compaction堆积 | 任务pending数量增加 | DataNode性能不足 | 扩容DataNode |
| 元数据不一致 | 查询返回错误数据 | etcd数据损坏 | 重启DataCoord重新加载 |
7.2 降级策略
当etcd不可用时:
- 只读模式:继续提供Segment查询服务(基于内存缓存)
- 禁止新Segment创建
- 延迟Flush和Compaction任务
当DataNode不足时:
- 降低Flush触发阈值(减少Growing Segment)
- 暂停Compaction任务
- 告警通知运维扩容
8. 最佳实践
8.1 Segment大小调优
推荐配置:
| 数据特征 | Segment大小 | 原因 |
|---|---|---|
| 高频写入 | 256MB | 减少Flush延迟 |
| 低频写入 | 512MB | 减少小文件数量 |
| 大向量(>1024维) | 1GB | 提高检索效率 |
| 小向量(<128维) | 256MB | 平衡内存与性能 |
8.2 Compaction策略
定期手动Compaction:
# 在业务低峰期(如凌晨2点)手动触发
curl -X POST "http://datacoord:13333/api/v1/compaction" \
-d '{"collectionID": 123}'
监控Compaction积压:
# 告警规则:Compaction任务pending数量>50
milvus_datacoord_compaction_task_num{state="pending"} > 50
8.3 容量规划
估算公式:
Segment数量 = 数据总大小 / Segment大小
内存占用 = Segment数量 * 10KB # 每个Segment元数据约10KB
示例:
- 数据总量:10TB
- Segment大小:512MB
- Segment数量:10TB / 512MB = 20,000个
- 元数据内存:20,000 * 10KB = 200MB
相关文档:
- Milvus-00-总览.md
- Milvus-03-DataCoord-API.md (待生成)
- Milvus-03-DataCoord-数据结构.md (待生成)
- Milvus-03-DataCoord-时序图.md (待生成)
- Milvus-05-DataNode-概览.md (待生成)
Milvus-03-DataCoord-详细文档
本文档整合DataCoord的API、数据结构和关键时序图。
1. 核心API
1.1 AssignSegmentID
功能:为数据写入分配Segment
请求参数:
type AssignSegmentIDRequest struct {
SegmentIDRequests []*SegmentIDRequest
}
type SegmentIDRequest struct {
CollectionID int64
PartitionID int64
ChannelName string
Count uint32 // 需要写入的行数
}
响应参数:
type SegmentIDAssignment struct {
SegID int64 // 分配的SegmentID
ChannelName string
Count uint32
CollectionID int64
PartitionID int64
ExpireTime uint64 // 分配过期时间
}
核心代码:
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
assigns := make([]*datapb.SegmentIDAssignment, 0)
for _, r := range req.SegmentIDRequests {
// 调用SegmentManager分配
allocations, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count), r.StorageVersion)
for _, allocation := range allocations {
assigns = append(assigns, &datapb.SegmentIDAssignment{
SegID: allocation.SegmentID,
ChannelName: r.ChannelName,
Count: uint32(allocation.NumOfRows),
CollectionID: r.CollectionID,
PartitionID: r.PartitionID,
ExpireTime: allocation.ExpireTime,
})
}
}
return &datapb.AssignSegmentIDResponse{
Status: merr.Success(),
SegIDAssignments: assigns,
}, nil
}
1.2 Flush
功能:触发DataNode Flush操作
请求参数:
type FlushRequest struct {
DbID int64
CollectionID int64
SegmentIDs []int64 // 指定要Flush的Segment(可选)
}
核心代码:
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
// 1. 分配Flush时间戳
ts, err := s.allocator.AllocTimestamp(ctx)
// 2. 执行Flush
flushResult, err := s.flushCollection(ctx, req.CollectionID, ts, req.SegmentIDs)
return &datapb.FlushResponse{
Status: merr.Success(),
SegmentIDs: flushResult.SegmentIDs,
FlushSegmentIDs: flushResult.FlushSegmentIDs,
TimeOfSeal: flushResult.TimeOfSeal,
FlushTs: flushResult.FlushTs,
}, nil
}
func (s *Server) flushCollection(ctx context.Context, collID int64, ts Timestamp, segIDs []int64) (*datapb.FlushResult, error) {
// 1. 获取所有Channel
channels := s.channelManager.GetChannelsByCollectionID(collID)
// 2. 为每个Channel触发Flush
for _, channel := range channels {
segments := s.meta.GetSegmentsByChannel(channel)
// 过滤:只Flush Growing Segment
growingSegments := filterGrowingSegments(segments)
// 调用DataNode Flush
s.cluster.Flush(ctx, channel, growingSegments)
}
return &datapb.FlushResult{...}, nil
}
1.3 GetSegmentInfo
功能:查询Segment元信息
请求参数:
type GetSegmentInfoRequest struct {
SegmentIDs []int64
IncludeUnHealthy bool
}
响应参数:
type SegmentInfo struct {
ID int64
CollectionID int64
PartitionID int64
InsertChannel string
NumOfRows int64
State commonpb.SegmentState // Growing/Sealed/Flushed
MaxRowNum int64
LastExpireTime uint64
StartPosition *msgpb.MsgPosition
DmlPosition *msgpb.MsgPosition
Binlogs []*FieldBinlog
Statslogs []*FieldBinlog
Deltalogs []*FieldBinlog
}
2. 数据结构
2.1 核心数据结构UML
classDiagram
class Server {
+meta *meta
+segmentManager Manager
+channelManager ChannelManager
+compactionTrigger trigger
+garbageCollector *garbageCollector
+AssignSegmentID(req) response
+Flush(req) response
}
class meta {
-sync.RWMutex mu
-map segments
-map collections
-Catalog catalog
+AddSegment(segment)
+UpdateSegment(segment)
+GetSegment(id) segment
}
class SegmentInfo {
+int64 SegmentID
+int64 CollectionID
+SegmentState State
+int64 NumRows
+int64 MaxRowNum
+string InsertChannel
+[]FieldBinlog Binlogs
}
class SegmentManager {
-meta *meta
-allocator Allocator
+AllocSegment(collID, count) allocation
+SealSegment(segID)
+DropSegment(segID)
}
class ChannelManager {
-map channels
-NodeManager nodeManager
+Watch(channel) error
+Release(channel)
+GetDataNode(channel) nodeID
}
Server *-- meta
Server *-- SegmentManager
Server *-- ChannelManager
meta *-- SegmentInfo
SegmentManager --> meta
2.2 SegmentInfo详解
type SegmentInfo struct {
// 标识
SegmentID int64
CollectionID int64
PartitionID int64
// 状态
State commonpb.SegmentState // Growing/Sealed/Flushed/Dropped
// 容量
NumOfRows int64 // 当前行数
MaxRowNum int64 // 最大行数(默认100万)
// Channel
InsertChannel string // 所属DML Channel
// 时间信息
StartPosition *msgpb.MsgPosition // 起始消息位置
DmlPosition *msgpb.MsgPosition // 最新DML位置
LastExpireTime uint64 // 最后过期时间
// Binlog文件
Binlogs []*FieldBinlog // Insert Log
Statslogs []*FieldBinlog // Stats Log
Deltalogs []*FieldBinlog // Delete Log
// 索引
IndexInfos []*IndexInfo
// Compaction
CompactionFrom []int64 // 由哪些Segment Compact而来
// 大小
Size int64 // 字节数
}
// FieldBinlog 字段Binlog
type FieldBinlog struct {
FieldID int64
Binlogs []*Binlog
}
type Binlog struct {
EntriesNum int64
TimestampFrom uint64
TimestampTo uint64
LogPath string // Object Storage路径
LogSize int64
}
2.3 Segment状态机
Growing → Sealed → Flushing → Flushed → Indexed → Dropped
状态说明:
- Growing: 内存中,接受新数据
- Sealed: 已封闭,不再接受新数据
- Flushing: Flush进行中
- Flushed: 已持久化到Object Storage
- Indexed: 索引构建完成
- Dropped: 已标记删除,等待GC
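基于上述状态流转,可以用一张转换表来校验状态变更是否合法(以下为示意代码,用字符串表示状态,并非真实的枚举与校验实现):
// validTransitions 描述上文状态机中允许的状态转换
var validTransitions = map[string][]string{
	"Growing":  {"Sealed", "Dropped"},
	"Sealed":   {"Flushing", "Dropped"},
	"Flushing": {"Flushed", "Dropped"},
	"Flushed":  {"Indexed", "Dropped"},
	"Indexed":  {"Dropped"},
	"Dropped":  {},
}

// canTransit 判断 from -> to 是否为合法转换
func canTransit(from, to string) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}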
2.4 SegmentManager
type SegmentManager struct {
meta *meta
allocator allocator.Allocator
mu sync.RWMutex
// Channel → Growing Segments
segments map[string][]*SegmentInfo
}
// AllocSegment 分配Segment
func (sm *SegmentManager) AllocSegment(ctx context.Context, collID, partID int64, channelName string, count int64, version int64) ([]*Allocation, error) {
// 1. 查找未满的Growing Segment
segment := sm.getGrowingSegment(channelName, collID, partID)
if segment != nil && segment.AvailableSize() >= count {
// 复用现有Segment
return []*Allocation{{
SegmentID: segment.ID,
NumOfRows: count,
ExpireTime: time.Now().Add(SegmentExpireDuration).Unix(),
}}, nil
}
// 2. 创建新Segment
newSegID, err := sm.allocator.allocID(ctx)
if err != nil {
return nil, err
}
newSegment := &SegmentInfo{
SegmentID: newSegID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: channelName,
State: commonpb.SegmentState_Growing,
MaxRowNum: Params.SegmentMaxSize,
NumOfRows: 0,
}
// 3. 持久化
err = sm.meta.AddSegment(ctx, newSegment)
return []*Allocation{{SegmentID: newSegID, NumOfRows: count}}, nil
}
3. 关键时序图
3.1 AssignSegmentID时序图
sequenceDiagram
participant P as Proxy
participant DC as DataCoord
participant SM as SegmentManager
participant Meta as Meta
participant ETCD as etcd
P->>DC: AssignSegmentID(coll, channel, count=1000)
DC->>SM: AllocSegment(coll, partition, channel, 1000)
SM->>Meta: GetGrowingSegment(channel)
Meta-->>SM: segment or nil
alt Segment存在且未满
SM-->>DC: [existing_seg_id]
else 创建新Segment
SM->>DC: AllocID()
DC-->>SM: new_seg_id
SM->>Meta: AddSegment(new_segment)
Meta->>ETCD: Save(segment_meta)
ETCD-->>Meta: OK
Meta-->>SM: Success
SM-->>DC: [new_seg_id]
end
DC-->>P: SegmentIDAssignment
3.2 Flush时序图
sequenceDiagram
participant DN as DataNode
participant DC as DataCoord
participant Meta as Meta
participant Cluster as Cluster(DataNode Manager)
participant QC as QueryCoord
DN->>DC: DataNodeTtMsg(channel, ts)
Note over DC: 周期性收到DataNode心跳
DC->>DC: GetFlushableSegments(channel, ts)
Note over DC: 过滤:size>threshold 或 time>timeout
alt 有可Flush的Segment
DC->>Meta: UpdateSegmentState(segments, Sealed)
Meta-->>DC: Success
DC->>Cluster: Flush(nodeID, channel, segments)
Cluster->>DN: FlushSegments RPC
DN->>DN: 序列化数据
DN->>DN: 写入Object Storage(Binlog)
DN->>DC: FlushCompleted(segments, binlog_paths)
DC->>Meta: UpdateSegment(state=Flushed, binlogs)
Meta-->>DC: Success
DC->>QC: SegmentFlushCompleted(segments)
Note over QC: 触发Handoff
end
3.3 Compaction时序图
sequenceDiagram
participant CT as CompactionTrigger
participant CI as CompactionInspector
participant Meta as Meta
participant DN as DataNode
participant OS as Object Storage
loop 每10分钟
CT->>Meta: 扫描所有Segment
Meta-->>CT: [all_segments]
CT->>CT: 计算Compaction收益
alt 分数>阈值
CT->>CI: SubmitTask(CompactionPlan)
CI->>Meta: GetSegmentDetails
Meta-->>CI: [segment_details]
CI->>DN: Compaction(plan)
DN->>OS: 读取旧Segment Binlog
OS-->>DN: binlog_data
DN->>DN: 合并数据、删除重复/已删除记录
DN->>OS: 写入新Segment Binlog
OS-->>DN: Success
DN->>CI: CompactionCompleted(new_segment)
CI->>Meta: AddSegment(new_segment)
CI->>Meta: DropSegment(old_segments)
Meta-->>CI: Success
end
end
3.4 GC流程时序图
sequenceDiagram
participant GC as GarbageCollector
participant Meta as Meta
participant OS as Object Storage
loop 每30分钟
GC->>Meta: 扫描Dropped Segment
Meta-->>GC: [seg1, seg2, ...]
loop 每个Segment
GC->>Meta: GetSegmentBinlogs(segID)
Meta-->>GC: [binlog_paths]
GC->>OS: DeleteFiles(binlog_paths)
OS-->>GC: Success
GC->>Meta: RemoveSegment(segID)
Meta-->>GC: Success
end
end
4. 关键算法
4.1 Segment分配策略
// 优先级:
// 1. 优先复用未满的Growing Segment(减少小文件)
// 2. 单个Channel串行分配(避免并发冲突)
// 3. 预留10%容量(避免频繁Seal)
func (sm *SegmentManager) selectOrCreateSegment(channel string, count int64) (*SegmentInfo, error) {
segments := sm.getGrowingSegments(channel)
// 按可用空间排序,优先选择最满的Segment
sort.Slice(segments, func(i, j int) bool {
return segments[i].AvailableSize() < segments[j].AvailableSize()
})
for _, seg := range segments {
if seg.AvailableSize() >= count {
return seg, nil
}
}
// 无可用Segment,创建新的
return sm.createSegment(channel, count)
}
4.2 Flush触发条件
func (sm *SegmentManager) GetFlushableSegments(channel string, ts Timestamp) ([]int64, error) {
segments := sm.getGrowingSegments(channel)
flushable := []int64{}
for _, seg := range segments {
// 条件1:大小超过阈值(512MB)
if seg.Size >= SegmentMaxSize {
flushable = append(flushable, seg.ID)
continue
}
// 条件2:行数超过阈值(100万)
if seg.NumOfRows >= SegmentMaxRows {
flushable = append(flushable, seg.ID)
continue
}
// 条件3:时间超过阈值(10分钟)
duration := time.Since(seg.CreateTime)
if duration > SegmentFlushTimeout {
flushable = append(flushable, seg.ID)
continue
}
}
return flushable, nil
}
4.3 Compaction优先级
func (ct *compactionTrigger) calculatePriority(segments []*SegmentInfo) float64 {
var totalSize, totalRows, deletedRows int64
var smallFileCount int
for _, seg := range segments {
totalSize += seg.Size
totalRows += seg.NumRows
deletedRows += seg.DeletedRows
if seg.Size < SmallSegmentThreshold {
smallFileCount++
}
}
deleteRatio := float64(deletedRows) / float64(totalRows)
fragmentScore := float64(smallFileCount) / float64(len(segments))
ageScore := float64(time.Since(segments[0].LastCompactionTime).Hours()) / (30 * 24)
sizeScore := float64(totalSize) / (10 * 1024 * 1024 * 1024)
// 加权计算优先级
return deleteRatio*0.4 + fragmentScore*0.3 + ageScore*0.2 + sizeScore*0.1
}