Milvus-03-DataCoord-概览

1. 模块概述

1.1 职责定义

DataCoord(数据协调器)是Milvus数据管理层的核心组件,负责数据的生命周期管理、存储优化和资源调度。

核心职责

  1. Segment生命周期管理

    • Segment分配与创建
    • Growing → Sealed → Flushed → Indexed状态转换
    • Segment元数据维护
  2. 数据持久化协调

    • 触发DataNode Flush操作
    • 监控Flush进度
    • 管理Binlog文件
  3. Compaction调度

    • 自动触发Compaction任务
    • 分配Compaction资源
    • 监控Compaction进度
  4. Channel与DataNode管理

    • Channel分配与负载均衡
    • DataNode注册与心跳监控
    • 故障检测与恢复
  5. 索引管理协调

    • 触发索引构建任务
    • 监控索引构建进度
    • 管理索引文件元数据
  6. 垃圾回收(GC)

    • 清理过期Segment
    • 清理无效Binlog文件
    • 回收Object Storage空间

1.2 输入与输出

输入

  • Proxy:AssignSegmentID请求(数据写入)
  • DataNode:Flush完成通知、Compaction进度
  • RootCoord:Collection/Partition DDL事件
  • etcd/TiKV:启动时加载/Watch的Segment、Channel元数据
  • Object Storage:Binlog/Deltalog文件列表(GC扫描对象)

输出

  • DataNode:Flush/Compaction任务
  • QueryCoord:Handoff通知(Segment状态变更)
  • etcd/TiKV:元数据更新
  • 监控系统:Metrics指标

1.3 上下游依赖

flowchart TD
    RC[RootCoord] -->|DDL事件| DC[DataCoord]
    P[Proxy] -->|AssignSegmentID| DC
    DN1[DataNode1] -.->|Flush完成| DC
    DN2[DataNode2] -.->|Compaction进度| DC
    
    DC -->|Flush任务| DN1
    DC -->|Compaction任务| DN2
    DC -->|Handoff通知| QC[QueryCoord]
    
    DC <-->|元数据读写| ETCD[etcd/TiKV]
    DC -->|清理文件| OS[Object Storage]
    
    DC -->|Metrics| Prom[Prometheus]
    
    style DC fill:#f9f,stroke:#333,stroke-width:4px

依赖关系说明

依赖组件 依赖类型 用途
RootCoord 强依赖 获取Collection元信息、DDL事件订阅
DataNode 强依赖 执行Flush/Compaction任务
QueryCoord 弱依赖 通知Segment状态变更(Handoff)
etcd/TiKV 强依赖 元数据持久化与Watch
Object Storage 强依赖 Binlog文件管理

1.4 生命周期

stateDiagram-v2
    [*] --> Created: CreateServer()
    Created --> Initializing: Init()
    Initializing --> Initializing: 初始化元数据/连接
    Initializing --> Healthy: Start()
    Healthy --> Healthy: 处理请求
    Healthy --> Stopping: Stop()
    Stopping --> [*]: 清理资源
    
    note right of Initializing
        1. 连接etcd
        2. 加载元数据
        3. 初始化SegmentManager
        4. 启动Compaction
        5. 启动GC
    end note
    
    note right of Healthy
        1. 接收Segment分配请求
        2. 调度Flush任务
        3. 调度Compaction任务
        4. 处理DataNode心跳
        5. 执行垃圾回收
    end note

生命周期阶段

阶段 状态码 主要操作 耗时
Created - 创建Server实例 <1ms
Initializing StateCode_Initializing 连接依赖组件、加载元数据 1-5s
Healthy StateCode_Healthy 正常服务 -
Stopping StateCode_Abnormal 停止后台任务、关闭连接 1-3s
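
下面用一段示意代码概括上述生命周期的调用顺序(仅为示意,接口名与真实的 CreateServer/Init/Start/Stop 签名以 internal/datacoord 源码为准):

import "context"

// 组件生命周期的最小化接口(示意用,非真实类型)
type component interface {
    Init() error  // Initializing:连接etcd、加载元数据、初始化SegmentManager
    Start() error // Healthy:启动Compaction/GC等后台任务,开始对外服务
    Stop() error  // Stopping:停止后台任务、关闭连接
}

// 按 Created -> Initializing -> Healthy -> Stopping 的顺序驱动组件
func run(ctx context.Context, dc component) error {
    if err := dc.Init(); err != nil {
        return err
    }
    if err := dc.Start(); err != nil {
        return err
    }
    defer dc.Stop()
    <-ctx.Done() // 收到退出信号后进入Stopping
    return nil
}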

2. 架构设计

2.1 整体服务架构图

flowchart TB
    subgraph Upstream["上游组件"]
        Proxy[Proxy<br/>数据写入入口]
        RC[RootCoord<br/>DDL协调器]
        QC[QueryCoord<br/>查询协调器]
    end
    
    subgraph DataCoord["DataCoord Server"]
        subgraph APILayer["API 层"]
            GRPC[gRPC Server<br/>对外接口]
        end
        
        subgraph CoreManagers["核心管理器"]
            direction TB
            SM[SegmentManager<br/>━━━━━━━━━━━━<br/>• Segment分配<br/>• 状态管理<br/>• Seal策略]
            
            CM[ChannelManager<br/>━━━━━━━━━━━━<br/>• Channel分配<br/>• 负载均衡<br/>• 故障转移]
            
            Meta[Meta<br/>━━━━━━━━━━━━<br/>• 内存缓存<br/>• 元数据持久化<br/>• 事务支持]
        end
        
        subgraph TaskSchedulers["任务调度层"]
            direction LR
            CTM[CompactionTriggerManager<br/>━━━━━━━━━━━━<br/>• L0 Compaction<br/>• Mix Compaction<br/>• Clustering]
            
            CI[CompactionInspector<br/>━━━━━━━━━━━━<br/>• 任务调度<br/>• 进度监控<br/>• 结果处理]
            
            II[IndexInspector<br/>━━━━━━━━━━━━<br/>• 索引任务调度<br/>• 构建监控]
        end
        
        subgraph BackgroundServices["后台服务"]
            GC[GarbageCollector<br/>━━━━━━━━━━━━<br/>• Dropped Segment清理<br/>• Binlog文件回收<br/>• 索引文件清理]
            
            Handler[Handler<br/>━━━━━━━━━━━━<br/>• Collection信息缓存<br/>• RootCoord交互<br/>• QueryCoord交互]
        end
        
        GRPC --> SM
        GRPC --> CM
        GRPC --> Meta
        GRPC --> CTM
        
        SM --> Meta
        CM --> Meta
        CTM --> Meta
        CI --> Meta
        II --> Meta
        GC --> Meta
        
        Handler --> Meta
    end
    
    subgraph Downstream["下游组件"]
        DN1[DataNode-1<br/>━━━━━━<br/>数据写入]
        DN2[DataNode-2<br/>━━━━━━<br/>数据写入]
        DNn[DataNode-N<br/>━━━━━━<br/>数据写入]
        
        Cluster[Cluster Manager<br/>━━━━━━━━━━━━<br/>DataNode通信层]
    end
    
    subgraph Storage["存储层"]
        direction TB
        ETCD[(etcd/TiKV<br/>━━━━━━━━<br/>元数据存储)]
        S3[(Object Storage<br/>━━━━━━━━<br/>MinIO/S3<br/>Binlog文件)]
    end
    
    %% 上游到 DataCoord
    Proxy -->|AssignSegmentID<br/>获取Segment分配| GRPC
    RC -->|WatchChannels<br/>创建Collection| GRPC
    RC -->|DropCollection<br/>删除Collection| GRPC
    QC -->|GetSegmentInfo<br/>查询Segment| GRPC
    
    %% DataCoord 到下游
    CM -->|Watch/Release Channel<br/>分配/释放| Cluster
    CI -->|Compaction Task<br/>压缩任务| Cluster
    SM -->|Flush Command<br/>刷盘命令| Cluster
    
    Cluster --> DN1
    Cluster --> DN2
    Cluster --> DNn
    
    %% DataCoord 到存储
    Meta <-->|Save/Load<br/>Segment Meta| ETCD
    CM <-->|Save/Load<br/>Channel Meta| ETCD
    GC -->|Delete<br/>Binlog Files| S3
    
    %% DataNode 反馈
    DN1 -.->|FlushCompleted<br/>CompactionCompleted| GRPC
    DN2 -.->|FlushCompleted<br/>CompactionCompleted| GRPC
    DNn -.->|FlushCompleted<br/>CompactionCompleted| GRPC
    
    style DataCoord fill:#e1f5ff,stroke:#1976d2,stroke-width:3px
    style CoreManagers fill:#fff4e6,stroke:#f57c00,stroke-width:2px
    style TaskSchedulers fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    style BackgroundServices fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    style Upstream fill:#ffebee,stroke:#c62828,stroke-width:2px
    style Downstream fill:#e0f2f1,stroke:#00695c,stroke-width:2px
    style Storage fill:#fce4ec,stroke:#880e4f,stroke-width:2px

架构分层说明

层次 组件 职责 交互方式
API层 gRPC Server 对外提供RPC接口 同步调用
核心管理器 SegmentManager Segment生命周期管理 内存+持久化
ChannelManager Channel分配与负载均衡 内存+持久化
Meta 元数据缓存与持久化 读多写少
任务调度层 CompactionTriggerManager 自动触发Compaction 定时+事件触发
CompactionInspector Compaction任务调度 异步调度
IndexInspector 索引任务调度 异步调度
后台服务 GarbageCollector 垃圾回收 定时清理
Handler 跨组件通信 RPC调用

2.2 模块交互矩阵

DataCoord内部各模块之间的依赖关系和交互方式如下(调用方 → 被调方:交互内容与交互方式):

  • API Layer → SegmentManager:分配Segment(同步调用)
  • API Layer → ChannelManager:Watch/Release(同步调用)
  • API Layer → Meta:查询元数据(同步调用)
  • API Layer → CompactionTrigger:手动触发(同步调用)
  • SegmentManager → ChannelManager:查询DataNode(同步调用)
  • SegmentManager → Meta:读写Segment元数据(同步调用)
  • ChannelManager → Meta:读写Channel元数据(同步调用)
  • CompactionTrigger → SegmentManager:获取待压缩Segment(同步调用)
  • CompactionTrigger → Meta:读取元数据(同步调用)
  • GarbageCollector → Meta:查询Dropped Segment、删除元数据(同步调用)

2.3 数据流转示意图

flowchart LR
    subgraph Write["写入流程"]
        W1[Proxy请求分配] --> W2[SegmentManager分配]
        W2 --> W3[Meta持久化]
        W3 --> W4[返回SegmentID]
        W4 --> W5[DataNode写入数据]
        W5 --> W6[Segment达到阈值]
    end
    
    subgraph Flush["Flush流程"]
        F1[SegmentManager检测] --> F2[Seal Segment]
        F2 --> F3[通知DataNode Flush]
        F3 --> F4[DataNode写Binlog]
        F4 --> F5[更新Meta状态]
        F5 --> F6[通知QueryCoord]
    end
    
    subgraph Compact["Compaction流程"]
        C1[CompactionTrigger扫描] --> C2[计算收益]
        C2 --> C3[提交CompactionInspector]
        C3 --> C4[DataNode合并]
        C4 --> C5[写新Segment]
        C5 --> C6[Drop旧Segment]
    end
    
    subgraph GC["GC流程"]
        G1[GC扫描Dropped] --> G2[删除Binlog]
        G2 --> G3[删除Meta]
    end
    
    W6 --> F1
    F6 --> C1
    C6 --> G1
    
    style Write fill:#e3f2fd
    style Flush fill:#fff3e0
    style Compact fill:#f3e5f5
    style GC fill:#e8f5e9

架构层次

  1. API层:gRPC接口,处理外部请求
  2. 核心管理器:Segment、Channel、Meta管理
  3. 调度器:Compaction、索引、统计任务调度
  4. 后台服务:GC、心跳监控、时间同步

2.4 核心组件说明

2.4.1 SegmentManager

职责:Segment分配与生命周期管理

核心功能

  • AllocSegment:为数据写入分配Segment
  • SealSegment:将Growing Segment标记为Sealed
  • DropSegment:删除Segment元数据
  • GetFlushableSegments:获取可Flush的Segment列表

Segment状态机

stateDiagram-v2
    [*] --> Growing: AllocSegment
    Growing --> Sealed: SealSegment(满或超时)
    Sealed --> Flushing: DataNode开始Flush
    Flushing --> Flushed: Flush完成
    Flushed --> Indexed: 索引构建完成
    Indexed --> Dropped: DropSegment
    Dropped --> [*]: GC清理
    
    Growing --> Dropped: Collection被删除
    Sealed --> Dropped: Collection被删除
    
    note right of Growing
        状态:内存中
        操作:接受新数据写入
        大小:动态增长
    end note
    
    note right of Flushed
        状态:已持久化
        操作:只读
        位置:Object Storage
    end note

Segment分配策略

// 分配策略:优先复用未满Segment,不存在则创建新Segment
func (m *SegmentManager) AllocSegment(collectionID, partitionID int64, channelName string, count int64) ([]*Allocation, error) {
    // 1. 查找未满的Growing Segment
    segment := m.meta.GetGrowingSegment(collectionID, partitionID, channelName)
    
    // 2. 检查容量是否足够
    if segment != nil && segment.AvailableSize >= count {
        allocation := &Allocation{
            SegmentID: segment.ID,
            NumOfRows: count,
            ExpireTime: time.Now().Add(SegmentExpireDuration),
        }
        return []*Allocation{allocation}, nil
    }
    
    // 3. 创建新Segment
    newSegmentID, err := m.allocator.AllocID()
    if err != nil {
        return nil, err
    }
    
    segment = &SegmentInfo{
        SegmentID:    newSegmentID,
        CollectionID: collectionID,
        PartitionID:  partitionID,
        State:        commonpb.SegmentState_Growing,
        MaxSize:      SegmentMaxSize,  // 默认512MB
        Channel:      channelName,
    }
    
    m.meta.AddSegment(segment)
    
    return []*Allocation{{SegmentID: newSegmentID, NumOfRows: count}}, nil
}

2.4.2 ChannelManager

职责:管理DML Channel与DataNode的映射关系

核心功能

  • Watch:为Channel分配DataNode
  • Release:释放Channel(Collection被删除)
  • GetDataNode:查询Channel对应的DataNode
  • Balance:负载均衡(将Channel迁移到其他DataNode)
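
上述四类操作可以抽象为如下接口示意(方法名与参数仅用于说明职责划分,并非真实签名):

// ChannelManager 能力的最小化接口示意
type channelManager interface {
    // Watch:为Channel选择DataNode并下发WatchDmChannels
    Watch(ctx context.Context, ch RWChannel) error
    // Release:Collection删除时释放Channel
    Release(ctx context.Context, nodeID int64, channelName string) error
    // GetDataNode:查询Channel当前归属的DataNode
    GetDataNode(channelName string) (nodeID int64, ok bool)
    // Balance:周期性负载均衡,将Channel迁移到低负载节点
    Balance(ctx context.Context) error
}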

Channel分配示意

flowchart LR
    C1[Collection1<br/>4 Channels] --> VC1[vchan_0]
    C1 --> VC2[vchan_1]
    C1 --> VC3[vchan_2]
    C1 --> VC4[vchan_3]
    
    VC1 -.->|Watch| DN1[DataNode1]
    VC2 -.->|Watch| DN2[DataNode2]
    VC3 -.->|Watch| DN1
    VC4 -.->|Watch| DN3[DataNode3]
    
    DN1 -->|Flush| S1[Segment1-3]
    DN2 -->|Flush| S2[Segment4-5]
    DN3 -->|Flush| S3[Segment6-7]
    
    style C1 fill:#e1f5ff
    style DN1 fill:#fff4e6
    style DN2 fill:#fff4e6
    style DN3 fill:#fff4e6

2.4.3 Meta

职责:管理Segment元数据的内存缓存与持久化

数据结构

type meta struct {
    sync.RWMutex
    
    // Segment元数据:segmentID -> SegmentInfo
    segments map[UniqueID]*SegmentInfo
    
    // Collection -> Segments映射
    collections map[UniqueID][]UniqueID
    
    // Channel -> Segments映射
    channelSegments map[string][]UniqueID
    
    // 元数据持久化层
    catalog metastore.DataCoordCatalog
    
    // Collection信息缓存
    collectionInfos map[UniqueID]*collectionInfo
}

元数据操作

操作 说明 持久化
AddSegment 添加新Segment 是(写入etcd)
UpdateSegment 更新Segment状态 是(写入etcd)
DropSegment 删除Segment 是(标记Dropped并写入etcd)
GetSegment 查询Segment信息 否(内存)
ListSegments 列出Collection的Segment 否(内存)
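
读路径只访问内存缓存、加读锁即可完成,下面给出一个与上面 meta 结构体对应的查询示意(方法名为示意,非真实实现):

// 从内存map查询Segment,不访问etcd,延迟通常在微秒级
func (m *meta) getSegment(segmentID UniqueID) *SegmentInfo {
    m.RLock()
    defer m.RUnlock()
    return m.segments[segmentID]
}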

2.4.4 CompactionTrigger

职责:自动触发Compaction任务

触发策略

  1. 时间触发:定期扫描(每10分钟)
  2. 事件触发:Segment Flush完成后检查
  3. 手动触发:用户主动调用API

Compaction类型

flowchart TD
    Start[扫描Segment] --> CheckDelete{Delete比例>20%}
    CheckDelete -->|是| MixCompaction[MixCompaction<br/>合并Delete Delta]
    CheckDelete -->|否| CheckSize{小文件数量>10}
    
    CheckSize -->|是| MergeCompaction[MergeCompaction<br/>合并小Segment]
    CheckSize -->|否| CheckCluster{是否启用Clustering}
    
    CheckCluster -->|是| ClusteringCompaction[ClusteringCompaction<br/>按标量字段聚类]
    CheckCluster -->|否| End[不触发]
    
    MixCompaction --> Schedule[提交到CompactionInspector]
    MergeCompaction --> Schedule
    ClusteringCompaction --> Schedule
    
    Schedule --> End
    
    style MixCompaction fill:#ffe6e6
    style MergeCompaction fill:#e6f3ff
    style ClusteringCompaction fill:#e6ffe6

Compaction收益评估

// 计算Compaction收益分数
func (t *compactionTrigger) calculateScore(segments []*SegmentInfo) float64 {
    var totalRows int64
    var totalDeleted float64
    var fragmentScore float64
    
    for _, seg := range segments {
        totalRows += seg.NumOfRows
        totalDeleted += float64(seg.NumOfRows) * seg.DeleteRatio
        
        // 碎片化分数:小文件越多,分数越高
        if seg.Size < SmallSegmentThreshold {
            fragmentScore += 1.0
        }
    }
    
    // 删除比例 = 已删除行数 / 总行数
    deleteRatio := totalDeleted / float64(totalRows)
    
    // 综合分数 = 删除比例 * 0.7 + 碎片化分数 * 0.3
    return deleteRatio*0.7 + (fragmentScore/float64(len(segments)))*0.3
}

2.4.5 GarbageCollector

职责:清理无效数据,回收存储空间

清理对象

  1. Dropped Segment的Binlog
  2. Compaction后的旧Segment
  3. 过期的临时文件
  4. 孤立的索引文件

GC流程

sequenceDiagram
    autonumber
    participant GC as GarbageCollector
    participant Meta as Meta
    participant OS as Object Storage
    
    loop 每30分钟
        GC->>Meta: 扫描Dropped Segment
        Meta-->>GC: [seg1, seg2, ...]
        
        loop 每个Segment
            GC->>Meta: 获取Segment的Binlog路径
            Meta-->>GC: [binlog1, binlog2, ...]
            
            GC->>OS: 检查文件是否存在
            OS-->>GC: 存在
            
            GC->>OS: 删除Binlog文件
            OS-->>GC: 删除成功
            
            GC->>Meta: 移除Segment元数据
            Meta-->>GC: 成功
        end
    end

3. 核心流程详解

3.1 AssignSegmentID - Segment分配流程

3.1.1 整体时序图

sequenceDiagram
    autonumber
    participant P as Proxy
    participant DC as DataCoord::Server
    participant H as Handler
    participant SM as SegmentManager
    participant Meta as Meta
    participant Alloc as Allocator
    participant ETCD as etcd/TiKV
    
    P->>+DC: AssignSegmentID(requests)
    Note over DC: 检查健康状态
    
    loop 处理每个请求
        DC->>+H: GetCollection(collectionID)
        H->>RC: DescribeCollection(collectionID)
        RC-->>H: CollectionInfo(schema, properties)
        H-->>-DC: CollectionInfo
        
        DC->>+SM: AllocSegment(collID, partID, channel, count, version)
        
        Note over SM: 加Channel锁<br/>确保串行分配
        
        SM->>+Meta: GetGrowingSegments(channel)
        Note over Meta: 从内存缓存查询<br/>channel -> segments映射
        Meta-->>-SM: [segment1, segment2, ...]
        
        alt 存在可用Segment
            Note over SM: 按可用空间排序<br/>优先使用最满的Segment
            SM->>SM: 检查容量是否充足
            alt 容量足够
                SM->>Meta: 更新分配记录
                SM-->>DC: Allocation(existingSegID, count)
            else 容量不足需要新Segment
                SM->>+Alloc: AllocID()
                Alloc->>ETCD: IncrementBy(1)
                ETCD-->>Alloc: new_id
                Alloc-->>-SM: new_segment_id
                
                SM->>SM: 构建SegmentInfo
                Note over SM: state=Growing<br/>maxRows=1000000<br/>maxSize=512MB
                
                SM->>+Meta: AddSegment(newSegment)
                Meta->>Meta: 更新内存索引
                Note over Meta: 更新 segments map<br/>更新 channel2Growing map<br/>更新 collection2Segments map
                
                Meta->>+ETCD: SaveSegment(segmentInfo)
                Note over ETCD: key: segment/{segID}<br/>事务写入
                ETCD-->>-Meta: OK
                Meta-->>-SM: Success
                
                SM-->>DC: Allocation(newSegID, count)
            end
        else 不存在Growing Segment
            Note over SM: 同上创建新Segment流程
        end
        
        SM-->>-DC: [Allocation...]
        
        DC->>DC: 构建SegmentIDAssignment
        Note over DC: segmentID, count<br/>expireTime, collectionID
    end
    
    DC-->>-P: AssignSegmentIDResponse
    Note over P: 使用分配的SegmentID<br/>写入数据到DataNode

3.1.2 调用链路详解

① API入口 - Server.AssignSegmentID()

// internal/datacoord/services.go:307
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
        return &datapb.AssignSegmentIDResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 2. 处理每个分配请求
    assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
    for _, r := range req.SegmentIDRequests {
        // 3. 获取Collection信息(从RootCoord),失败则跳过该请求
        if _, err := s.handler.GetCollection(ctx, r.GetCollectionID()); err != nil {
            continue
        }
        
        // 4. 调用SegmentManager执行分配,失败则跳过该请求
        segmentAllocations, err := s.segmentManager.AllocSegment(ctx,
            r.CollectionID, r.PartitionID, r.ChannelName,
            int64(r.Count), r.GetStorageVersion())
        if err != nil {
            continue
        }
        
        // 5. 构造返回结果
        for _, allocation := range segmentAllocations {
            assigns = append(assigns, &datapb.SegmentIDAssignment{
                SegID:        allocation.SegmentID,
                ChannelName:  r.ChannelName,
                Count:        uint32(allocation.NumOfRows),
                CollectionID: r.CollectionID,
                PartitionID:  r.PartitionID,
                ExpireTime:   allocation.ExpireTime,
            })
        }
    }
    
    return &datapb.AssignSegmentIDResponse{
        Status:           merr.Success(),
        SegIDAssignments: assigns,
    }, nil
}

② 核心逻辑 - SegmentManager.AllocSegment()

// internal/datacoord/segment_manager.go:240
func (sm *SegmentManager) AllocSegment(ctx context.Context, collectionID, partitionID UniqueID,
    channelName string, requestRows int64, storageVersion int64) ([]*Allocation, error) {
    
    // 1. 加锁:确保同一个Channel的Segment分配串行化
    sm.channelLock.Lock(channelName)
    defer sm.channelLock.Unlock(channelName)
    
    // 2. 获取分配策略结果(复用已有Segment或标记需要新建)
    allocations, err := sm.allocPolicy.apply(collectionID, partitionID,
        channelName, requestRows)
    if err != nil {
        return nil, err
    }
    
    // 3. 根据策略执行分配
    for _, alloc := range allocations {
        if alloc.SegmentID == 0 {
            // 需要创建新Segment
            segmentID, err := sm.allocator.allocID(ctx)
            if err != nil {
                return nil, err
            }
            
            segment := &SegmentInfo{
                SegmentID:      segmentID,
                CollectionID:   collectionID,
                PartitionID:    partitionID,
                InsertChannel:  channelName,
                State:          commonpb.SegmentState_Growing,
                MaxRowNum:      Params.SegmentMaxRows, // 行数上限,默认100万
                NumOfRows:      0,
                StorageVersion: storageVersion,
            }
            
            // 持久化到Meta,失败则整体返回错误
            if err := sm.meta.AddSegment(ctx, segment); err != nil {
                return nil, err
            }
            
            alloc.SegmentID = segmentID
        }
    }
    
    return allocations, nil
}

③ 元数据管理 - Meta.AddSegment()

// internal/datacoord/meta.go:450
func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
    m.Lock()
    defer m.Unlock()
    
    // 1. 更新内存索引
    m.segments.Insert(segment.GetID(), segment)
    
    // 2. 更新Channel -> Segments映射
    if segment.State == commonpb.SegmentState_Growing {
        growingSet := m.channel2Growing.GetOrInsert(segment.InsertChannel, 
            typeutil.NewUniqueSet())
        growingSet.Insert(segment.GetID())
    }
    
    // 3. 更新Collection -> Segments映射
    collSegments := m.collections.GetOrInsert(segment.CollectionID, []int64{})
    m.collections.Insert(segment.CollectionID, append(collSegments, segment.GetID()))
    
    // 4. 持久化到etcd
    if err := m.catalog.AddSegment(ctx, segment.SegmentInfo); err != nil {
        // 回滚内存操作
        m.segments.Remove(segment.GetID())
        return err
    }
    
    return nil
}

3.1.3 关键设计点

1. 并发控制

// 使用Channel级别的锁,避免全局锁竞争
type SegmentManager struct {
    channelLock *lock.KeyLock[string]  // channel -> lock
    // ...
}

// 同一Channel的分配请求串行化
sm.channelLock.Lock(channelName)
defer sm.channelLock.Unlock(channelName)

2. 分配策略

// 优先复用未满的Growing Segment
func (sm *SegmentManager) selectOrCreateSegment(channel string, count int64) (*SegmentInfo, error) {
    segments := sm.meta.GetGrowingSegments(channel)
    
    // 按可用空间排序,优先选择最满的Segment(减少碎片)
    sort.Slice(segments, func(i, j int) bool {
        return segments[i].AvailableSize() < segments[j].AvailableSize()
    })
    
    for _, seg := range segments {
        if seg.AvailableSize() >= count {
            return seg, nil
        }
    }
    
    // 无可用Segment,创建新的
    return sm.createSegment(channel, count)
}

3. 元数据一致性

stateDiagram-v2
    [*] --> 内存更新: AddSegment
    内存更新 --> etcd持久化: Save
    etcd持久化 --> 成功: OK
    etcd持久化 --> 回滚内存: Error
    回滚内存 --> [*]: 失败
    成功 --> [*]: 完成
    
    note right of 内存更新
        • 更新 segments map
        • 更新 channel2Growing
        • 更新 collection2Segments
    end note
    
    note right of etcd持久化
        事务写入
        key: segment/{segID}
    end note

3.1.4 性能优化

优化点 方法 效果
Channel级锁 使用KeyLock[string] 减少锁竞争,提高并发度
内存缓存 所有Segment信息在内存 查询延迟<1ms
Segment复用 优先填充未满Segment 减少小文件数量
批量分配 一次请求可分配多个Segment 减少RPC次数
异步持久化 先更新内存,异步写etcd 降低P99延迟

3.1.5 异常处理

flowchart TD
    Start[AssignSegmentID] --> Check{健康检查}
    Check -->|失败| Err1[返回Unavailable]
    Check -->|成功| GetColl{获取Collection}
    
    GetColl -->|失败| Err2[返回CollectionNotFound]
    GetColl -->|成功| AllocSeg{分配Segment}
    
    AllocSeg -->|成功| Return[返回Assignment]
    AllocSeg -->|ID分配失败| Err3[返回OutOfID]
    AllocSeg -->|Meta持久化失败| Err4[返回InternalError]
    
    Err1 --> End
    Err2 --> End
    Err3 --> End
    Err4 --> End
    Return --> End[结束]
    
    style Err1 fill:#ffcdd2
    style Err2 fill:#ffcdd2
    style Err3 fill:#ffcdd2
    style Err4 fill:#ffcdd2

错误码说明

错误 原因 恢复策略
Unavailable DataCoord未就绪 重试
CollectionNotFound Collection不存在 检查DDL操作
OutOfID ID分配器耗尽 联系管理员
InternalError etcd写入失败 检查etcd连接,重试
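
对于表中标注"重试"的可恢复错误(如Unavailable、etcd写入失败),客户端或上层组件通常采用指数退避重试,下面是一个通用的示意实现(非Milvus内置代码):

// 带指数退避的简单重试封装
func withRetry(ctx context.Context, attempts int, fn func() error) error {
    backoff := 100 * time.Millisecond
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
            backoff *= 2 // 指数退避,避免重试风暴
        }
    }
    return err
}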

3.2 Flush流程详解

3.2.1 整体时序图

sequenceDiagram
    autonumber
    participant User as User/Client
    participant P as Proxy
    participant DC as DataCoord::Server
    participant SM as SegmentManager
    participant Meta as Meta
    participant CM as ChannelManager
    participant Cluster as Cluster
    participant DN as DataNode
    participant S3 as Object Storage
    participant QC as QueryCoord
    
    User->>+P: Flush(collectionID)
    P->>+DC: Flush(collectionID, segmentIDs)
    Note over DC: 手动触发Flush
    
    DC->>+DC: AllocTimestamp()
    DC-->>-DC: flushTs
    
    DC->>+DC: flushCollection(collID, flushTs, segIDs)
    
    DC->>+Meta: GetCollection(collID)
    Meta-->>-DC: CollectionInfo(channels)
    
    loop 每个VChannel
        DC->>+Meta: GetChannelCheckpoint(channel)
        Meta-->>-DC: checkpoint
        Note over DC: 记录当前Channel位置<br/>确保checkpoint早于segment endTs
        
        alt segIDs为空(Flush全部)
            DC->>+SM: SealAllSegments(channel, [])
            SM->>+Meta: GetSegmentsByChannel(channel)
            Meta-->>-SM: [growing_segments]
            
            loop 每个Growing Segment
                SM->>SM: 检查是否满足Seal条件
                Note over SM: size>threshold OR<br/>time>timeout
                
                alt 满足条件
                    SM->>+Meta: UpdateSegment(segID, state=Sealed)
                    Meta->>Meta: 更新内存状态
                    Meta->>ETCD: SaveSegment
                    ETCD-->>Meta: OK
                    Meta-->>-SM: Success
                end
            end
            SM-->>-DC: [sealed_segment_ids]
        else 指定segmentIDs
            DC->>+SM: SealAllSegments(channel, segIDs)
            Note over SM: 只Seal指定的Segments
            SM-->>-DC: [sealed_segment_ids]
        end
        
        DC->>+CM: GetNodeChannelsByCollectionID(collID)
        CM-->>-DC: nodeChannels{nodeID -> channels}
        
        loop 每个DataNode
            DC->>+Cluster: FlushChannels(nodeID, flushTs, channels)
            Cluster->>+DN: FlushChannels RPC
            
            Note over DN: 异步执行Flush<br/>写入Binlog
            
            DN->>DN: 遍历所有Sealed Segment
            
            loop 每个Segment
                DN->>DN: 序列化Insert数据
                DN->>DN: 序列化Delete数据
                DN->>DN: 生成Stats Log
                
                DN->>+S3: PutObject(insert_log)
                S3-->>-DN: binlog_path
                
                DN->>+S3: PutObject(delete_log)
                S3-->>-DN: deltalog_path
                
                DN->>+S3: PutObject(stats_log)
                S3-->>-DN: statslog_path
            end
            
            DN->>+DC: SegmentFlushCompleted(segmentID, binlogs)
            DC->>+Meta: UpdateSegmentBinlogs(segID, binlogs)
            Meta->>Meta: 更新Binlog路径
            Meta->>+ETCD: SaveSegment(binlogs)
            ETCD-->>-Meta: OK
            Meta-->>-DC: Success
            
            DC->>+Meta: UpdateSegmentState(segID, Flushed)
            Meta-->>-DC: Success
            DC-->>-DN: ACK
            
            DN-->>-Cluster: FlushCompleted
            Cluster-->>-DC: Success
        end
        
        DC->>+QC: SegmentFlushCompleted(segments)
        Note over QC: 触发Handoff<br/>加载Segment到QueryNode
        QC-->>-DC: ACK
    end
    
    DC->>DC: 收集Flush结果
    DC-->>-DC: FlushResult
    DC-->>-P: FlushResponse(segmentIDs, flushTs)
    P-->>-User: Success

3.2.2 调用链路详解

① API入口 - Server.Flush()

// internal/datacoord/services.go:77
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
        return &datapb.FlushResponse{Status: merr.Status(err)}, nil
    }
    
    // 2. 分配Flush时间戳(全局顺序保证)
    ts, err := s.allocator.AllocTimestamp(ctx)
    if err != nil {
        return nil, err
    }
    
    // 3. 执行Collection级Flush
    flushResult, err := s.flushCollection(ctx, req.GetCollectionID(), ts, req.GetSegmentIDs())
    if err != nil {
        return &datapb.FlushResponse{Status: merr.Status(err)}, nil
    }
    
    return &datapb.FlushResponse{
        Status:          merr.Success(),
        SegmentIDs:      flushResult.GetSegmentIDs(),      // Sealed的Segment
        FlushSegmentIDs: flushResult.GetFlushSegmentIDs(), // 已Flushed的Segment
        TimeOfSeal:      flushResult.GetTimeOfSeal(),
        FlushTs:         flushResult.GetFlushTs(),
        ChannelCps:      flushResult.GetChannelCps(),      // Channel Checkpoint
    }, nil
}

② 核心逻辑 - Server.flushCollection()

// internal/datacoord/services.go:116
func (s *Server) flushCollection(ctx context.Context, collectionID UniqueID, 
    flushTs uint64, toFlushSegments []UniqueID) (*datapb.FlushResult, error) {
    
    // 1. 获取Collection信息
    coll, err := s.handler.GetCollection(ctx, collectionID)
    if err != nil {
        return nil, err
    }
    
    // 2. 获取所有Channel的Checkpoint(必须在Seal之前)
    //    确保checkpoint时间早于segment的endTs
    channelCPs := make(map[string]*msgpb.MsgPosition, 0)
    for _, vchannel := range coll.VChannelNames {
        cp := s.meta.GetChannelCheckpoint(vchannel)
        channelCPs[vchannel] = cp
    }
    
    // 3. Seal所有需要Flush的Segment
    sealedSegmentsIDDict := make(map[UniqueID]bool)
    for _, channel := range coll.VChannelNames {
        sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, channel, toFlushSegments)
        if err != nil {
            return nil, errors.Wrapf(err, "failed to flush collection %d", collectionID)
        }
        for _, sealedSegmentID := range sealedSegmentIDs {
            sealedSegmentsIDDict[sealedSegmentID] = true
        }
    }
    
    // 4. 获取所有已Flushed的Segment(用于返回给用户)
    segments := s.meta.GetSegmentsOfCollection(ctx, collectionID)
    flushSegmentIDs := make([]UniqueID, 0, len(segments))
    for _, segment := range segments {
        if segment != nil && 
           isFlushState(segment.GetState()) && 
           segment.GetLevel() != datapb.SegmentLevel_L0 {
            flushSegmentIDs = append(flushSegmentIDs, segment.GetID())
        }
    }
    
    // 5. 通知DataNode执行Flush
    nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(collectionID)
    for nodeID, channelNames := range nodeChannels {
        err = s.cluster.FlushChannels(ctx, nodeID, flushTs, channelNames)
        if err != nil {
            return nil, err
        }
    }
    
    return &datapb.FlushResult{
        CollectionID:    collectionID,
        SegmentIDs:      lo.Keys(sealedSegmentsIDDict),
        FlushSegmentIDs: flushSegmentIDs,
        FlushTs:         flushTs,
        ChannelCps:      channelCPs,
    }, nil
}

③ Seal逻辑 - SegmentManager.SealAllSegments()

// internal/datacoord/segment_manager.go:450
func (sm *SegmentManager) SealAllSegments(ctx context.Context, channel string, 
    segIDs []UniqueID) ([]UniqueID, error) {
    
    sm.channelLock.Lock(channel)
    defer sm.channelLock.Unlock(channel)
    
    // 1. 获取Channel的所有Growing Segment
    segments := sm.meta.GetGrowingSegments(channel)
    
    // 2. 过滤需要Seal的Segment
    toSeal := []UniqueID{}
    if len(segIDs) > 0 {
        // 指定Segment
        segIDSet := typeutil.NewUniqueSet(segIDs...)
        for _, seg := range segments {
            if segIDSet.Contain(seg.GetID()) {
                toSeal = append(toSeal, seg.GetID())
            }
        }
    } else {
        // 所有Growing Segment
        for _, seg := range segments {
            toSeal = append(toSeal, seg.GetID())
        }
    }
    
    // 3. 执行Seal操作
    sealed := []UniqueID{}
    for _, segID := range toSeal {
        segment := sm.meta.GetSegment(ctx, segID)
        if segment == nil || segment.GetState() != commonpb.SegmentState_Growing {
            continue
        }
        
        // 更新状态为Sealed
        segment.State = commonpb.SegmentState_Sealed
        if err := sm.meta.UpdateSegment(ctx, segment); err != nil {
            return nil, err
        }
        
        sealed = append(sealed, segID)
        
        // 从Growing索引移除
        sm.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet()).Remove(segID)
        // 加入Sealed索引
        sm.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()).Insert(segID)
    }
    
    return sealed, nil
}

④ Binlog更新 - SegmentFlushCompleted()

// internal/datacoord/services.go:1900
func (s *Server) SegmentFlushCompleted(ctx context.Context, 
    req *datapb.SegmentFlushCompletedRequest) (*commonpb.Status, error) {
    
    segmentID := req.GetSegment().GetID()
    
    // 1. 更新Segment的Binlog路径
    segment := s.meta.GetSegment(ctx, segmentID)
    if segment == nil {
        return merr.Status(merr.WrapErrSegmentNotFound(segmentID)), nil
    }
    
    // 2. 更新Binlog、Deltalog、Statslog
    segment.Binlogs = req.GetSegment().GetBinlogs()
    segment.Deltalogs = req.GetSegment().GetDeltalogs()
    segment.Statslogs = req.GetSegment().GetStatslogs()
    segment.NumOfRows = req.GetSegment().GetNumOfRows()
    
    // 3. 更新状态为Flushed
    segment.State = commonpb.SegmentState_Flushed
    segment.DmlPosition = req.GetSegment().GetDmlPosition()
    
    // 4. 持久化
    if err := s.meta.UpdateSegment(ctx, segment); err != nil {
        return merr.Status(err), nil
    }
    
    // 5. 通知QueryCoord Handoff
    s.notifyQueryCoordSegmentFlush(ctx, segment)
    
    return merr.Success(), nil
}

3.2.3 Flush触发机制

自动触发条件

触发条件 阈值 检测周期 说明
Segment大小 ≥512MB 实时 达到最大容量立即Seal
数据行数 ≥100万行 实时 行数上限
Growing时长 ≥10分钟 每分钟 避免长时间不Flush
Channel空闲 ≥30秒无数据 每秒 及时持久化
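
上表的自动触发逻辑可以概括为"任一条件满足即Seal",示意如下(阈值取自上表,字段与函数名为示意;Channel空闲触发由TimeTick路径单独处理,此处略):

// 自动Seal判定示意:满足任一条件即Seal
func shouldSeal(seg *SegmentInfo, now time.Time) bool {
    const (
        maxSize     = 512 << 20        // 512MB
        maxRows     = 1_000_000        // 100万行
        maxLifetime = 10 * time.Minute // Growing最长存活时间
    )
    switch {
    case seg.Size >= maxSize:
        return true
    case seg.NumOfRows >= maxRows:
        return true
    case now.Sub(seg.CreatedAt) >= maxLifetime:
        return true
    default:
        return false
    }
}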

手动触发方式

# 方式1:Flush指定Collection
client.flush(collection_name="my_collection")

# 方式2:Flush指定Segment
client.flush(collection_name="my_collection", segment_ids=[seg1, seg2])

# 方式3:FlushAll(所有Collection)
# 管理员操作

3.2.4 Flush状态机

stateDiagram-v2
    [*] --> Growing: 创建Segment
    Growing --> Sealed: Flush触发
    Sealed --> Flushing: DataNode开始Flush
    Flushing --> Flushed: Binlog写入完成
    Flushed --> Indexed: 索引构建完成
    Indexed --> [*]: 可被查询
    
    Growing --> Dropped: Collection删除
    Sealed --> Dropped: Collection删除
    Flushing --> Dropped: Collection删除
    
    note right of Growing
        状态:内存中
        特征:
        • 接受新数据写入
        • 大小动态增长
        • NumOfRows递增
    end note
    
    note right of Sealed
        状态:已封闭
        特征:
        • 不再接受新数据
        • 等待DataNode Flush
        • DmlPosition已确定
    end note
    
    note right of Flushed
        状态:已持久化
        特征:
        • Binlog已写入S3
        • 只读
        • 可被QueryNode加载
    end note

3.2.5 DataNode Flush详细流程

flowchart TD
    Start[收到FlushChannels命令] --> Loop{遍历Sealed Segment}
    
    Loop -->|有Segment| GetData[从Insert Buffer获取数据]
    GetData --> Serialize[序列化数据]
    
    Serialize --> WriteInsert[写Insert Log]
    WriteInsert --> WriteDelete[写Delete Log]
    WriteDelete --> WriteStats[写Stats Log]
    
    WriteStats --> CalcSize[计算Segment大小]
    CalcSize --> Report[上报FlushCompleted]
    
    Report --> Loop
    Loop -->|无Segment| End[完成]
    
    style WriteInsert fill:#e3f2fd
    style WriteDelete fill:#fff3e0
    style WriteStats fill:#f3e5f5

Binlog文件格式

├── Insert Log (列式存储)
│   ├── field_0/  (FieldID=0, 主键)
│   │   └── 435978159261483010
│   ├── field_1/  (FieldID=1, 向量字段)
│   │   └── 435978159261483011
│   └── field_2/  (FieldID=2, 标量字段)
│       └── 435978159261483012
├── Delete Log (删除记录)
│   └── 435978159261483020
└── Stats Log (统计信息)
    ├── 435978159261483030 (min/max/count)
    └── 435978159261483031
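
Insert Log等文件在Object Storage中按"前缀/Collection/Partition/Segment/Field/LogID"的层级组织,下面是路径拼接的示意(具体前缀与层级以实际部署版本为准):

// 构造Insert Log对象存储路径的示意
func insertLogPath(rootPath string, collID, partID, segID, fieldID, logID int64) string {
    return path.Join(rootPath, "insert_log",
        strconv.FormatInt(collID, 10),
        strconv.FormatInt(partID, 10),
        strconv.FormatInt(segID, 10),
        strconv.FormatInt(fieldID, 10),
        strconv.FormatInt(logID, 10))
}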

3.2.6 性能优化

优化点 实现方式 效果
批量Flush FlushChannels一次触发多个Segment 减少RPC开销
异步写入 DataNode异步写Binlog 不阻塞数据写入
Channel Checkpoint 提前获取,保证顺序 避免Checkpoint倒退
并发Flush 多个DataNode并行Flush 提高吞吐
压缩Binlog 使用Zstd压缩 减少存储空间50%
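
表中"压缩Binlog"一项的思路是:在写入Object Storage前对序列化后的payload做一次Zstd压缩。下面用 github.com/klauspost/compress/zstd(Go生态常用的Zstd实现)给出示意,Milvus实际使用的压缩库与压缩级别以源码为准:

import "github.com/klauspost/compress/zstd"

// 写入前压缩Binlog payload的示意:EncodeAll适合一次性压缩整块数据
func compressBinlog(payload []byte) ([]byte, error) {
    enc, err := zstd.NewWriter(nil) // 不绑定底层Writer,仅用于EncodeAll
    if err != nil {
        return nil, err
    }
    defer enc.Close()
    return enc.EncodeAll(payload, make([]byte, 0, len(payload))), nil
}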

3.2.7 异常处理

flowchart TD
    Start[Flush请求] --> Check1{Collection存在?}
    Check1 -->|否| Err1[返回CollectionNotFound]
    Check1 -->|是| Check2{Segment有效?}
    
    Check2 -->|否| Err2[返回SegmentNotFound]
    Check2 -->|是| Seal{Seal Segment}
    
    Seal -->|成功| Notify{通知DataNode}
    Seal -->|失败| Err3[返回Meta更新失败]
    
    Notify -->|成功| Wait[等待DataNode完成]
    Notify -->|超时| Err4[返回Timeout<br/>Segment仍会后台Flush]
    
    Wait -->|完成| Update{更新Meta}
    Update -->|成功| Success[返回Success]
    Update -->|失败| Err5[返回Meta更新失败<br/>Binlog已写入<br/>后台重试]
    
    style Err1 fill:#ffcdd2
    style Err2 fill:#ffcdd2
    style Err3 fill:#ffcdd2
    style Err4 fill:#fff9c4
    style Err5 fill:#fff9c4
    style Success fill:#c8e6c9

容错机制

  1. Seal失败:保持Growing状态,下次继续触发
  2. DataNode超时:Segment仍会后台Flush,不影响后续写入
  3. Binlog写入失败:DataNode重试,最多3次
  4. Meta更新失败:后台定期重试,直到成功

3.3 Compaction流程详解

3.3.1 整体时序图

sequenceDiagram
    autonumber
    participant CTM as CompactionTriggerManager
    participant Policy as CompactionPolicy
    participant CI as CompactionInspector
    participant Meta as Meta
    participant Cluster as Cluster
    participant DN as DataNode
    participant S3 as Object Storage
    participant QC as QueryCoord
    
    Note over CTM: 定时触发(每10分钟)
    
    CTM->>+CTM: loop() - 后台任务
    
    CTM->>+Policy: Trigger(ctx)
    Note over Policy: 扫描Collection<br/>评估Compaction收益
    
    Policy->>+Meta: GetSegmentsOfCollection(collID)
    Meta-->>-Policy: [segments]
    
    Policy->>Policy: 按类型生成CompactionView
    Note over Policy: • L0 Compaction<br/>• Mix Compaction<br/>• Clustering
    
    Policy->>Policy: 计算收益分数
    Note over Policy: DeleteRatio * 0.4<br/>+ FragmentScore * 0.3<br/>+ AgeScore * 0.2<br/>+ SizeScore * 0.1
    
    alt 分数 > 阈值
        Policy-->>-CTM: [CompactionViews]
        
        loop 每个CompactionView
            CTM->>CTM: triggerViewForCompaction()
            
            CTM->>+CI: SubmitTask(CompactionTask)
            Note over CI: 加入任务队列
            
            CI->>+Meta: GetSegmentDetails(segmentIDs)
            Meta-->>-CI: [SegmentInfo with binlogs]
            
            CI->>CI: 构建CompactionPlan
            Note over CI: 选择DataNode<br/>分配资源
            
            CI->>+Cluster: Compaction(nodeID, plan)
            Cluster->>+DN: Compaction RPC
            
            Note over DN: 异步执行Compaction
            
            DN->>DN: 创建CompactionTask
            
            loop 每个输入Segment
                DN->>+S3: GetObject(insert_binlog)
                S3-->>-DN: binlog_data
                
                DN->>+S3: GetObject(delta_binlog)
                S3-->>-DN: delta_data
                
                DN->>DN: 反序列化数据
                DN->>DN: 应用Delete操作
            end
            
            DN->>DN: 合并数据
            Note over DN: • 去重<br/>• 删除已标记删除的行<br/>• 按PK排序
            
            alt Mix Compaction
                Note over DN: N个Segment -> 1个大Segment
                DN->>DN: 合并所有数据到一个Segment
            else L0 Compaction
                Note over DN: L0 Delta -> L1 Segment
                DN->>DN: 将L0删除应用到L1
            else Clustering Compaction
                Note over DN: 按标量字段聚类
                DN->>DN: 按ClusteringKey重新分组
            end
            
            DN->>DN: 序列化新Segment
            
            DN->>+S3: PutObject(new_insert_binlog)
            S3-->>-DN: new_binlog_path
            
            DN->>+S3: PutObject(new_stats_binlog)
            S3-->>-DN: new_statslog_path
            
            DN->>+DC: CompactionCompleted(result)
            DC->>+CI: HandleCompactionResult(result)
            
            CI->>+Meta: AddSegment(newSegment)
            Meta->>Meta: 更新内存索引
            Meta->>+ETCD: SaveSegment
            ETCD-->>-Meta: OK
            Meta-->>-CI: Success
            
            loop 每个旧Segment
                CI->>+Meta: DropSegment(oldSegID)
                Meta->>Meta: 标记state=Dropped
                Meta->>+ETCD: SaveSegment(state=Dropped)
                ETCD-->>-Meta: OK
                Meta-->>-CI: Success
            end
            
            CI->>+QC: NotifySegmentChange(newSegment, oldSegments)
            Note over QC: 触发Segment替换<br/>先加载新Segment<br/>再释放旧Segment
            QC-->>-CI: ACK
            
            CI-->>-DC: Success
            DC-->>-DN: ACK
            DN-->>-Cluster: CompactionCompleted
            Cluster-->>-CI: Success
            
            CI->>CI: 更新任务状态
            Note over CI: state=Completed
            CI-->>-CTM: TaskCompleted
        end
    else 分数 <= 阈值
        Policy-->>CTM: nil (不触发)
    end
    
    CTM-->>-CTM: 继续下一轮循环

3.3.2 Compaction类型详解

1. L0 Compaction(删除操作压缩)

flowchart LR
    subgraph Input["输入"]
        L0[L0 Segment<br/>━━━━━━<br/>Delete Records<br/>PK: 1,3,5,7]
        L1[L1 Segment<br/>━━━━━━<br/>Data Records<br/>PK: 1,2,3,4,5,6,7,8]
    end
    
    subgraph Process["处理"]
        Apply[应用Delete操作<br/>━━━━━━<br/>移除 PK=1,3,5,7]
    end
    
    subgraph Output["输出"]
        NewL1[New L1 Segment<br/>━━━━━━<br/>PK: 2,4,6,8]
        DropL0[Drop L0<br/>标记删除]
        DropOldL1[Drop Old L1<br/>标记删除]
    end
    
    L0 --> Apply
    L1 --> Apply
    Apply --> NewL1
    Apply --> DropL0
    Apply --> DropOldL1
    
    style L0 fill:#ffebee
    style L1 fill:#e3f2fd
    style NewL1 fill:#c8e6c9

触发条件

  • L0 Segment数量 > 8个
  • L0 Segment总大小 > 256MB
  • L0 Segment存在时间 > 5分钟

2. Mix Compaction(小文件合并)

flowchart LR
    subgraph Input["输入"]
        S1[Segment1<br/>64MB]
        S2[Segment2<br/>128MB]
        S3[Segment3<br/>96MB]
        S4[Segment4<br/>80MB]
    end
    
    subgraph Process["处理"]
        Merge[合并数据<br/>━━━━━━<br/>• 去重<br/>• 应用Delete<br/>• 按PK排序]
    end
    
    subgraph Output["输出"]
        NewS[New Segment<br/>━━━━━━<br/>368MB<br/>已优化]
    end
    
    S1 --> Merge
    S2 --> Merge
    S3 --> Merge
    S4 --> Merge
    Merge --> NewS
    
    style Input fill:#fff3e0
    style NewS fill:#c8e6c9

触发条件

  • 小Segment(<256MB)数量 > 4个
  • Delete比例 > 10%
  • 碎片化分数 > 0.5

3. Clustering Compaction(数据聚类)

flowchart TB
    subgraph Input["输入数据"]
        Data[原始Segment<br/>━━━━━━<br/>随机分布<br/>ClusteringKey无序]
    end
    
    subgraph Process["处理"]
        Analyze[分析ClusteringKey<br/>━━━━━━<br/>计算分区边界]
        Partition[按Key分区<br/>━━━━━━<br/>Range Partitioning]
    end
    
    subgraph Output["输出"]
        C1[Cluster1<br/>key: 0-100]
        C2[Cluster2<br/>key: 100-200]
        C3[Cluster3<br/>key: 200-300]
    end
    
    Data --> Analyze
    Analyze --> Partition
    Partition --> C1
    Partition --> C2
    Partition --> C3
    
    style C1 fill:#e8f5e9
    style C2 fill:#e8f5e9
    style C3 fill:#e8f5e9

触发条件

  • 用户配置了ClusteringKey
  • 手动触发或定时触发
  • 数据量 > 1GB

3.3.3 Compaction类型对比

类型 目的 输入 输出 触发频率 收益
L0 Compaction 应用删除操作 L0 + L1 Segments New L1 高(每10秒) 减少查询开销
Mix Compaction 合并小文件 N个小Segment 1个大Segment 中(每10分钟) 减少文件数、提高查询性能
Clustering 数据聚类优化 N个Segment N个聚类Segment 低(手动/每天) 大幅提升范围查询性能
Sort Compaction 重新排序 N个Segment N个排序Segment 低(手动) 优化排序查询

3.3.4 调用链路详解

① CompactionTriggerManager.loop()

// internal/datacoord/compaction_trigger_v2.go:212
func (m *CompactionTriggerManager) loop(ctx context.Context) {
    // 定时器
    l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
    clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
    singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
    
    for {
        select {
        case <-ctx.Done():
            return
            
        case <-l0Ticker.C:
            if !m.l0Policy.Enable() {
                continue
            }
            if m.inspector.isFull() {
                log.RatedInfo(10, "Skip trigger l0 compaction since inspector is full")
                continue
            }
            
            // 触发L0 Compaction
            events, err := m.l0Policy.Trigger(ctx)
            if err != nil {
                log.Warn("Fail to trigger L0 policy", zap.Error(err))
                continue
            }
            
            // 提交任务
            for triggerType, views := range events {
                m.notify(ctx, triggerType, views)
            }
            
        case <-clusteringTicker.C:
            // 触发Clustering Compaction
            events, err := m.clusteringPolicy.Trigger(ctx)
            if err != nil {
                log.Warn("Fail to trigger clustering policy", zap.Error(err))
                continue
            }
            for triggerType, views := range events {
                m.notify(ctx, triggerType, views)
            }
            
        case <-singleTicker.C:
            // 触发Mix Compaction
            events, err := m.singlePolicy.Trigger(ctx)
            if err != nil {
                log.Warn("Fail to trigger single policy", zap.Error(err))
                continue
            }
            for triggerType, views := range events {
                m.notify(ctx, triggerType, views)
            }
        }
    }
}

② CompactionPolicy.Trigger()

// internal/datacoord/compaction_policy_single.go:100
func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
    collections := policy.meta.GetCollections()
    views := make([]CompactionView, 0)
    
    for _, collectionID := range collections {
        // 获取所有Segment
        segments := policy.meta.GetSegmentsOfCollection(ctx, collectionID)
        
        // 按Channel分组
        channelSegments := groupSegmentsByChannel(segments)
        
        for channel, segs := range channelSegments {
            // 计算Compaction收益
            score := policy.calculateScore(segs)
            
            if score > policy.threshold {
                // 生成CompactionView(allocator为策略持有的ID分配器,示意)
                triggerID, err := policy.allocator.AllocID(ctx)
                if err != nil {
                    continue
                }
                view := &compactionView{
                    collectionID: collectionID,
                    channel:      channel,
                    segments:     segs,
                    triggerID:    triggerID,
                }
                views = append(views, view)
            }
        }
    }
    
    result := make(map[CompactionTriggerType][]CompactionView)
    if len(views) > 0 {
        result[TriggerTypeSingle] = views
    }
    return result, nil
}

// 计算Compaction收益分数
func (policy *singleCompactionPolicy) calculateScore(segments []*SegmentInfo) float64 {
    var totalSize, totalRows, deletedRows int64
    var smallFileCount int
    
    for _, seg := range segments {
        totalSize += seg.Size
        totalRows += seg.NumRows
        deletedRows += seg.DeletedRows
        
        if seg.Size < SmallSegmentThreshold {  // 256MB
            smallFileCount++
        }
    }
    
    deleteRatio := float64(deletedRows) / float64(totalRows)
    fragmentScore := float64(smallFileCount) / float64(len(segments))
    ageScore := float64(time.Since(segments[0].LastCompactionTime).Hours()) / (30 * 24)
    sizeScore := float64(totalSize) / (10 * 1024 * 1024 * 1024)  // 10GB
    
    // 加权计算
    return deleteRatio*0.4 + fragmentScore*0.3 + ageScore*0.2 + sizeScore*0.1
}

③ CompactionInspector.SubmitTask()

// internal/datacoord/compaction_inspector.go:200
func (ci *CompactionInspector) SubmitTask(task *CompactionTask) error {
    // 1. 检查任务队列是否满
    if ci.queue.IsFull() {
        return errors.New("compaction queue is full")
    }
    
    // 2. 选择DataNode
    nodeID, err := ci.selectDataNode(task)
    if err != nil {
        return err
    }
    
    // 3. 加入任务队列
    task.NodeID = nodeID
    task.State = datapb.CompactionTaskState_Pending
    ci.queue.Enqueue(task)
    
    // 4. 持久化任务状态
    ci.meta.SaveCompactionTask(task)
    
    // 5. 通知Scheduler调度
    ci.scheduler.Schedule(task)
    
    return nil
}

3.3.5 性能优化

优化点 实现 效果
并发执行 多个DataNode并行Compaction 10x吞吐提升
增量读取 流式读取Binlog 减少内存占用
零拷贝 直接操作S3数据 减少CPU开销
智能调度 优先级队列 高收益任务优先
资源限制 限制并发数和内存 避免OOM
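
表中"资源限制"一项可以用带权信号量限制并发Compaction任务数来实现,示意如下(使用 golang.org/x/sync/semaphore,并发上限数值仅为示意,非真实配置):

import "golang.org/x/sync/semaphore"

// 限制并发Compaction任务数,避免DataNode过载或OOM
var compactionSlots = semaphore.NewWeighted(100)

func runCompaction(ctx context.Context, task func(context.Context) error) error {
    // 名额不足时阻塞等待,ctx取消时返回错误
    if err := compactionSlots.Acquire(ctx, 1); err != nil {
        return err
    }
    defer compactionSlots.Release(1)
    return task(ctx)
}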

3.3.6 监控指标

# Compaction任务数
milvus_datacoord_compaction_task_num:
  labels: [state, type]  # Pending/Running/Completed/Failed
  
# Compaction延迟
milvus_datacoord_compaction_latency:
  labels: [type]  # L0/Mix/Clustering
  unit: seconds
  percentiles: [p50, p90, p99]
  
# Compaction吞吐
milvus_datacoord_compaction_throughput:
  labels: [type]
  unit: bytes_per_second
  
# Compaction收益
milvus_datacoord_compaction_benefit:
  labels: [type]
  description: 节省的存储空间或查询加速比

3.4 WatchChannels流程详解

3.4.1 整体时序图

sequenceDiagram
    autonumber
    participant RC as RootCoord
    participant DC as DataCoord::Server
    participant CM as ChannelManager
    participant Store as ChannelStore
    participant Policy as AssignPolicy
    participant Cluster as Cluster
    participant DN as DataNode
    participant ETCD as etcd
    
    RC->>+DC: WatchChannels(collID, channels, startPos)
    Note over RC: 创建Collection时触发
    
    DC->>DC: 健康检查
    
    loop 每个Channel
        DC->>DC: NewRWChannel(channelName, collID, startPos)
        Note over DC: 构建Channel对象<br/>包含schema、dbProperties
        
        DC->>+CM: Watch(ctx, channel)
        Note over CM: 加锁确保串行
        
        CM->>+Store: AddChannel(bufferID, channel)
        Note over Store: 先加入Buffer<br/>等待分配给DataNode
        
        Store->>+ETCD: SaveChannelWatchInfo
        Note over ETCD: key: channel/{channelName}<br/>state: ToWatch<br/>opID: xxx
        ETCD-->>-Store: OK
        Store-->>-CM: Success
        
        CM->>+Policy: apply(nodesChannels, bufferChannel)
        Note over Policy: 选择DataNode<br/>负载均衡算法
        
        Policy->>Policy: 计算每个Node负载
        Policy->>Policy: 选择负载最低的Node
        Policy-->>-CM: ChannelOpSet(nodeID, Watch, channel)
        
        CM->>+Store: UpdateChannelWatchInfo
        Note over Store: 更新state: ToWatch -> Watching
        Store->>+ETCD: SaveChannelWatchInfo(nodeID, state)
        ETCD-->>-Store: OK
        Store-->>-CM: Success
        
        CM->>+Cluster: NotifyChannelOperation(nodeID, watchInfo)
        
        Cluster->>+DN: WatchDmChannels RPC
        Note over DN: DataNode开始订阅消息
        
        DN->>DN: 创建FlowGraph
        DN->>DN: 订阅MQ Channel
        DN->>DN: Seek到startPosition
        
        DN-->>-Cluster: Success
        Cluster-->>-CM: Success
        
        CM->>+Store: UpdateChannelWatchInfo(state=Watched)
        Store->>+ETCD: SaveChannelWatchInfo
        ETCD-->>-Store: OK
        Store-->>-CM: Success
        
        CM-->>-DC: Success
        
        DC->>+ETCD: MarkChannelAdded(channelName)
        Note over ETCD: 标记Channel已添加<br/>防止重复创建
        ETCD-->>-DC: OK
        
        DC->>+DC: UpdateChannelCheckpoint(channel, startPos)
        Note over DC: 初始化Channel Checkpoint
        DC-->>-DC: Success
    end
    
    DC-->>-RC: WatchChannelsResponse(Success)

3.4.2 Channel分配策略

负载均衡算法

// internal/datacoord/policy.go:211
func assignNewChannels(availableNodes Assignments, toAssign *NodeChannelInfo, 
    nodeCount int, totalChannelCount int, exclusiveNodes []int64) *ChannelOpSet {
    
    // 1. 计算目标分布
    totalChannelsAfterAssignment := totalChannelCount + len(toAssign.Channels)
    baseChannelsPerNode := totalChannelsAfterAssignment / nodeCount
    extraChannels := totalChannelsAfterAssignment % nodeCount
    
    // 2. 为每个Node设定目标Channel数
    targetChannelCounts := make(map[int64]int)
    for _, nodeInfo := range availableNodes {
        targetChannelCounts[nodeInfo.NodeID] = baseChannelsPerNode
        if extraChannels > 0 {
            targetChannelCounts[nodeInfo.NodeID]++
            extraChannels--
        }
    }
    
    // 3. 分配Channel
    nodeAssignments := make(map[int64][]RWChannel)
    for _, channel := range toAssign.GetChannels() {
        // 按当前负载排序
        sort.Slice(availableNodes, func(i, j int) bool {
            iTotal := len(availableNodes[i].Channels) + len(nodeAssignments[availableNodes[i].NodeID])
            jTotal := len(availableNodes[j].Channels) + len(nodeAssignments[availableNodes[j].NodeID])
            return iTotal < jTotal
        })
        
        // 选择负载最低的Node
        bestNode := availableNodes[0]
        for _, node := range availableNodes {
            currentTotal := len(node.Channels) + len(nodeAssignments[node.NodeID])
            if currentTotal < targetChannelCounts[node.NodeID] {
                bestNode = node
                break
            }
        }
        
        nodeAssignments[bestNode.NodeID] = append(nodeAssignments[bestNode.NodeID], channel)
    }
    
    // 4. 构建操作集
    operations := NewChannelOpSet()
    for nodeID, channels := range nodeAssignments {
        operations.Add(NewChannelOp(nodeID, Watch, channels...))
    }
    operations.Add(NewChannelOp(bufferID, Delete, toAssign.Channels...))
    
    return operations
}

Channel状态机

stateDiagram-v2
    [*] --> ToWatch: Watch请求
    ToWatch --> Watching: 分配到DataNode
    Watching --> Watched: DataNode确认
    Watched --> ToRelease: Release请求
    ToRelease --> Releasing: 通知DataNode
    Releasing --> Released: DataNode确认
    Released --> [*]: 删除
    
    ToWatch --> Failed: 分配失败
    Watching --> Failed: 超时
    Failed --> ToWatch: 重试
    
    note right of ToWatch
        状态:待分配
        位置:Buffer
    end note
    
    note right of Watched
        状态:已监听
        位置:DataNode
        操作:正常写入
    end note

3.4.3 Channel迁移流程

sequenceDiagram
    autonumber
    participant CM as ChannelManager
    participant OldDN as Old DataNode
    participant NewDN as New DataNode
    participant Meta as Meta
    
    Note over CM: 触发条件:<br/>• DataNode下线<br/>• 负载均衡<br/>• 手动迁移
    
    CM->>CM: DeleteNode(oldNodeID)
    Note over CM: 获取旧Node的所有Channel
    
    loop 每个Channel
        CM->>OldDN: UnwatchDmChannels(channel)
        OldDN->>OldDN: 停止消费消息
        OldDN->>OldDN: Flush未持久化数据
        OldDN-->>CM: Unwatched
        
        CM->>CM: assignPolicy(channel)
        Note over CM: 选择新的DataNode
        
        CM->>NewDN: WatchDmChannels(channel, checkpoint)
        NewDN->>NewDN: 创建FlowGraph
        NewDN->>NewDN: 从checkpoint开始消费
        NewDN-->>CM: Watched
        
        CM->>Meta: UpdateChannelMapping
        Note over Meta: 更新Channel -> DataNode映射
    end

3.5 垃圾回收(GC)流程详解

3.5.1 整体时序图

sequenceDiagram
    autonumber
    participant GC as GarbageCollector
    participant Meta as Meta
    participant Handler as Handler
    participant S3 as Object Storage
    participant ETCD as etcd
    
    Note over GC: 定时触发(每30分钟)
    
    GC->>+GC: recycleDroppedSegments()
    
    GC->>+Meta: SelectSegments(state=Dropped)
    Meta-->>-GC: [dropped_segments]
    
    GC->>+Meta: SelectSegments(state=All)
    Meta-->>-GC: [all_segments]
    
    GC->>GC: 构建compactTo映射
    Note over GC: 找出Compaction关系<br/>A, B -> C
    
    GC->>+Handler: ListLoadedSegments()
    Handler->>QC: 查询已加载的Segment
    QC-->>Handler: [loaded_segments]
    Handler-->>-GC: [loaded_segment_ids]
    
    loop 每个Dropped Segment
        GC->>GC: checkDroppedSegmentGC(segment)
        
        alt 不满足GC条件
            Note over GC: 跳过:<br/>• 未过期<br/>• 子Segment未索引<br/>• Checkpoint未到达<br/>• 仍被加载
            GC->>GC: continue
        else 满足GC条件
            GC->>GC: 收集Binlog路径
            Note over GC: • InsertLog<br/>• DeltaLog<br/>• StatsLog<br/>• BM25Log<br/>• TextLog<br/>• IndexFiles
            
            loop 每个Binlog文件
                GC->>+S3: DeleteObject(binlog_path)
                S3-->>-GC: Success
            end
            
            GC->>+Meta: DropSegment(segmentID)
            Meta->>Meta: 从内存移除
            Meta->>+ETCD: DeleteSegment(segmentID)
            ETCD-->>-Meta: OK
            Meta-->>-GC: Success
            
            Note over GC: Segment完全删除
        end
    end
    
    GC-->>-GC: recycleDroppedSegments完成
    
    GC->>+GC: recycleUnusedBinlogFiles()
    Note over GC: 清理孤立Binlog
    
    loop 扫描Object Storage
        GC->>+S3: ListObjects(prefix="/binlog/")
        S3-->>-GC: [object_infos]
        
        loop 每个文件
            GC->>GC: ParseSegmentID(file_path)
            
            GC->>+Meta: GetSegment(segmentID)
            alt Segment不存在
                Meta-->>-GC: nil
                
                alt 文件超过容忍时间
                    GC->>+S3: DeleteObject(file_path)
                    S3-->>-GC: Success
                    Note over GC: 清理孤立文件
                end
            else Segment存在
                Meta-->>GC: segment
                GC->>GC: 检查文件是否在Segment的Binlog列表
                
                alt 不在列表中
                    GC->>+S3: DeleteObject(file_path)
                    S3-->>-GC: Success
                    Note over GC: 清理无效引用
                end
            end
        end
    end
    
    GC-->>-GC: recycleUnusedBinlogFiles完成

3.5.2 GC条件检查

// internal/datacoord/garbage_collector.go:437
func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
    childSegment *SegmentInfo,
    indexSet typeutil.UniqueSet,
    cpTimestamp Timestamp) bool {
    
    // 1. 检查过期时间(默认1天)
    if !gc.isExpire(segment.GetDroppedAt()) {
        return false
    }
    
    // 2. 如果是Compaction产生的旧Segment
    isCompacted := childSegment != nil || segment.GetCompacted()
    if isCompacted {
        // 确保子Segment已经索引,避免查询性能下降
        if childSegment != nil && !indexSet.Contain(childSegment.GetID()) {
            log.RatedInfo(60, "skipping GC when compact target segment is not indexed",
                zap.Int64("child segment ID", childSegment.GetID()))
            return false
        }
    }
    
    // 3. 检查Channel Checkpoint
    segInsertChannel := segment.GetInsertChannel()
    if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) &&
        segment.GetDmlPosition().GetTimestamp() > cpTimestamp {
        // Segment的DML位置必须在Channel CP之前
        log.RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
            zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
            zap.Uint64("channelCpTs", cpTimestamp))
        return false
    }
    
    return true
}

3.5.3 GC清理对象

对象类型 / 检查条件 / 清理时机 / 容忍时间

  • Dropped Segment:state=Dropped + 已过期 + 子Segment已索引 + CP已到达;满足所有条件时清理;容忍1天
  • 孤立Binlog:Segment不存在 + 文件存在;文件修改时间超过容忍时间后清理;容忍1天
  • 无效Binlog引用:Segment存在 + 文件不在Binlog列表;立即清理
  • 过期索引文件:Segment已删除 + 索引文件存在;Drop后1天清理;容忍1天
  • 临时文件:前缀为temp + 修改时间>2小时;立即清理;容忍2小时

3.5.4 GC性能优化

// 1. 并发删除Binlog
futures := make([]*conc.Future[struct{}], 0)
for _, binlogPath := range binlogPaths {
    future := getOrCreateIOPool().Submit(func() (struct{}, error) {
        return struct{}{}, gc.option.cli.Remove(ctx, binlogPath)
    })
    futures = append(futures, future)
}

// 2. 批量等待完成
for _, f := range futures {
    if _, err := f.Await(); err != nil {
        log.Warn("failed to remove binlog", zap.Error(err))
        failed++
    }
}

// 3. 限流避免对S3造成压力
rateLimiter := rate.NewLimiter(100, 100)  // 每秒100个请求
rateLimiter.Wait(ctx)

4. 关键算法与设计

4.1 Segment分配策略

目标:最大化Segment利用率,减少小文件数量

策略

  1. 优先填充未满的Growing Segment
  2. 单个Channel串行分配(避免并发冲突)
  3. 预留10%容量(避免频繁Seal)

// Segment分配核心逻辑
func (m *SegmentManager) selectOrCreateSegment(channel string, count int64) (*SegmentInfo, error) {
    // 1. 获取Channel的所有Growing Segment
    segments := m.meta.GetGrowingSegments(channel)
    
    // 2. 按可用空间排序,优先选择最满的Segment(减少碎片)
    sort.Slice(segments, func(i, j int) bool {
        return segments[i].AvailableSize < segments[j].AvailableSize
    })
    
    // 3. 查找容量足够的Segment
    for _, seg := range segments {
        if seg.AvailableSize >= count {
            return seg, nil
        }
    }
    
    // 4. 无可用Segment,创建新Segment
    return m.createSegment(channel, count)
}

4.2 Flush时机决策

目标:平衡写入延迟与文件大小

决策树

flowchart TD
    Start[收到TimeTick] --> CheckSize{Segment大小>512MB}
    CheckSize -->|是| Flush[立即Flush]
    CheckSize -->|否| CheckRows{行数>100万}
    
    CheckRows -->|是| Flush
    CheckRows -->|否| CheckTime{Growing时长>10分钟}
    
    CheckTime -->|是| Flush
    CheckTime -->|否| CheckManual{手动Flush API}
    
    CheckManual -->|是| Flush
    CheckManual -->|否| Wait[继续等待]
    
    Flush --> End[Seal Segment]
    Wait --> End[保持Growing]
    
    style Flush fill:#ffe6e6
    style Wait fill:#e6f3ff

4.3 Compaction优先级算法

目标:优先处理收益最大的Compaction任务

优先级计算公式

Priority = DeleteRatio * 0.4 + FragmentScore * 0.3 + AgeScore * 0.2 + SizeScore * 0.1

各因子说明

因子 权重 计算方法 说明
DeleteRatio 0.4 deleted_rows / total_rows 删除比例越高,优先级越高
FragmentScore 0.3 small_files / total_files 小文件比例越高,优先级越高
AgeScore 0.2 days_since_last_compaction / 30 越久未Compact,优先级越高
SizeScore 0.1 total_size / 10GB 总大小越大,优先级越高

实现代码

// 计算Compaction优先级
func (t *compactionTrigger) calculatePriority(plan *CompactionPlan) float64 {
    var totalSize, totalRows, deletedRows int64
    var smallFileCount int
    
    for _, seg := range plan.Segments {
        totalSize += seg.Size
        totalRows += seg.NumRows
        deletedRows += seg.DeletedRows
        
        if seg.Size < SmallSegmentThreshold {
            smallFileCount++
        }
    }
    
    // 防御:空计划或无数据时直接返回0,避免除零
    if totalRows == 0 || len(plan.Segments) == 0 {
        return 0
    }
    
    deleteRatio := float64(deletedRows) / float64(totalRows)
    fragmentScore := float64(smallFileCount) / float64(len(plan.Segments))
    ageScore := float64(time.Since(plan.LastCompactionTime).Hours()) / (30 * 24)
    sizeScore := float64(totalSize) / (10 * 1024 * 1024 * 1024) // 10GB
    
    return deleteRatio*0.4 + fragmentScore*0.3 + ageScore*0.2 + sizeScore*0.1
}
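
该优先级的典型用法是在下发前对候选计划排序、优先调度收益最高的任务(示意;plans为候选的*CompactionPlan列表,属说明用途的假设变量):

// 按优先级从高到低排序候选Compaction计划
sort.Slice(plans, func(i, j int) bool {
    return t.calculatePriority(plans[i]) > t.calculatePriority(plans[j])
})
// 取前若干个计划提交给CompactionInspector执行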

5. 性能与容量

5.1 性能指标

| 指标 | 数值 | 说明 |
|------|------|------|
| AssignSegmentID延迟 | P50: 5ms, P99: 20ms | 包含元数据查询 |
| Flush触发延迟 | <100ms | 从TimeTick到发送Flush命令 |
| Compaction吞吐 | 100GB/小时/节点 | 取决于DataNode性能 |
| GC扫描周期 | 30分钟 | 可配置 |
| 元数据查询QPS | >10000 | 内存缓存 |

5.2 容量规划

单个DataCoord支持规模

| 维度 | 容量 | 说明 |
|------|------|------|
| Collection数量 | 1000 | 超过需要分片 |
| Segment数量 | 100万 | 内存占用约10GB |
| Channel数量 | 10000 | 对应DataNode数量 |
| DataNode数量 | 1000 | 心跳监控 |
| Compaction并发 | 100 | 可配置 |

5.3 监控指标

关键Metrics

# Segment相关
milvus_datacoord_segment_num:
  labels: [state, collection]  # Growing/Sealed/Flushed/Indexed
  
milvus_datacoord_segment_size:
  labels: [collection]
  unit: bytes

# Compaction相关
milvus_datacoord_compaction_task_num:
  labels: [state, type]  # Pending/Running/Completed/Failed
  
milvus_datacoord_compaction_latency:
  labels: [type]  # MixCompaction/MergeCompaction
  unit: seconds

# 性能相关
milvus_datacoord_assign_segment_latency:
  labels: []
  unit: milliseconds
  
milvus_datacoord_flush_latency:
  labels: [collection]
  unit: seconds
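
基于上述指标可直接编写PromQL查询与告警,例如按状态统计Segment数量、观察Flush延迟(标签与指标名沿用上文列表,_bucket后缀为假设该指标是Histogram类型,实际以部署暴露的指标为准):

# 各状态的Segment数量
sum by (state) (milvus_datacoord_segment_num)

# Flush延迟P99(假设该指标为Histogram)
histogram_quantile(0.99, sum by (le) (rate(milvus_datacoord_flush_latency_bucket[5m])))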

6. 配置参数

6.1 核心配置

dataCoord:
  # Segment配置
  segment:
    maxSize: 536870912          # 512MB
    maxRows: 1000000            # 100万行
    sealProportion: 0.9         # 90%触发Seal
    assignmentExpiration: 2000  # 分配过期时间(ms)
    
  # Flush配置
  flush:
    maxIdleDuration: 600        # 10分钟无数据自动Flush
    minSizeToFlush: 1048576     # 最小1MB才Flush
    
  # Compaction配置
  compaction:
    enableAutoCompaction: true
    triggerInterval: 600        # 10分钟扫描一次
    minSegmentToMerge: 4        # 最少4个Segment才合并
    maxSegmentToMerge: 30       # 最多合并30个Segment
    
  # GC配置
  gc:
    interval: 1800              # 30分钟执行一次
    missingTolerance: 86400     # 文件丢失容忍时间(1天)
    dropTolerance: 86400        # Segment删除后保留时间(1天)

7. 故障处理

7.1 常见故障

| 故障类型 | 现象 | 原因 | 处理 |
|---------|------|------|------|
| Segment分配失败 | AssignSegmentID超时 | etcd不可用 | 重试;检查etcd连接 |
| Flush超时 | Segment长时间Sealed | DataNode故障 | 重新分配Channel |
| Compaction堆积 | 任务pending数量增加 | DataNode性能不足 | 扩容DataNode |
| 元数据不一致 | 查询返回错误数据 | etcd数据损坏 | 重启DataCoord重新加载 |

7.2 降级策略

当etcd不可用时

  • 只读模式:继续提供Segment查询服务(基于内存缓存)
  • 禁止新Segment创建
  • 延迟Flush和Compaction任务

当DataNode不足时

  • 降低Flush触发阈值(减少Growing Segment)
  • 暂停Compaction任务
  • 告警通知运维扩容

8. 最佳实践

8.1 Segment大小调优

推荐配置

| 数据特征 | Segment大小 | 原因 |
|---------|------------|------|
| 高频写入 | 256MB | 减少Flush延迟 |
| 低频写入 | 512MB | 减少小文件数量 |
| 大向量(>1024维) | 1GB | 提高检索效率 |
| 小向量(<128维) | 256MB | 平衡内存与性能 |
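
例如高频写入场景,可参照第6节的配置键将Segment体积调小、缩短空闲Flush等待(示意值,需结合实际负载验证):

dataCoord:
  segment:
    maxSize: 268435456        # 256MB,降低单Segment体积,减少Flush延迟
    maxRows: 1000000
  flush:
    maxIdleDuration: 300      # 5分钟,写入频繁时适当缩短空闲Flush等待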

8.2 Compaction策略

定期手动Compaction

# 在业务低峰期(如凌晨2点)手动触发
curl -X POST "http://datacoord:13333/api/v1/compaction" \
  -d '{"collectionID": 123}'

监控Compaction积压

# 告警规则:Compaction任务pending数量>50
milvus_datacoord_compaction_task_num{state="pending"} > 50

8.3 容量规划

估算公式

Segment数量 = 数据总大小 / Segment大小
内存占用 = Segment数量 * 10KB  # 每个Segment元数据约10KB

示例

  • 数据总量:10TB
  • Segment大小:512MB
  • Segment数量:10TB / 512MB = 20,000个
  • 元数据内存:20,000 * 10KB = 200MB

相关文档


Milvus-03-DataCoord-详细文档

本文档整合DataCoord的核心API、数据结构、关键时序图与关键算法。

📋 目录

  1. 核心API
  2. 数据结构
  3. 关键时序图
  4. 关键算法

1. 核心API

1.1 AssignSegmentID

功能:为数据写入分配Segment

请求参数

type AssignSegmentIDRequest struct {
    SegmentIDRequests []*SegmentIDRequest
}

type SegmentIDRequest struct {
    CollectionID int64
    PartitionID  int64
    ChannelName  string
    Count        uint32  // 需要写入的行数
}

响应参数

type SegmentIDAssignment struct {
    SegID        int64   // 分配的SegmentID
    ChannelName  string
    Count        uint32
    CollectionID int64
    PartitionID  int64
    ExpireTime   uint64  // 分配过期时间
}

核心代码

func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
    assigns := make([]*datapb.SegmentIDAssignment, 0)
    
    for _, r := range req.SegmentIDRequests {
        // 调用SegmentManager分配
        allocations, err := s.segmentManager.AllocSegment(ctx,
            r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count), r.StorageVersion)
        if err != nil {
            log.Warn("failed to alloc segment", zap.Error(err))
            continue
        }
        
        for _, allocation := range allocations {
            assigns = append(assigns, &datapb.SegmentIDAssignment{
                SegID:        allocation.SegmentID,
                ChannelName:  r.ChannelName,
                Count:        uint32(allocation.NumOfRows),
                CollectionID: r.CollectionID,
                PartitionID:  r.PartitionID,
                ExpireTime:   allocation.ExpireTime,
            })
        }
    }
    
    return &datapb.AssignSegmentIDResponse{
        Status:      merr.Success(),
        SegIDAssignments: assigns,
    }, nil
}
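
调用方(如Proxy)侧的使用方式大致如下(示意;dataCoordClient为假设已建立的gRPC客户端,handleAssignment为假设的后续处理函数,字段沿用上文请求/响应结构):

req := &datapb.AssignSegmentIDRequest{
    SegmentIDRequests: []*datapb.SegmentIDRequest{{
        CollectionID: collectionID,
        PartitionID:  partitionID,
        ChannelName:  channelName,
        Count:        1000, // 本批次要写入的行数
    }},
}

resp, err := dataCoordClient.AssignSegmentID(ctx, req)
if err != nil {
    return err
}
for _, assign := range resp.GetSegIDAssignments() {
    // 在assign.ExpireTime之前,将对应批次数据写入assign.SegID指定的Segment
    handleAssignment(assign)
}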

1.2 Flush

功能:触发DataNode Flush操作

请求参数

type FlushRequest struct {
    DbID         int64
    CollectionID int64
    SegmentIDs   []int64  // 指定要Flush的Segment(可选)
}

核心代码

func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
    // 1. 分配Flush时间戳
    ts, err := s.allocator.AllocTimestamp(ctx)
    
    // 2. 执行Flush
    flushResult, err := s.flushCollection(ctx, req.CollectionID, ts, req.SegmentIDs)
    
    return &datapb.FlushResponse{
        Status:          merr.Success(),
        SegmentIDs:      flushResult.SegmentIDs,
        FlushSegmentIDs: flushResult.FlushSegmentIDs,
        TimeOfSeal:      flushResult.TimeOfSeal,
        FlushTs:         flushResult.FlushTs,
    }, nil
}

func (s *Server) flushCollection(ctx context.Context, collID int64, ts Timestamp, segIDs []int64) (*datapb.FlushResult, error) {
    // 1. 获取所有Channel
    channels := s.channelManager.GetChannelsByCollectionID(collID)
    
    // 2. 为每个Channel触发Flush
    for _, channel := range channels {
        segments := s.meta.GetSegmentsByChannel(channel)
        
        // 过滤:只Flush Growing Segment
        growingSegments := filterGrowingSegments(segments)
        
        // 调用DataNode Flush
        s.cluster.Flush(ctx, channel, growingSegments)
    }
    
    return &datapb.FlushResult{...}, nil
}

1.3 GetSegmentInfo

功能:查询Segment元信息

请求参数

type GetSegmentInfoRequest struct {
    SegmentIDs   []int64
    IncludeUnHealthy bool
}

响应参数

type SegmentInfo struct {
    ID             int64
    CollectionID   int64
    PartitionID    int64
    InsertChannel  string
    NumOfRows      int64
    State          commonpb.SegmentState  // Growing/Sealed/Flushed
    MaxRowNum      int64
    LastExpireTime uint64
    StartPosition  *msgpb.MsgPosition
    DmlPosition    *msgpb.MsgPosition
    Binlogs        []*FieldBinlog
    Statslogs      []*FieldBinlog
    Deltalogs      []*FieldBinlog
}
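
服务端处理逻辑的一个精简示意如下(非源码;基于上文meta的GetSegment接口,省略健康检查与异常处理,字段访问方式以实际实现为准):

func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
    infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs()))
    for _, id := range req.GetSegmentIDs() {
        seg := s.meta.GetSegment(id) // 内存缓存查询,未命中返回nil
        if seg == nil {
            continue
        }
        // 默认只返回健康Segment;IncludeUnHealthy=true时把Dropped等状态也一并返回
        if !req.GetIncludeUnHealthy() && seg.GetState() == commonpb.SegmentState_Dropped {
            continue
        }
        infos = append(infos, seg.SegmentInfo) // meta中的SegmentInfo内嵌datapb.SegmentInfo(示意)
    }
    return &datapb.GetSegmentInfoResponse{
        Status: merr.Success(),
        Infos:  infos,
    }, nil
}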

2. 数据结构

2.1 核心数据结构UML

classDiagram
    class Server {
        +meta *meta
        +segmentManager Manager
        +channelManager ChannelManager
        +compactionTrigger trigger
        +garbageCollector *garbageCollector
        
        +AssignSegmentID(req) response
        +Flush(req) response
    }
    
    class meta {
        -sync.RWMutex mu
        -map segments
        -map collections
        -Catalog catalog
        
        +AddSegment(segment)
        +UpdateSegment(segment)
        +GetSegment(id) segment
    }
    
    class SegmentInfo {
        +int64 SegmentID
        +int64 CollectionID
        +SegmentState State
        +int64 NumRows
        +int64 MaxRowNum
        +string InsertChannel
        +[]FieldBinlog Binlogs
    }
    
    class SegmentManager {
        -meta *meta
        -allocator Allocator
        
        +AllocSegment(collID, count) allocation
        +SealSegment(segID)
        +DropSegment(segID)
    }
    
    class ChannelManager {
        -map channels
        -NodeManager nodeManager
        
        +Watch(channel) error
        +Release(channel)
        +GetDataNode(channel) nodeID
    }
    
    Server *-- meta
    Server *-- SegmentManager
    Server *-- ChannelManager
    meta *-- SegmentInfo
    SegmentManager --> meta

2.2 SegmentInfo详解

type SegmentInfo struct {
    // 标识
    SegmentID    int64
    CollectionID int64
    PartitionID  int64
    
    // 状态
    State        commonpb.SegmentState  // Growing/Sealed/Flushed/Dropped
    
    // 容量
    NumOfRows    int64   // 当前行数
    MaxRowNum    int64   // 最大行数(默认100万)
    
    // Channel
    InsertChannel string  // 所属DML Channel
    
    // 时间信息
    StartPosition  *msgpb.MsgPosition  // 起始消息位置
    DmlPosition    *msgpb.MsgPosition  // 最新DML位置
    LastExpireTime uint64              // 最后过期时间
    
    // Binlog文件
    Binlogs    []*FieldBinlog  // Insert Log
    Statslogs  []*FieldBinlog  // Stats Log
    Deltalogs  []*FieldBinlog  // Delete Log
    
    // 索引
    IndexInfos []*IndexInfo
    
    // Compaction
    CompactionFrom []int64  // 由哪些Segment Compact而来
    
    // 大小
    Size int64  // 字节数
}

// FieldBinlog 字段Binlog
type FieldBinlog struct {
    FieldID int64
    Binlogs []*Binlog
}

type Binlog struct {
    EntriesNum    int64
    TimestampFrom uint64
    TimestampTo   uint64
    LogPath       string  // Object Storage路径
    LogSize       int64
}
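
基于上述结构,可以按字段汇总一个Segment的Binlog总大小与总条目数(示意):

// 统计一个Segment所有Insert Binlog的总字节数与总条目数
func binlogStats(binlogs []*FieldBinlog) (totalSize int64, totalEntries int64) {
    for _, fieldBinlog := range binlogs {
        for _, b := range fieldBinlog.Binlogs {
            totalSize += b.LogSize
            totalEntries += b.EntriesNum
        }
    }
    return totalSize, totalEntries
}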

2.3 Segment状态机

Growing → Sealed → Flushing → Flushed → Indexed → Dropped

状态说明:
- Growing: 内存中,接受新数据
- Sealed: 已封闭,不再接受新数据
- Flushing: Flush进行中
- Flushed: 已持久化到Object Storage
- Indexed: 索引构建完成
- Dropped: 已标记删除,等待GC
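
状态转换可以用一张合法转移表来约束(示意,仅覆盖上面列出的主路径;Indexed通常由索引元数据单独标记,不在SegmentState枚举中,故未列出):

// 合法的状态转移表:key为当前状态,value为允许迁移到的目标状态
var legalTransitions = map[commonpb.SegmentState][]commonpb.SegmentState{
    commonpb.SegmentState_Growing:  {commonpb.SegmentState_Sealed, commonpb.SegmentState_Dropped},
    commonpb.SegmentState_Sealed:   {commonpb.SegmentState_Flushing, commonpb.SegmentState_Dropped},
    commonpb.SegmentState_Flushing: {commonpb.SegmentState_Flushed},
    commonpb.SegmentState_Flushed:  {commonpb.SegmentState_Dropped},
}

// canTransit 判断一次状态变更是否合法
func canTransit(from, to commonpb.SegmentState) bool {
    for _, s := range legalTransitions[from] {
        if s == to {
            return true
        }
    }
    return false
}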

2.4 SegmentManager

type SegmentManager struct {
    meta      *meta
    allocator allocator.Allocator
    
    mu sync.RWMutex
    // Channel → Growing Segments
    segments map[string][]*SegmentInfo
}

// AllocSegment 分配Segment
func (sm *SegmentManager) AllocSegment(ctx context.Context, collID, partID int64, channelName string, count int64, version int64) ([]*Allocation, error) {
    // 1. 查找未满的Growing Segment
    segment := sm.getGrowingSegment(channelName, collID, partID)
    
    if segment != nil && segment.AvailableSize() >= count {
        // 复用现有Segment
        return []*Allocation{{
            SegmentID:  segment.ID,
            NumOfRows:  count,
            ExpireTime: time.Now().Add(SegmentExpireDuration).Unix(),
        }}, nil
    }
    
    // 2. 创建新Segment
    newSegID, err := sm.allocator.allocID(ctx)
    if err != nil {
        return nil, err
    }
    
    newSegment := &SegmentInfo{
        SegmentID:     newSegID,
        CollectionID:  collID,
        PartitionID:   partID,
        InsertChannel: channelName,
        State:         commonpb.SegmentState_Growing,
        MaxRowNum:     Params.SegmentMaxRows, // 最大行数上限(而非字节大小)
        NumOfRows:     0,
    }
    
    // 3. 持久化新Segment元数据,失败则返回错误
    if err = sm.meta.AddSegment(ctx, newSegment); err != nil {
        return nil, err
    }
    
    return []*Allocation{{SegmentID: newSegID, NumOfRows: count}}, nil
}

3. 关键时序图

3.1 AssignSegmentID时序图

sequenceDiagram
    participant P as Proxy
    participant DC as DataCoord
    participant SM as SegmentManager
    participant Meta as Meta
    participant ETCD as etcd
    
    P->>DC: AssignSegmentID(coll, channel, count=1000)
    DC->>SM: AllocSegment(coll, partition, channel, 1000)
    
    SM->>Meta: GetGrowingSegment(channel)
    Meta-->>SM: segment or nil
    
    alt Segment存在且未满
        SM-->>DC: [existing_seg_id]
    else 创建新Segment
        SM->>DC: AllocID()
        DC-->>SM: new_seg_id
        
        SM->>Meta: AddSegment(new_segment)
        Meta->>ETCD: Save(segment_meta)
        ETCD-->>Meta: OK
        Meta-->>SM: Success
        
        SM-->>DC: [new_seg_id]
    end
    
    DC-->>P: SegmentIDAssignment

3.2 Flush时序图

sequenceDiagram
    participant DN as DataNode
    participant DC as DataCoord
    participant Meta as Meta
    participant Cluster as Cluster(DataNode Manager)
    participant QC as QueryCoord
    
    DN->>DC: DataNodeTtMsg(channel, ts)
    Note over DC: 周期性收到DataNode心跳
    
    DC->>DC: GetFlushableSegments(channel, ts)
    Note over DC: 过滤:size>threshold 或 time>timeout
    
    alt 有可Flush的Segment
        DC->>Meta: UpdateSegmentState(segments, Sealed)
        Meta-->>DC: Success
        
        DC->>Cluster: Flush(nodeID, channel, segments)
        Cluster->>DN: FlushSegments RPC
        
        DN->>DN: 序列化数据
        DN->>DN: 写入Object Storage(Binlog)
        DN->>DC: FlushCompleted(segments, binlog_paths)
        
        DC->>Meta: UpdateSegment(state=Flushed, binlogs)
        Meta-->>DC: Success
        
        DC->>QC: SegmentFlushCompleted(segments)
        Note over QC: 触发Handoff
    end

3.3 Compaction时序图

sequenceDiagram
    participant CT as CompactionTrigger
    participant CI as CompactionInspector
    participant Meta as Meta
    participant DN as DataNode
    participant OS as Object Storage
    
    loop 每10分钟
        CT->>Meta: 扫描所有Segment
        Meta-->>CT: [all_segments]
        
        CT->>CT: 计算Compaction收益
        
        alt 分数>阈值
            CT->>CI: SubmitTask(CompactionPlan)
            CI->>Meta: GetSegmentDetails
            Meta-->>CI: [segment_details]
            
            CI->>DN: Compaction(plan)
            DN->>OS: 读取旧Segment Binlog
            OS-->>DN: binlog_data
            
            DN->>DN: 合并数据、删除重复/已删除记录
            DN->>OS: 写入新Segment Binlog
            OS-->>DN: Success
            
            DN->>CI: CompactionCompleted(new_segment)
            
            CI->>Meta: AddSegment(new_segment)
            CI->>Meta: DropSegment(old_segments)
            Meta-->>CI: Success
        end
    end

3.4 GC流程时序图

sequenceDiagram
    participant GC as GarbageCollector
    participant Meta as Meta
    participant OS as Object Storage
    
    loop 每30分钟
        GC->>Meta: 扫描Dropped Segment
        Meta-->>GC: [seg1, seg2, ...]
        
        loop 每个Segment
            GC->>Meta: GetSegmentBinlogs(segID)
            Meta-->>GC: [binlog_paths]
            
            GC->>OS: DeleteFiles(binlog_paths)
            OS-->>GC: Success
            
            GC->>Meta: RemoveSegment(segID)
            Meta-->>GC: Success
        end
    end

4. 关键算法

4.1 Segment分配策略

// 优先级:
// 1. 优先复用未满的Growing Segment(减少小文件)
// 2. 单个Channel串行分配(避免并发冲突)
// 3. 预留10%容量(避免频繁Seal)

func (sm *SegmentManager) selectOrCreateSegment(channel string, count int64) (*SegmentInfo, error) {
    segments := sm.getGrowingSegments(channel)
    
    // 按可用空间排序,优先选择最满的Segment
    sort.Slice(segments, func(i, j int) bool {
        return segments[i].AvailableSize() < segments[j].AvailableSize()
    })
    
    for _, seg := range segments {
        if seg.AvailableSize() >= count {
            return seg, nil
        }
    }
    
    // 无可用Segment,创建新的
    return sm.createSegment(channel, count)
}

4.2 Flush触发条件

func (sm *SegmentManager) GetFlushableSegments(channel string, ts Timestamp) ([]int64, error) {
    segments := sm.getGrowingSegments(channel)
    flushable := []int64{}
    
    for _, seg := range segments {
        // 条件1:大小超过阈值(512MB)
        if seg.Size >= SegmentMaxSize {
            flushable = append(flushable, seg.ID)
            continue
        }
        
        // 条件2:行数超过阈值(100万)
        if seg.NumOfRows >= SegmentMaxRows {
            flushable = append(flushable, seg.ID)
            continue
        }
        
        // 条件3:时间超过阈值(10分钟)
        duration := time.Since(seg.CreateTime)
        if duration > SegmentFlushTimeout {
            flushable = append(flushable, seg.ID)
            continue
        }
    }
    
    return flushable, nil
}

4.3 Compaction优先级

func (ct *compactionTrigger) calculatePriority(segments []*SegmentInfo) float64 {
    var totalSize, totalRows, deletedRows int64
    var smallFileCount int
    
    for _, seg := range segments {
        totalSize += seg.Size
        totalRows += seg.NumRows
        deletedRows += seg.DeletedRows
        
        if seg.Size < SmallSegmentThreshold {
            smallFileCount++
        }
    }
    
    // 防御:空Segment列表或无数据时直接返回0,避免除零
    if totalRows == 0 || len(segments) == 0 {
        return 0
    }
    
    deleteRatio := float64(deletedRows) / float64(totalRows)
    fragmentScore := float64(smallFileCount) / float64(len(segments))
    ageScore := float64(time.Since(segments[0].LastCompactionTime).Hours()) / (30 * 24)
    sizeScore := float64(totalSize) / (10 * 1024 * 1024 * 1024)
    
    // 加权计算优先级
    return deleteRatio*0.4 + fragmentScore*0.3 + ageScore*0.2 + sizeScore*0.1
}