Milvus-02-RootCoord-概览

1. 模块概述

1.1 职责定义

RootCoord(根协调器)是Milvus的核心控制平面组件,负责元数据管理、全局ID/时间戳分配和DDL操作协调。

核心职责

  1. DDL操作管理

    • CreateCollection/DropCollection
    • CreatePartition/DropPartition
    • CreateDatabase/DropDatabase
    • CreateIndex/DropIndex
  2. TSO服务(Timestamp Oracle)

    • 全局时间戳分配
    • 保证分布式事务顺序
    • MVCC版本控制基础
  3. ID分配服务

    • CollectionID、PartitionID分配
    • SegmentID、RowID分配(通过GlobalIDAllocator)
  4. 元数据管理

    • Collection Schema维护
    • Database/Partition信息
    • 用户权限(RBAC)
  5. 配额与限流

    • 全局配额管理
    • 集合级别限流策略
    • 资源使用统计

1.2 整体服务架构图

flowchart TB
    subgraph Client["客户端层"]
        SDK[Milvus SDK]
        APP[Application]
    end
    
    subgraph Proxy["Proxy层"]
        P1[Proxy-1]
        P2[Proxy-2]
        PN[Proxy-N]
        PCache[GlobalMetaCache<br/>元数据缓存]
    end
    
    subgraph RootCoord["RootCoord核心"]
        API[gRPC API Layer]
        
        subgraph Core["核心组件"]
            Scheduler[Scheduler<br/>DDL调度器]
            Broker[Broker<br/>组件通信层]
            StepExecutor[StepExecutor<br/>步骤执行器]
            DdlLock[DdlTsLockManager<br/>DDL时间戳锁]
        end
        
        subgraph Allocator["资源分配器"]
            TSO[TSOAllocator<br/>时间戳分配]
            IDA[IDAllocator<br/>全局ID分配]
        end
        
        subgraph MetaStore["元数据管理"]
            MetaTable[MetaTable<br/>内存缓存]
            Catalog[Catalog<br/>持久化层]
        end
        
        subgraph Background["后台服务"]
            QuotaCenter[QuotaCenter<br/>配额管理]
            GarbageCollector[GarbageCollector<br/>垃圾回收]
            TimeTickSync[TimeTickSync<br/>时间同步]
        end
    end
    
    subgraph Coordinator["其他Coordinator"]
        DataCoord[DataCoord<br/>数据管理]
        QueryCoord[QueryCoord<br/>查询管理]
    end
    
    subgraph Storage["存储层"]
        ETCD[etcd/TiKV<br/>元数据存储]
        MinIO[MinIO/S3<br/>对象存储]
        MQ[Pulsar/Kafka<br/>消息队列]
    end
    
    %% 请求流程
    SDK --> P1 & P2 & PN
    APP --> P1 & P2 & PN
    P1 & P2 & PN --> API
    
    %% 内部调用链
    API --> Scheduler
    Scheduler --> TSO
    Scheduler --> IDA
    Scheduler --> DdlLock
    Scheduler --> StepExecutor
    
    StepExecutor --> MetaTable
    StepExecutor --> Broker
    
    MetaTable --> Catalog
    Catalog <--> ETCD
    
    Broker --> DataCoord
    Broker --> QueryCoord
    
    %% 后台服务
    QuotaCenter --> P1 & P2 & PN
    GarbageCollector --> DataCoord
    TimeTickSync --> MQ
    TimeTickSync --> TSO
    
    %% DataCoord交互
    DataCoord --> MQ
    DataCoord --> MinIO
    
    %% 缓存失效
    API -.->|InvalidateCache| PCache
    
    style RootCoord fill:#e1f5ff,stroke:#333,stroke-width:3px
    style Core fill:#fff4e6,stroke:#333,stroke-width:2px
    style Allocator fill:#f3e5f5,stroke:#333,stroke-width:2px
    style MetaStore fill:#e8f5e9,stroke:#333,stroke-width:2px
    style Background fill:#fce4ec,stroke:#333,stroke-width:2px

架构说明

  1. 客户端层:SDK/应用通过gRPC与Proxy通信
  2. Proxy层:请求代理、负载均衡、元数据缓存
  3. RootCoord核心
    • API Layer:gRPC接口入口,参数校验
    • Scheduler:DDL任务串行化调度,保证元数据一致性
    • Broker:与DataCoord/QueryCoord通信的抽象层
    • StepExecutor:执行DDL步骤,支持事务回滚
    • TSOAllocator:全局时间戳分配,支持MVCC
    • IDAllocator:全局唯一ID分配
    • MetaTable:Collection/Database元数据内存缓存
    • Catalog:元数据持久化层(etcd/TiKV)
  4. 其他Coordinator:负责具体数据/查询管理
  5. 存储层:元数据存储(etcd)、对象存储(S3)、消息队列(Pulsar)

1.3 核心API

| API | 功能 | 调用频率 | 重要性 | 复杂度 |
| --- | --- | --- | --- | --- |
| CreateCollection | 创建集合 | | ⭐⭐⭐⭐⭐ | 高(跨组件协调) |
| DropCollection | 删除集合 | | ⭐⭐⭐⭐ | 高(级联删除) |
| DescribeCollection | 查询Collection元信息 | | ⭐⭐⭐⭐⭐ | 低(缓存查询) |
| AllocTimestamp | 分配时间戳 | 极高 | ⭐⭐⭐⭐⭐ | 低(本地生成) |
| AllocID | 分配全局ID | | ⭐⭐⭐⭐ | 低(批量预分配) |
| CreatePartition | 创建分区 | | ⭐⭐⭐ | 中(元数据更新) |
| CreateDatabase | 创建数据库 | | ⭐⭐⭐ | 低(元数据写入) |

1.4 TSO机制

TSO(Timestamp Oracle)原理

sequenceDiagram
    participant C as Client/Proxy
    participant RC as RootCoord
    participant TSO as TSO Allocator
    participant ETCD as etcd
    
    C->>RC: AllocTimestamp(count=1)
    RC->>TSO: GenerateTSO(1)
    
    alt 缓存足够
        TSO-->>RC: ts=12345678
    else 需要预分配
        TSO->>ETCD: UpdateTSO(+10000)
        ETCD-->>TSO: OK
        TSO->>TSO: 缓存10000个TS
        TSO-->>RC: ts=12345678
    end
    
    RC-->>C: Timestamp

TSO格式(64位):

|<--  Physical Time (46 bits) -->|<-- Logical Counter (18 bits) -->|
|   毫秒级物理时间戳                |   逻辑计数器(单位时间内的序号)    |

特性

  • 全局单调递增:保证分布式顺序
  • 物理时间关联:便于调试和时间旅行
  • 高吞吐:批量分配,减少etcd访问
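
按上述46+18位格式,TSO的组合与解析可以用下面的示意代码表示(示例代码,非Milvus源码):

// 示例(示意,非源码):TSO的位运算组合与解析
const (
    logicalBits = 18
    logicalMask = (1 << logicalBits) - 1 // 0x3FFFF
)

// composeTS:毫秒级物理时间 + 逻辑计数器 -> 64位TSO
// 例如 composeTS(1704067200000, 12345) == 446710992076812345
func composeTS(physicalMs, logical int64) uint64 {
    return uint64(physicalMs)<<logicalBits | uint64(logical)
}

// parseTS:64位TSO -> 物理时间与逻辑计数器
func parseTS(ts uint64) (physicalMs, logical int64) {
    return int64(ts >> logicalBits), int64(ts & logicalMask)
}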

2. 核心流程详解

2.1 CreateCollection完整调用链路

调用路径:Client → Proxy → RootCoord API → Scheduler → Task → StepExecutor → Broker → DataCoord/QueryCoord

2.1.1 上游接口(API层)

入口函数Core.CreateCollection()

// 路径:internal/rootcoord/root_coord.go
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
        return merr.Status(err), nil
    }
    
    // 2. 创建任务对象
    t := &createCollectionTask{
        baseTask: newBaseTask(ctx, c),
        Req:      in,
    }
    
    // 3. 提交到调度器(关键:串行化)
    if err := c.scheduler.AddTask(t); err != nil {
        return merr.Status(err), nil
    }
    
    // 4. 等待任务完成
    if err := t.WaitToFinish(); err != nil {
        return merr.Status(err), nil
    }
    
    return merr.Success(), nil
}

关键点

  • 健康检查确保RootCoord可用
  • 创建任务对象封装请求
  • 提交到Scheduler进行串行化处理
  • 同步等待任务完成(阻塞调用)

2.1.2 任务调度(Scheduler层)

Scheduler.AddTask() 调用链

// 路径:internal/rootcoord/scheduler.go
func (s *scheduler) AddTask(task task) error {
    // 1. 获取锁(保证原子性)
    s.lock.Lock()
    defer s.lock.Unlock()
    
    // 2. 分配任务ID
    if err := s.setID(task); err != nil {
        return err
    }
    
    // 3. 分配时间戳(DDL顺序保证)
    if err := s.setTs(task); err != nil {
        return err
    }
    s.taskHeap.Push(task)
    
    // 4. 将任务放入队列
    s.enqueue(task)
    return nil
}

// 任务循环(串行执行)
func (s *scheduler) taskLoop() {
    for {
        select {
        case task := <-s.taskChan:
            s.execute(task)  // 串行执行,保证顺序性
        }
    }
}

func (s *scheduler) execute(task task) {
    // 1. Prepare阶段:参数校验、资源预分配
    if err := task.Prepare(task.GetCtx()); err != nil {
        task.NotifyDone(err)
        return
    }
    
    // 2. Execute阶段:实际执行
    err := task.Execute(task.GetCtx())
    task.NotifyDone(err)
    
    // 3. 更新最小DDL时间戳
    s.setMinDdlTs()
}

关键设计

  • 串行化保证:任务通过单一channel串行处理
  • 时间戳顺序:每个任务分配单调递增的TS
  • 原子性保证:分配ID和TS是原子操作

2.1.3 任务执行(Task层)

createCollectionTask.Prepare() - 准备阶段

// 路径:internal/rootcoord/create_collection_task.go
func (t *createCollectionTask) Prepare(ctx context.Context) error {
    // 1. 获取Database信息
    db, err := t.core.meta.GetDatabaseByName(ctx, t.Req.GetDbName(), typeutil.MaxTimestamp)
    t.dbID = db.ID
    
    // 2. 参数校验
    if err := t.validate(ctx); err != nil {
        return err
    }
    
    // 3. 解析和校验Schema
    if err := t.prepareSchema(ctx); err != nil {
        return err
    }
    
    // 4. 分配CollectionID
    if err := t.assignCollectionID(); err != nil {
        return err
    }
    
    // 5. 分配PartitionIDs(默认分区 + Partition Key分区)
    if err := t.assignPartitionIDs(ctx); err != nil {
        return err
    }
    
    // 6. 分配VirtualChannels(Shard数量)
    return t.assignChannels()
}

createCollectionTask.Execute() - 执行阶段

func (t *createCollectionTask) Execute(ctx context.Context) error {
    // 1. 构建Collection模型
    collInfo := model.Collection{
        CollectionID:         t.collID,
        DBID:                 t.dbID,
        Name:                 t.schema.Name,
        Schema:               t.schema,
        VirtualChannelNames:  t.channels.virtualChannels,
        PhysicalChannelNames: t.channels.physicalChannels,
        ShardsNum:            t.Req.ShardsNum,
        State:                pb.CollectionState_CollectionCreating,
        Partitions:           partitions,
        CreateTime:           ts,
    }
    
    // 2. 幂等性检查
    existedColl, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp)
    if err == nil {
        // 已存在,检查是否相同
        if !existedColl.Equal(collInfo) {
            return fmt.Errorf("collection already exists with different params")
        }
        return nil  // 幂等返回
    }
    
    // 3. 添加Channel并获取StartPositions
    startPositions, err := t.addChannelsAndGetStartPositions(ctx, ts)
    collInfo.StartPositions = toKeyDataPairs(startPositions)
    
    // 4. 执行Step步骤(事务式)
    return executeCreateCollectionTaskSteps(ctx, t.core, &collInfo, t.Req.GetDbName(), t.dbProperties, ts)
}

2.1.4 步骤执行(StepExecutor层)

executeCreateCollectionTaskSteps() - 事务式步骤执行

func executeCreateCollectionTaskSteps(ctx context.Context, core *Core, col *model.Collection, dbName string, dbProperties []*commonpb.KeyValuePair, ts Timestamp) error {
    undoTask := newBaseUndoTask(core.stepExecutor)
    
    // Step 1: 失效Proxy缓存
    undoTask.AddStep(
        &expireCacheStep{...},      // do: 发送失效通知
        &nullStep{},                // undo: 无操作
    )
    
    // Step 2: 添加Collection元数据
    undoTask.AddStep(
        &addCollectionMetaStep{coll: col},      // do: 写入etcd
        &deleteCollectionMetaStep{collectionID: col.CollectionID},  // undo: 删除元数据
    )
    
    // Step 3: 注册DML Channels
    undoTask.AddStep(
        &nullStep{},                // do: 无操作(已在Execute中完成)
        &removeDmlChannelsStep{pChannels: col.PhysicalChannelNames},  // undo: 移除Channel
    )
    
    // Step 4: 通知DataCoord WatchChannels
    undoTask.AddStep(
        &watchChannelsStep{info: watchInfo{...}},  // do: 调用DataCoord
        &unwatchChannelsStep{...},                  // undo: 取消Watch
    )
    
    // Step 5: 修改Collection状态为Created
    undoTask.AddStep(
        &changeCollectionStateStep{state: CollectionCreated},  // do: 标记为Created
        &nullStep{},                                             // undo: 无操作
    )
    
    // 执行所有步骤(遇错则回滚)
    return undoTask.Execute(ctx)
}

Undo机制

// 路径:internal/rootcoord/undo.go
func (b *baseUndoTask) Execute(ctx context.Context) error {
    for i := 0; i < len(b.todoStep); i++ {
        todoStep := b.todoStep[i]
        
        // 执行正向步骤
        if _, err := todoStep.Execute(ctx); err != nil {
            log.Warn("step failed, starting undo", zap.Error(err))
            
            // 收集已执行的undo步骤
            undoSteps := b.undoStep[:i]
            
            // 异步执行undo(通过StepExecutor)
            go b.stepExecutor.AddSteps(&stepStack{undoSteps})
            return err
        }
    }
    return nil
}

2.1.5 Broker与其他组件交互

Broker.WatchChannels() - 通知DataCoord

// 路径:internal/rootcoord/broker.go
func (b *ServerBroker) WatchChannels(ctx context.Context, info *watchInfo) error {
    resp, err := b.s.mixCoord.WatchChannels(ctx, &datapb.WatchChannelsRequest{
        CollectionID:    info.collectionID,
        ChannelNames:    info.vChannels,
        StartPositions:  info.startPositions,
        Schema:          info.schema,
        CreateTimestamp: info.ts,
    })
    
    if err != nil {
        return err
    }
    if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
        return fmt.Errorf("failed to watch channels: %s", resp.GetStatus().GetReason())
    }
    
    return nil
}

2.1.6 完整时序图

sequenceDiagram
    autonumber
    participant Client
    participant Proxy
    participant RC as RootCoord
    participant Scheduler
    participant Task
    participant StepExec as StepExecutor
    participant IDA as IDAllocator
    participant TSO as TSOAllocator
    participant Meta as MetaTable
    participant ETCD as etcd
    participant Broker
    participant DC as DataCoord
    participant MQ as MessageQueue
    
    Client->>Proxy: CreateCollection(name, schema)
    Proxy->>RC: CreateCollection RPC
    
    rect rgb(240, 248, 255)
    Note over RC,Scheduler: API层 - 任务创建与提交
    RC->>RC: 健康检查
    RC->>Task: new createCollectionTask
    RC->>Scheduler: AddTask(task)
    end
    
    rect rgb(255, 250, 240)
    Note over Scheduler,TSO: Scheduler层 - 资源分配
    Scheduler->>IDA: AllocOne()
    IDA->>ETCD: CompareAndSwap(idKey)
    ETCD-->>IDA: OK
    IDA-->>Scheduler: taskID
    
    Scheduler->>TSO: GenerateTSO(1)
    TSO-->>Scheduler: timestamp
    
    Scheduler->>Scheduler: taskQueue.enqueue(task)
    end
    
    rect rgb(243, 229, 245)
    Note over Scheduler,Task: Task层 - Prepare阶段
    Scheduler->>Task: Prepare(ctx)
    Task->>Meta: GetDatabaseByName()
    Meta-->>Task: Database
    
    Task->>Task: validate(schema, shards, limits)
    Task->>Task: prepareSchema(解析Schema)
    
    Task->>IDA: AllocOne()
    IDA-->>Task: collectionID
    
    Task->>IDA: Alloc(partitionCount)
    IDA-->>Task: partitionIDs
    
    Task->>Task: assignChannels(shardsNum)
    Task-->>Scheduler: Prepare完成
    end
    
    rect rgb(232, 245, 233)
    Note over Scheduler,MQ: Task层 - Execute阶段
    Scheduler->>Task: Execute(ctx)
    
    Task->>Task: 构建Collection模型
    Task->>Meta: GetCollectionByName(幂等检查)
    Meta-->>Task: NotFound
    
    Task->>MQ: BroadcastCreateCollectionMsg(获取StartPosition)
    MQ-->>Task: startPositions
    
    Task->>StepExec: executeSteps(undoTask)
    end
    
    rect rgb(255, 228, 225)
    Note over StepExec,DC: StepExecutor层 - 事务式执行
    StepExec->>Proxy: ExpireCacheStep
    Proxy->>Proxy: InvalidateCache
    
    StepExec->>Meta: AddCollectionMetaStep
    Meta->>ETCD: Save(/collections/{id})
    ETCD-->>Meta: OK
    Meta->>Meta: 更新内存缓存
    
    StepExec->>Broker: WatchChannelsStep
    Broker->>DC: WatchChannels(collectionID, vChannels)
    DC->>DC: 创建FlowGraph
    DC->>MQ: Subscribe(vChannels)
    DC-->>Broker: Success
    
    StepExec->>Meta: ChangeCollectionStateStep
    Meta->>ETCD: Update(state=CollectionCreated)
    ETCD-->>Meta: OK
    end
    
    StepExec-->>Task: 所有步骤成功
    Task-->>Scheduler: Execute完成
    Scheduler->>Task: NotifyDone(nil)
    Task-->>RC: WaitToFinish返回
    RC-->>Proxy: Status(Success)
    Proxy-->>Client: CreateCollection成功

2.1.7 时序图说明

关键流程点

  1. 步骤1-4(API层):Client通过Proxy转发CreateCollection请求到RootCoord
  2. 步骤5-11(Scheduler层):任务进入调度器,分配taskID和timestamp,保证串行执行
  3. 步骤12-22(Prepare阶段)
    • 获取Database元数据
    • 校验参数(分片数、Collection数量限制等)
    • 解析和校验Schema
    • 分配CollectionID、PartitionIDs、VirtualChannels
  4. 步骤23-29(Execute阶段)
    • 构建Collection模型
    • 幂等性检查(避免重复创建)
    • 广播CreateCollectionMsg到消息队列,获取StartPosition
  5. 步骤30-43(StepExecutor层 - 事务式执行)
    • ExpireCacheStep:通知Proxy清理缓存
    • AddCollectionMetaStep:将Collection元数据写入etcd
    • WatchChannelsStep:通知DataCoord监听VirtualChannels
    • ChangeCollectionStateStep:修改Collection状态为Created

异常处理

  • 任何Step失败都会触发Undo机制
  • Undo步骤异步执行,不阻塞后续DDL
  • 通过状态机保证Collection状态一致性

性能优化

  • Prepare阶段提前校验,减少失败成本
  • 批量分配PartitionIDs,减少etcd访问
  • StepExecutor异步执行Undo,提升吞吐

2.2 AllocTimestamp完整调用链路

调用路径:Proxy → RootCoord API → TSOAllocator → etcd(定期持久化)

2.2.1 上游接口(API层)

入口函数Core.AllocTimestamp()

// 路径:internal/rootcoord/root_coord.go
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
        return &rootcoordpb.AllocTimestampResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 2. BlockTimestamp处理(等待物理时间)
    if in.BlockTimestamp > 0 {
        blockTime, _ := tsoutil.ParseTS(in.BlockTimestamp)
        lastTime := c.tsoAllocator.GetLastSavedTime()
        deltaDuration := blockTime.Sub(lastTime)
        if deltaDuration > 0 {
            time.Sleep(deltaDuration + 200*time.Millisecond)
        }
    }
    
    // 3. 生成TSO(核心逻辑)
    ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
    if err != nil {
        return &rootcoordpb.AllocTimestampResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 4. 返回第一个可用时间戳
    ts = ts - uint64(in.GetCount()) + 1
    
    return &rootcoordpb.AllocTimestampResponse{
        Status:    merr.Success(),
        Timestamp: ts,
        Count:     in.GetCount(),
    }, nil
}

关键点

  • 极低延迟:P99 < 10ms(无etcd访问)
  • BlockTimestamp:用于CDC场景,等待物理时间
  • 批量分配:一次请求可分配多个连续TS
  • 返回起始TS:客户端可使用[ts, ts+count-1]范围
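
批量分配后,调用方可按如下方式展开使用返回的TS区间(示例代码,非源码,insertMsg为假设的消息结构):

// 示例(示意,非源码):将一次AllocTimestamp(count)的结果分配给count条消息
type insertMsg struct {
    Timestamp uint64
    // 其他字段省略
}

func assignTimestamps(firstTS uint64, count uint32, msgs []*insertMsg) {
    // 可用区间为[firstTS, firstTS+count-1],每条消息占用一个TS
    for i := 0; i < len(msgs) && i < int(count); i++ {
        msgs[i].Timestamp = firstTS + uint64(i)
    }
}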

2.2.2 TSOAllocator核心实现

GlobalTSOAllocator结构体

// 路径:internal/tso/global_allocator.go
type GlobalTSOAllocator struct {
    mu sync.Mutex
    
    // TSO状态
    lastPhysical int64   // 上次物理时间(毫秒)
    lastLogical  int64   // 上次逻辑计数器
    maxLogical   int64   // 最大逻辑计数器(262144 = 2^18)
    
    // 持久化
    kvBase kv.TxnKV      // etcd/TiKV客户端
    key    string        // TSO存储key
    
    // 更新策略
    updateInterval time.Duration  // 50ms
    saveInterval   time.Duration  // 3s
    lastSaveTime   time.Time
}

GenerateTSO() - 核心算法

func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
    gta.mu.Lock()
    defer gta.mu.Unlock()
    
    // 1. 获取当前物理时间
    physical := time.Now().UnixMilli()
    
    // 2. 物理时间前进,重置逻辑计数器
    if physical > gta.lastPhysical {
        gta.lastPhysical = physical
        gta.lastLogical = 0
    }
    
    // 3. 检查逻辑计数器是否溢出
    if gta.lastLogical+int64(count) >= gta.maxLogical {
        // 等待下一毫秒
        time.Sleep(time.Millisecond)
        gta.lastPhysical = time.Now().UnixMilli()
        gta.lastLogical = 0
    }
    
    // 4. 生成TSO(物理时间 | 逻辑计数器)
    //    先递增逻辑计数器,返回本批次最后一个TS(endTS),调用方再换算出起始TS
    gta.lastLogical += int64(count)
    ts := uint64(gta.lastPhysical)<<18 | uint64(gta.lastLogical)
    
    // 5. 定期持久化到etcd(每3秒)
    if time.Since(gta.lastSaveTime) > gta.saveInterval {
        gta.kvBase.Save(gta.key, gta.lastPhysical)
        gta.lastSaveTime = time.Now()
    }
    
    return ts, nil
}

TSO格式详解

+-------------------+-------------------+
|  Physical (46位)  | Logical (18位)    |
+-------------------+-------------------+
|  毫秒级时间戳      | 逻辑计数器         |
|  (0-70,368,744,177,663ms) | (0-262,143)  |
+-------------------+-------------------+

示例:
  时间:2024-01-01 00:00:00.000
  Physical: 1704067200000 (毫秒)
  Logical:  12345
  
  TSO计算:
  ts = (1704067200000 << 18) | 12345
     = 446710992076812345
  
  反向解析:
  physical = ts >> 18 = 1704067200000
  logical  = ts & 0x3FFFF = 12345

性能特性

容量上限:
- 每毫秒最多262,143个TSO
- 理论QPS:262,143,000(2.6亿/秒)
- 实际QPS:>100万/秒(单RootCoord,受锁限制)

持久化策略:
- 每3秒持久化一次physical到etcd
- RootCoord重启后从etcd恢复physical
- 恢复时:physical = saved_physical + 3000ms
- 保证重启后TSO单调递增

特殊情况处理:
1. 逻辑计数器溢出:sleep(1ms)等待下一毫秒
2. 时钟回拨:拒绝分配,返回错误
3. etcd故障:继续分配(最多3秒内的TSO可能丢失)
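
其中"重启后从etcd恢复physical"的恢复逻辑可以示意如下(restoreFromSaved为示例方法名,非源码):

// 示例(示意,非源码):RootCoord重启后基于etcd中保存的physical恢复,保证TSO单调递增
func (gta *GlobalTSOAllocator) restoreFromSaved(savedPhysicalMs int64) {
    gta.mu.Lock()
    defer gta.mu.Unlock()

    // 向前跳过一个持久化周期(saveInterval=3s),
    // 确保重启后发放的TSO一定大于宕机前已发放的任何TSO
    gta.lastPhysical = savedPhysicalMs + gta.saveInterval.Milliseconds()
    gta.lastLogical = 0
}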

2.2.3 时序图

sequenceDiagram
    autonumber
    participant Proxy
    participant RC as RootCoord
    participant TSO as TSOAllocator
    participant ETCD as etcd
    
    Note over TSO,ETCD: 后台定期持久化
    loop 每3秒
        TSO->>ETCD: Save(tsoKey, lastPhysical)
        Note over TSO: 持久化物理时间<br/>防止重启后时钟回退
    end
    
    rect rgb(240, 248, 255)
    Note over Proxy,RC: 高频TSO分配请求
    Proxy->>RC: AllocTimestamp(count=10)
    RC->>RC: 健康检查
    RC->>TSO: GenerateTSO(10)
    end
    
    rect rgb(255, 250, 240)
    Note over TSO: TSO生成算法(本地)
    TSO->>TSO: 获取当前时间 physical=now()
    
    alt 物理时间前进
        TSO->>TSO: lastPhysical=physical<br/>lastLogical=0
    end
    
    alt 逻辑计数器溢出
        TSO->>TSO: sleep(1ms)等待下一毫秒
        TSO->>TSO: lastPhysical=now()<br/>lastLogical=0
    end
    
    TSO->>TSO: lastLogical += 10
    TSO->>TSO: ts = (lastPhysical << 18) | lastLogical (endTS)
    end
    
    TSO-->>RC: ts (endTS)
    RC->>RC: firstTS = ts - count + 1
    RC-->>Proxy: Timestamp=firstTS, Count=10
    
    Note over Proxy: 使用范围:[firstTS, firstTS+9]
    
    rect rgb(255, 228, 225)
    Note over Proxy,ETCD: Proxy并发分配多个请求
    par 并发请求
        Proxy->>RC: AllocTimestamp(1)
        RC->>TSO: GenerateTSO(1)
        TSO-->>RC: ts1
    and
        Proxy->>RC: AllocTimestamp(1)
        RC->>TSO: GenerateTSO(1)
        TSO-->>RC: ts2
    and
        Proxy->>RC: AllocTimestamp(1)
        RC->>TSO: GenerateTSO(1)
        TSO-->>RC: ts3
    end
    
    Note over TSO: 所有TS单调递增<br/>ts1 < ts2 < ts3
    end

2.2.4 时序图说明

关键流程点

  1. 后台持久化(步骤1-2)

    • 每3秒持久化一次lastPhysical到etcd
    • 防止RootCoord重启后时钟回退
    • 恢复策略:physical = saved + 3000ms
  2. TSO分配请求(步骤3-6)

    • Proxy发起高频TSO请求
    • RootCoord健康检查
    • 调用TSOAllocator.GenerateTSO()
  3. TSO生成算法(步骤7-13)

    • 获取当前物理时间
    • 判断是否需要重置逻辑计数器(物理时间前进)
    • 判断是否溢出(逻辑计数器耗尽)
    • 位运算生成TSO:(physical << 18) | logical
    • 递增逻辑计数器
  4. 批量分配(步骤14-16)

    • 返回endTS,客户端计算firstTS
    • 客户端可使用[firstTS, firstTS+count-1]范围
    • 减少RPC次数,提升吞吐
  5. 并发保证(步骤17-25)

    • 多个Proxy并发请求TSO
    • GlobalTSOAllocator通过Mutex保证串行化
    • 所有TSO严格单调递增

性能优化

优化点1:本地生成,避免etcd访问
- 每次请求:0次etcd访问
- P99延迟:<10ms

优化点2:批量分配
- 单次请求可分配多个TS
- 减少RPC次数

优化点3:逻辑计数器
- 单个毫秒内可分配262,143个TS
- 避免频繁获取物理时间

优化点4:定期持久化
- 每3秒持久化1次(vs 每次请求)
- 性能提升:>1000倍

风险:
- RootCoord重启:最多丢失3秒内的TSO信息
- 影响:重启后跳过最多3秒(对MVCC无影响)

2.3 Scheduler调度机制详解

2.3.1 Scheduler架构图

flowchart TB
    subgraph API["API入口"]
        A1[CreateCollection]
        A2[DropCollection]
        A3[CreatePartition]
        AN[Other DDL APIs]
    end
    
    subgraph Scheduler["Scheduler调度器"]
        TQ[taskChan<br/>任务队列]
        TH[taskHeap<br/>任务堆]
        TL[taskLoop<br/>串行执行]
        
        subgraph Lock["锁管理"]
            CL[ClusterLock]
            DL[DatabaseLock]
            COL[CollectionLock]
        end
    end
    
    subgraph Allocators["资源分配器"]
        IDA[IDAllocator<br/>任务ID分配]
        TSO[TSOAllocator<br/>时间戳分配]
    end
    
    subgraph Tasks["任务执行"]
        T1[Task.Prepare<br/>参数校验]
        T2[Task.Execute<br/>实际执行]
        T3[Task.NotifyDone<br/>结果通知]
    end
    
    A1 & A2 & A3 & AN --> TQ
    TQ --> TH
    TH --> TL
    
    TL --> IDA
    TL --> TSO
    TL --> Lock
    
    TL --> T1
    T1 --> T2
    T2 --> T3
    
    style Scheduler fill:#e1f5ff,stroke:#333,stroke-width:2px
    style Lock fill:#fff4e6,stroke:#333,stroke-width:2px
    style Tasks fill:#f3e5f5,stroke:#333,stroke-width:2px

2.3.2 核心数据结构

Scheduler结构体

// 路径:internal/rootcoord/scheduler.go
type scheduler struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    
    // 资源分配器
    idAllocator  allocator.Interface
    tsoAllocator tso.Allocator
    
    // 任务队列
    taskChan chan task          // 缓冲队列(容量10240)
    taskHeap typeutil.Heap[task]  // 最小堆(按TS排序)
    
    // 并发控制
    lock sync.Mutex
    
    // 最小DDL时间戳
    minDdlTs atomic.Uint64
    
    // 层级锁
    clusterLock    *lock.KeyLock[string]
    databaseLock   *lock.KeyLock[string]
    collectionLock *lock.KeyLock[string]
    lockMapping    map[LockLevel]*lock.KeyLock[string]
}

Task接口

// 路径:internal/rootcoord/task.go
type task interface {
    GetID() int64
    SetID(id int64)
    GetTs() Timestamp
    SetTs(ts Timestamp)
    GetType() commonpb.MsgType
    
    // 核心方法
    Prepare(ctx context.Context) error
    Execute(ctx context.Context) error
    
    // 异步通知
    WaitToFinish() error
    NotifyDone(err error)
    Notify(err error)
    
    // 锁管理
    GetLockerKey() LockerKey
}
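
其中WaitToFinish/NotifyDone这对方法通常基于带缓冲的channel实现一次性结果通知,下面是一个简化示意(示例类型,非源码):

// 示例(示意,非源码):基于channel的任务完成通知
type baseTaskExample struct {
    done chan error // 缓冲为1,避免NotifyDone阻塞调度循环
}

func newBaseTaskExample() *baseTaskExample {
    return &baseTaskExample{done: make(chan error, 1)}
}

// NotifyDone:由调度器在任务执行结束后调用
func (t *baseTaskExample) NotifyDone(err error) {
    t.done <- err
}

// WaitToFinish:由API层阻塞等待调度器的执行结果
func (t *baseTaskExample) WaitToFinish() error {
    return <-t.done
}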

2.3.3 调度流程

AddTask() - 任务提交

func (s *scheduler) AddTask(task task) error {
    // 1. 判断是否启用锁调度
    if Params.RootCoordCfg.UseLockScheduler.GetAsBool() {
        lockKey := task.GetLockerKey()
        if lockKey != nil {
            return s.executeTaskWithLock(task, lockKey)
        }
    }
    
    // 2. 默认串行化调度
    s.lock.Lock()
    defer s.lock.Unlock()
    
    // 3. 分配任务ID
    if err := s.setID(task); err != nil {
        return err
    }
    
    // 4. 分配时间戳
    if err := s.setTs(task); err != nil {
        return err
    }
    s.taskHeap.Push(task)
    
    // 5. 放入队列
    s.enqueue(task)
    return nil
}

taskLoop() - 串行执行循环

func (s *scheduler) taskLoop() {
    defer s.wg.Done()
    for {
        select {
        case <-s.ctx.Done():
            return
        case task := <-s.taskChan:
            s.execute(task)  // 串行执行
        }
    }
}

func (s *scheduler) execute(task task) {
    defer s.setMinDdlTs()  // 更新最小DDL时间戳
    
    // 1. 记录在队列中的时间
    task.SetInQueueDuration()
    
    // 2. Prepare阶段
    if err := task.Prepare(task.GetCtx()); err != nil {
        task.NotifyDone(err)
        return
    }
    
    // 3. Execute阶段
    err := task.Execute(task.GetCtx())
    
    // 4. 通知完成
    task.NotifyDone(err)
}

层级锁机制(可选)

func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
    if lockerKey == nil {
        // 叶子节点,执行任务
        if err := s.setID(task); err != nil {
            return err
        }
        s.lock.Lock()
        if err := s.setTs(task); err != nil {
            s.lock.Unlock()
            return err
        }
        s.lock.Unlock()
        s.execute(task)
        return nil
    }
    
    // 获取对应层级的锁
    taskLock := s.lockMapping[lockerKey.Level()]
    if lockerKey.IsWLock() {
        taskLock.Lock(lockerKey.LockKey())
        defer taskLock.Unlock(lockerKey.LockKey())
    } else {
        taskLock.RLock(lockerKey.LockKey())
        defer taskLock.RUnlock(lockerKey.LockKey())
    }
    
    // 递归获取下一层锁
    return s.executeTaskWithLock(task, lockerKey.Next())
}

// 锁层级示例
type LockerKey interface {
    Level() LockLevel      // ClusterLock/DatabaseLock/CollectionLock
    LockKey() string       // 锁的键值
    IsWLock() bool         // 是否写锁
    Next() LockerKey       // 下一层锁
}

// CreateCollection的锁链:
// ClusterLock(RLock) -> DatabaseLock(RLock) -> CollectionLock(WLock)
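
按这条锁链,LockerKey可以示意性地实现为一个单向链表(simpleLockerKey与构造函数均为示例命名,LockLevel常量名以实际源码为准):

// 示例(示意,非源码):CreateCollection的三层锁键链
type simpleLockerKey struct {
    level LockLevel
    key   string
    write bool
    next  LockerKey
}

func (k *simpleLockerKey) Level() LockLevel { return k.level }
func (k *simpleLockerKey) LockKey() string  { return k.key }
func (k *simpleLockerKey) IsWLock() bool    { return k.write }
func (k *simpleLockerKey) Next() LockerKey  { return k.next }

// Cluster(读锁) -> Database(读锁) -> Collection(写锁)
func newCreateCollectionLockerKey(dbName, collectionName string) LockerKey {
    collKey := &simpleLockerKey{level: CollectionLock, key: collectionName, write: true}
    dbKey := &simpleLockerKey{level: DatabaseLock, key: dbName, next: collKey}
    return &simpleLockerKey{level: ClusterLock, key: "cluster", next: dbKey}
}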

2.3.4 MinDdlTs同步机制

syncTsLoop() - 定期更新最小DDL时间戳

func (s *scheduler) syncTsLoop() {
    defer s.wg.Done()
    ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
    defer ticker.Stop()
    
    for {
        select {
        case <-s.ctx.Done():
            return
        case <-ticker.C:
            s.updateLatestTsoAsMinDdlTs()
        }
    }
}

func (s *scheduler) updateLatestTsoAsMinDdlTs() {
    // 创建一个空任务,获取最新TSO
    t := newBaseTask(context.Background(), nil)
    if err := s.AddTask(&t); err != nil {
        log.Warn("failed to update latest ddl ts", zap.Error(err))
    }
}

func (s *scheduler) setMinDdlTs() {
    s.lock.Lock()
    defer s.lock.Unlock()
    
    // 从堆中移除已完成的任务
    for s.taskHeap.Len() > 0 && s.taskHeap.Peek().IsFinished() {
        t := s.taskHeap.Pop()
        s.minDdlTs.Store(t.GetTs())
    }
}

// 用途:TimeTick系统需要知道最小的未完成DDL时间戳
// 保证TimeTick不会超过正在执行的DDL的时间戳

2.3.5 时序图

sequenceDiagram
    autonumber
    participant A1 as DDL Task1
    participant A2 as DDL Task2
    participant A3 as DDL Task3
    participant Sch as Scheduler
    participant IDA as IDAllocator
    participant TSO as TSOAllocator
    participant Heap as TaskHeap
    participant Chan as TaskChan
    participant Loop as TaskLoop
    
    rect rgb(240, 248, 255)
    Note over A1,Sch: 并发提交多个DDL任务
    par 并发提交
        A1->>Sch: AddTask(createCollectionTask)
    and
        A2->>Sch: AddTask(dropCollectionTask)
    and
        A3->>Sch: AddTask(createPartitionTask)
    end
    end
    
    rect rgb(255, 250, 240)
    Note over Sch,Heap: 串行化处理(加锁)
    Sch->>Sch: lock.Lock()
    
    Sch->>IDA: AllocOne()
    IDA-->>Sch: taskID=1
    
    Sch->>TSO: GenerateTSO(1)
    TSO-->>Sch: ts=1000
    
    Sch->>Heap: Push(task1, ts=1000)
    Sch->>Chan: taskChan <- task1
    
    Sch->>Sch: lock.Unlock()
    end
    
    rect rgb(243, 229, 245)
    Note over Sch,Heap: 后续任务入队
    Sch->>Sch: lock.Lock()
    
    Sch->>IDA: AllocOne()
    IDA-->>Sch: taskID=2
    
    Sch->>TSO: GenerateTSO(1)
    TSO-->>Sch: ts=1001
    
    Sch->>Heap: Push(task2, ts=1001)
    Sch->>Chan: taskChan <- task2
    
    Sch->>Sch: lock.Unlock()
    end
    
    rect rgb(232, 245, 233)
    Note over Loop,A1: 串行执行Task1
    Loop->>Chan: <-taskChan
    Chan-->>Loop: task1
    
    Loop->>A1: Prepare(ctx)
    A1-->>Loop: nil
    
    Loop->>A1: Execute(ctx)
    A1->>A1: 执行DDL逻辑
    A1-->>Loop: nil
    
    Loop->>A1: NotifyDone(nil)
    A1->>A1: 唤醒WaitToFinish()
    
    Loop->>Heap: Pop已完成任务
    Loop->>Loop: minDdlTs.Store(1000)
    end
    
    rect rgb(255, 228, 225)
    Note over Loop,A2: 串行执行Task2
    Loop->>Chan: <-taskChan
    Chan-->>Loop: task2
    
    Loop->>A2: Prepare(ctx)
    A2-->>Loop: nil
    
    Loop->>A2: Execute(ctx)
    A2->>A2: 执行DDL逻辑
    A2-->>Loop: nil
    
    Loop->>A2: NotifyDone(nil)
    A2->>A2: 唤醒WaitToFinish()
    
    Loop->>Heap: Pop已完成任务
    Loop->>Loop: minDdlTs.Store(1001)
    end
    
    Note over Loop: 串行执行保证DDL顺序性

2.4 StepExecutor与Undo机制详解

2.4.1 StepExecutor架构图

flowchart TB
    subgraph Task["DDL Task"]
        T1[Task.Execute]
        T2[executeXXXTaskSteps]
    end
    
    subgraph UndoTask["UndoTask构建"]
        UT[newBaseUndoTask]
        
        subgraph Steps["步骤对(Do/Undo)"]
            S1[Step1: ExpireCache / NullStep]
            S2[Step2: AddMeta / DeleteMeta]
            S3[Step3: NullStep / RemoveChannels]
            S4[Step4: WatchChannels / UnwatchChannels]
            S5[Step5: ChangeState / NullStep]
        end
    end
    
    subgraph Executor["StepExecutor"]
        BG[bgStepExecutor<br/>后台执行器]
        SS[stepStack<br/>步骤栈]
        SP[selectPolicy<br/>优先级策略]
    end
    
    subgraph Execution["执行流程"]
        DO[执行Do步骤]
        CHK[检查错误]
        UNDO[触发Undo]
    end
    
    T1 --> T2
    T2 --> UT
    UT --> S1 & S2 & S3 & S4 & S5
    
    S1 & S2 & S3 & S4 & S5 --> DO
    DO --> CHK
    
    CHK -->|成功| S1
    CHK -->|失败| UNDO
    
    UNDO --> BG
    BG --> SS
    SS --> SP
    
    style UndoTask fill:#e1f5ff,stroke:#333,stroke-width:2px
    style Executor fill:#fff4e6,stroke:#333,stroke-width:2px
    style Execution fill:#f3e5f5,stroke:#333,stroke-width:2px

2.4.2 核心数据结构

baseUndoTask - Undo任务

// 路径:internal/rootcoord/undo.go
type baseUndoTask struct {
    todoStep     []nestedStep  // 正向步骤(Do)
    undoStep     []nestedStep  // 回滚步骤(Undo)
    stepExecutor StepExecutor   // 步骤执行器
}

func newBaseUndoTask(stepExecutor StepExecutor) *baseUndoTask {
    return &baseUndoTask{
        todoStep:     make([]nestedStep, 0),
        undoStep:     make([]nestedStep, 0),
        stepExecutor: stepExecutor,
    }
}

// 添加步骤对
func (b *baseUndoTask) AddStep(todoStep, undoStep nestedStep) {
    b.todoStep = append(b.todoStep, todoStep)
    b.undoStep = append(b.undoStep, undoStep)
}

nestedStep - 步骤接口

// 路径:internal/rootcoord/step.go
type nestedStep interface {
    Execute(ctx context.Context) ([]nestedStep, error)
    Desc() string          // 步骤描述
    Weight() stepPriority  // 优先级
}

type stepPriority int
const (
    stepPriorityLow       = 0
    stepPriorityNormal    = 1
    stepPriorityImportant = 10
    stepPriorityUrgent    = 1000
)

2.4.3 执行流程

baseUndoTask.Execute() - 事务式执行

func (b *baseUndoTask) Execute(ctx context.Context) error {
    if len(b.todoStep) != len(b.undoStep) {
        return errors.New("todo step and undo step length not equal")
    }
    
    for i := 0; i < len(b.todoStep); i++ {
        todoStep := b.todoStep[i]
        
        // 执行正向步骤
        if _, err := todoStep.Execute(ctx); err != nil {
            log.Warn("step failed, starting undo", 
                zap.Error(err), 
                zap.String("desc", todoStep.Desc()))
            
            // 收集已执行步骤的undo
            undoSteps := b.undoStep[:i]
            b.undoStep = nil  // 防止内存泄漏
            
            // 异步执行undo(不阻塞当前调用)
            go b.stepExecutor.AddSteps(&stepStack{undoSteps})
            return err
        }
    }
    return nil
}

bgStepExecutor - 后台执行器

// 路径:internal/rootcoord/step_executor.go
type bgStepExecutor struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    
    // 待执行步骤集合
    todoSteps map[*stepStack]struct{}
    mu        sync.Mutex
    
    // 步骤选择策略
    policy selectStepPolicy
    
    // 并发度
    parallel int  // 默认16
}

func (bg *bgStepExecutor) Start() {
    bg.wg.Add(1)
    go bg.scheduleLoop()
}

func (bg *bgStepExecutor) scheduleLoop() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-bg.ctx.Done():
            return
        case <-ticker.C:
            bg.process()
        }
    }
}

func (bg *bgStepExecutor) process() {
    bg.mu.Lock()
    // 按优先级选择步骤
    steps := bg.policy(bg.todoSteps)
    bg.mu.Unlock()
    
    // 并发执行
    var wg sync.WaitGroup
    for _, s := range steps {
        wg.Add(1)
        go func(s *stepStack) {
            defer wg.Done()
            
            // 执行步骤栈
            remainSteps := s.Execute(bg.ctx)
            
            bg.mu.Lock()
            defer bg.mu.Unlock()
            
            delete(bg.todoSteps, s)
            if remainSteps != nil {
                // 有剩余步骤,重新加入队列
                bg.todoSteps[remainSteps] = struct{}{}
            }
        }(s)
    }
    wg.Wait()
}

stepStack - 步骤栈

type stepStack struct {
    steps []nestedStep
}

func (s *stepStack) Execute(ctx context.Context) *stepStack {
    steps := s.steps
    for len(steps) > 0 {
        l := len(steps)
        todo := steps[l-1]  // 从后往前执行(栈结构)
        
        childSteps, err := todo.Execute(ctx)
        
        if !retry.IsRecoverable(err) {
            log.Warn("step failed, not recoverable", zap.Error(err))
            return nil
        }
        
        if err != nil {
            log.Warn("step failed, will retry", zap.Error(err))
            return &stepStack{steps: steps}
        }
        
        // 步骤成功,移除并添加子步骤
        steps = steps[:l-1]
        steps = append(steps, childSteps...)
    }
    return nil
}

func (s *stepStack) totalPriority() stepPriority {
    total := stepPriority(0)
    for _, step := range s.steps {
        total += step.Weight()
    }
    return total
}
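
bgStepExecutor.process()中的selectPolicy按优先级挑选待执行的stepStack,一个基于totalPriority的简化示意如下(selectByPriority为示例函数名,非源码,需import "sort"):

// 示例(示意,非源码):每轮最多选出parallel个总优先级最高的stepStack
func selectByPriority(todo map[*stepStack]struct{}, parallel int) []*stepStack {
    all := make([]*stepStack, 0, len(todo))
    for s := range todo {
        all = append(all, s)
    }
    // totalPriority越大越优先(例如Urgent=1000的清理步骤)
    sort.Slice(all, func(i, j int) bool {
        return all[i].totalPriority() > all[j].totalPriority()
    })
    if len(all) > parallel {
        all = all[:parallel]
    }
    return all
}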

2.4.4 常见Step实现

addCollectionMetaStep - 添加Collection元数据

type addCollectionMetaStep struct {
    baseStep
    coll *model.Collection
}

func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
    err := s.core.meta.AddCollection(ctx, s.coll)
    return nil, err
}

func (s *addCollectionMetaStep) Desc() string {
    return fmt.Sprintf("add collection to meta table, name: %s, id: %d", 
        s.coll.Name, s.coll.CollectionID)
}

watchChannelsStep - 通知DataCoord

type watchChannelsStep struct {
    baseStep
    info *watchInfo
}

func (s *watchChannelsStep) Execute(ctx context.Context) ([]nestedStep, error) {
    err := s.core.broker.WatchChannels(ctx, s.info)
    return nil, err
}

func (s *watchChannelsStep) Desc() string {
    return fmt.Sprintf("watch channels, ts: %d, collection: %d, vChannels: %v",
        s.info.ts, s.info.collectionID, s.info.vChannels)
}

2.4.5 Undo时序图

sequenceDiagram
    autonumber
    participant Task
    participant Undo as baseUndoTask
    participant S1 as Step1(ExpireCache)
    participant S2 as Step2(AddMeta)
    participant S3 as Step3(WatchChannels)
    participant BG as bgStepExecutor
    participant U1 as UndoStep1
    participant U2 as UndoStep2
    
    Task->>Undo: Execute(ctx)
    
    rect rgb(232, 245, 233)
    Note over Undo,S1: 执行Step1成功
    Undo->>S1: Execute(ctx)
    S1->>S1: Proxy.InvalidateCache
    S1-->>Undo: nil
    end
    
    rect rgb(232, 245, 233)
    Note over Undo,S2: 执行Step2成功
    Undo->>S2: Execute(ctx)
    S2->>S2: MetaTable.AddCollection
    S2->>S2: etcd.Save()
    S2-->>Undo: nil
    end
    
    rect rgb(255, 228, 225)
    Note over Undo,S3: 执行Step3失败
    Undo->>S3: Execute(ctx)
    S3->>S3: Broker.WatchChannels
    S3-->>Undo: error(network timeout)
    end
    
    rect rgb(255, 240, 245)
    Note over Undo,BG: 触发Undo流程
    Undo->>Undo: 收集undoSteps[0:2]
    Undo->>BG: AddSteps(undoSteps) [异步]
    Undo-->>Task: return error
    end
    
    rect rgb(240, 248, 255)
    Note over BG,U2: 后台执行Undo(倒序)
    BG->>BG: scheduleLoop触发
    BG->>BG: 按优先级选择stepStack
    
    BG->>U2: Execute(ctx) [UndoStep2]
    U2->>U2: MetaTable.RemoveCollection
    U2->>U2: etcd.Delete()
    U2-->>BG: nil
    
    BG->>U1: Execute(ctx) [UndoStep1]
    U1->>U1: NullStep(无操作)
    U1-->>BG: nil
    
    BG->>BG: 移除stepStack
    end
    
    Note over Task,BG: Undo完成,系统恢复一致性

2.4.6 Undo机制特点

优点

  1. 事务语义:任何步骤失败都能回滚
  2. 异步执行:Undo不阻塞当前DDL,提升吞吐
  3. 优先级调度:紧急清理步骤优先执行
  4. 自动重试:Recoverable错误自动重试
  5. 并发执行:多个Undo任务并发执行(默认16并发)

限制

  1. 非严格事务:Undo是补偿操作,不是原子回滚
  2. 依赖幂等性:Undo步骤必须幂等
  3. 异步延迟:Undo可能延迟执行(最长1秒)

典型Undo场景

  • CreateCollection失败:删除已写入的元数据、移除Channel
  • WatchChannels超时:重试或标记失败
  • etcd写入失败:删除部分已写入的数据
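
这些Undo步骤都要求幂等:以删除元数据为例,目标已不存在时应直接视为成功,示意如下(简化代码,省略Desc/Weight方法,错误判断与RemoveCollection签名以实际源码为准):

// 示例(示意,非源码):幂等的Undo步骤——元数据不存在时视为成功,可安全重试
func (s *deleteCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
    err := s.core.meta.RemoveCollection(ctx, s.collectionID, s.ts)
    if err != nil && errors.Is(err, merr.ErrCollectionNotFound) {
        return nil, nil // 已经删除过(或从未写入),幂等返回成功
    }
    return nil, err
}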


2.5 Broker模块与其他组件交互

2.5.1 Broker架构图

flowchart TB
    subgraph RootCoord["RootCoord"]
        API[API Layer]
        Task[DDL Tasks]
        Step[Step Executor]
        Broker[ServerBroker]
    end
    
    subgraph DataCoord["DataCoord"]
        DC_API[gRPC API]
        DC_Seg[Segment Manager]
        DC_Ch[Channel Manager]
        DC_GC[GC Manager]
    end
    
    subgraph QueryCoord["QueryCoord"]
        QC_API[gRPC API]
        QC_Load[Load Manager]
        QC_Sched[Scheduler]
    end
    
    subgraph Proxy["Proxy"]
        P_Cache[GlobalMetaCache]
    end
    
    API --> Task
    Task --> Step
    Step --> Broker
    
    Broker -->|WatchChannels| DC_API
    Broker -->|GetSegmentStates| DC_API
    Broker -->|GcConfirm| DC_API
    Broker -->|DropIndex| DC_API
    Broker -->|BroadcastAlteredCollection| DC_API
    
    DC_API --> DC_Ch
    DC_API --> DC_Seg
    DC_API --> DC_GC
    
    Broker -->|ReleaseCollection| QC_API
    Broker -->|ReleasePartitions| QC_API
    Broker -->|SyncNewCreatedPartition| QC_API
    Broker -->|GetSegmentInfo| QC_API
    
    QC_API --> QC_Load
    QC_API --> QC_Sched
    
    Broker -.->|InvalidateCache| P_Cache
    
    style Broker fill:#e1f5ff,stroke:#333,stroke-width:2px
    style DataCoord fill:#fff4e6,stroke:#333,stroke-width:2px
    style QueryCoord fill:#f3e5f5,stroke:#333,stroke-width:2px

2.5.2 核心接口

Broker接口定义

// 路径:internal/rootcoord/broker.go
type Broker interface {
    // QueryCoord交互
    ReleaseCollection(ctx context.Context, collectionID UniqueID) error
    ReleasePartitions(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) error
    SyncNewCreatedPartition(ctx context.Context, collectionID UniqueID, partitionID UniqueID) error
    GetQuerySegmentInfo(ctx context.Context, collectionID int64, segIDs []int64) (*querypb.GetSegmentInfoResponse, error)
    
    // DataCoord交互
    WatchChannels(ctx context.Context, info *watchInfo) error
    UnwatchChannels(ctx context.Context, info *watchInfo) error
    GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
    GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool
    DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
    
    // 广播通知
    BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error
}

2.5.3 关键交互场景

场景1:CreateCollection - WatchChannels交互

sequenceDiagram
    autonumber
    participant Step as watchChannelsStep
    participant Broker
    participant DC as DataCoord
    participant DN as DataNode
    participant MQ as MessageQueue
    
    Step->>Broker: WatchChannels(watchInfo)
    Broker->>DC: WatchChannelsRequest
    
    Note over DC: 接收Watch请求
    DC->>DC: 创建Channel元数据
    DC->>DC: 分配DataNode
    
    par 通知所有DataNode
        DC->>DN: CreateFlowGraph(vChannels)
        DN->>DN: 创建FlowGraph
        DN->>MQ: Subscribe(vChannels)
        MQ-->>DN: StartPosition
        DN-->>DC: ACK
    end
    
    DC->>DC: 标记Channel为Watched
    DC-->>Broker: WatchChannelsResponse(Success)
    Broker-->>Step: nil

场景2:DropCollection - ReleaseCollection交互

sequenceDiagram
    autonumber
    participant Step as releaseCollectionStep
    participant Broker
    participant QC as QueryCoord
    participant QN as QueryNode
    
    Step->>Broker: ReleaseCollection(collectionID)
    Broker->>QC: ReleaseCollectionRequest
    
    Note over QC: 开始释放流程
    QC->>QC: 标记Collection为Releasing
    
    par 通知所有QueryNode
        QC->>QN: ReleaseSegments(collectionID)
        QN->>QN: 卸载所有Segment
        QN->>QN: 释放内存
        QN-->>QC: ACK
    end
    
    QC->>QC: 清理Collection元数据
    QC-->>Broker: ReleaseCollectionResponse(Success)
    Broker-->>Step: nil

场景3:GarbageCollection - GcConfirm交互

sequenceDiagram
    autonumber
    participant GC as GarbageCollector
    participant Broker
    participant DC as DataCoord
    participant MinIO as Object Storage
    
    Note over GC: 定期GC扫描
    GC->>GC: 扫描Dropped Collection
    GC->>Broker: GcConfirm(collectionID, partitionID)
    
    Broker->>DC: GcConfirmRequest
    DC->>DC: 检查Segment是否全部Dropped
    DC->>DC: 检查Binlog是否已清理
    
    alt 清理完成
        DC-->>Broker: GcConfirmResponse(finished=true)
        Broker-->>GC: true
        
        GC->>GC: 删除Collection元数据
        GC->>MinIO: DeleteObjects(binlog paths)
        MinIO-->>GC: OK
    else 清理未完成
        DC-->>Broker: GcConfirmResponse(finished=false)
        Broker-->>GC: false
        GC->>GC: 等待下次扫描
    end

场景4:AlterCollection - BroadcastAlteredCollection交互

sequenceDiagram
    autonumber
    participant Task as AlterCollectionTask
    participant Broker
    participant DC as DataCoord
    participant QC as QueryCoord
    participant Proxy
    
    Task->>Broker: BroadcastAlteredCollection(req)
    
    Note over Broker: 构造AlterCollectionRequest
    Broker->>Broker: 获取Collection完整Schema
    Broker->>Broker: 获取所有PartitionIDs
    
    par 并发广播
        Broker->>DC: AlterCollectionRequest
        DC->>DC: 更新Collection Schema
        DC->>DC: 更新Channel映射
        DC-->>Broker: Success
    and
        Broker->>QC: AlterCollectionRequest
        QC->>QC: 更新Collection配置
        QC-->>Broker: Success
    and
        Broker->>Proxy: InvalidateCollectionMetaCache
        Proxy->>Proxy: 清除缓存
        Proxy-->>Broker: ACK
    end
    
    Broker-->>Task: nil
    
    Note over Task,Proxy: 所有组件完成Schema更新

2.6 MetaTable缓存机制与失效策略

2.6.1 MetaTable架构

flowchart TB
    subgraph RootCoord["RootCoord"]
        API[API Layer]
        
        subgraph MetaTable["MetaTable(内存缓存)"]
            CollMap[collID2Meta<br/>Map[CollID]*Collection]
            NameMap[collName2ID<br/>Map[DBName][CollName]CollID]
            AliasMap[collAlias2ID<br/>Map[DBName][Alias]CollID]
            PartMap[partID2Meta<br/>Map[PartID]*Partition]
            DBMap[dbID2Meta<br/>Map[DBID]*Database]
        end
        
        Catalog[Catalog<br/>持久化层]
    end
    
    subgraph Storage["存储层"]
        ETCD[etcd/TiKV]
    end
    
    subgraph Proxy["Proxy"]
        GCache[GlobalMetaCache]
    end
    
    API --> MetaTable
    MetaTable <--> Catalog
    Catalog <--> ETCD
    
    MetaTable -.->|InvalidateCache| GCache
    
    style MetaTable fill:#e1f5ff,stroke:#333,stroke-width:2px
    style Catalog fill:#fff4e6,stroke:#333,stroke-width:2px

2.6.2 核心数据结构

MetaTable结构体

// 路径:internal/rootcoord/meta_table.go
type MetaTable struct {
    sync.RWMutex
    ctx context.Context
    
    // Collection缓存:CollectionID -> Collection
    collID2Meta map[typeutil.UniqueID]*model.Collection
    
    // Collection名称索引:DBID -> CollectionName -> CollectionID
    collName2ID map[typeutil.UniqueID]map[string]typeutil.UniqueID
    
    // Collection别名:DBID -> Alias -> CollectionID
    collAlias2ID map[typeutil.UniqueID]map[string]typeutil.UniqueID
    
    // Partition缓存:PartitionID -> Partition
    partID2Meta map[typeutil.UniqueID]*model.Partition
    
    // Database缓存
    names2DatabaseID map[string]typeutil.UniqueID
    dbID2Meta        map[typeutil.UniqueID]*model.Database
    
    // 持久化层
    catalog metastore.RootCoordCatalog
}

2.6.3 缓存操作流程

AddCollection - 写入缓存

func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection) error {
    mt.Lock()
    defer mt.Unlock()
    
    // 1. 持久化到etcd
    if err := mt.catalog.CreateCollection(ctx, coll, coll.CreateTime); err != nil {
        return err
    }
    
    // 2. 更新内存缓存
    mt.collID2Meta[coll.CollectionID] = coll
    
    if _, ok := mt.collName2ID[coll.DBID]; !ok {
        mt.collName2ID[coll.DBID] = make(map[string]typeutil.UniqueID)
    }
    mt.collName2ID[coll.DBID][coll.Name] = coll.CollectionID
    
    // 3. 更新Partition缓存
    for _, part := range coll.Partitions {
        mt.partID2Meta[part.PartitionID] = part
    }
    
    return nil
}

GetCollectionByName - 查询缓存

func (mt *MetaTable) GetCollectionByName(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) {
    mt.RLock()
    defer mt.RUnlock()
    
    // 1. 获取DatabaseID
    dbID, ok := mt.names2DatabaseID[dbName]
    if !ok {
        return nil, merr.WrapErrDatabaseNotFound(dbName)
    }
    
    // 2. 从名称索引获取CollectionID
    collID, ok := mt.collName2ID[dbID][collectionName]
    if !ok {
        return nil, merr.WrapErrCollectionNotFound(collectionName)
    }
    
    // 3. 从Collection缓存获取数据
    coll, ok := mt.collID2Meta[collID]
    if !ok {
        return nil, merr.WrapErrCollectionNotFound(collectionName)
    }
    
    // 4. 时间旅行支持
    if ts != typeutil.MaxTimestamp && coll.CreateTime > ts {
        return nil, merr.WrapErrCollectionNotFound(collectionName)
    }
    
    // 5. 返回副本(避免外部修改)
    return coll.Clone(), nil
}

2.6.4 缓存失效机制

失效流程时序图

sequenceDiagram
    autonumber
    participant Task as DDL Task
    participant Meta as MetaTable
    participant ETCD as etcd
    participant Broker
    participant Proxy1
    participant Proxy2
    
    rect rgb(240, 248, 255)
    Note over Task,ETCD: DDL修改元数据
    Task->>Meta: AlterCollection(coll)
    Meta->>ETCD: UpdateCollection(coll)
    ETCD-->>Meta: OK
    Meta->>Meta: 更新内存缓存
    Meta-->>Task: Success
    end
    
    rect rgb(255, 250, 240)
    Note over Task,Proxy2: 广播失效通知
    Task->>Broker: BroadcastAlteredCollection(req)
    
    par 并发通知所有Proxy
        Broker->>Proxy1: InvalidateCollectionMetaCache(collID, ts)
        Proxy1->>Proxy1: 删除缓存[collID]
        Proxy1-->>Broker: ACK
    and
        Broker->>Proxy2: InvalidateCollectionMetaCache(collID, ts)
        Proxy2->>Proxy2: 删除缓存[collID]
        Proxy2-->>Broker: ACK
    end
    
    Broker-->>Task: Success
    end
    
    rect rgb(243, 229, 245)
    Note over Proxy1,Meta: Proxy重新加载元数据
    Proxy1->>Proxy1: 下次访问Collection
    Proxy1->>Meta: DescribeCollection(collID)
    Meta-->>Proxy1: CollectionInfo(最新)
    Proxy1->>Proxy1: 缓存到本地
    end

失效策略

| 场景 | 触发时机 | 失效范围 | 性能影响 |
| --- | --- | --- | --- |
| CreateCollection | 创建成功后 | 新Collection(无缓存) | |
| DropCollection | 删除成功后 | 指定Collection | |
| AlterCollection | 修改成功后 | 指定Collection | |
| CreatePartition | 创建成功后 | 父Collection | |
| DropPartition | 删除成功后 | 父Collection | |
| RenameCollection | 重命名成功后 | 旧名和新名 | |

缓存一致性保证

1. Write-Through(写穿):
   - 先写etcd,成功后更新内存缓存
   - 保证持久化数据与缓存一致

2. 主动失效(Invalidate):
   - DDL完成后立即通知所有Proxy
   - Proxy删除本地缓存

3. 懒加载(Lazy Load):
   - Proxy下次访问时从RootCoord重新加载
   - RootCoord从内存缓存返回(快速)

4. 最终一致性:
   - 极短时间内可能存在不一致(毫秒级)
   - Proxy收到失效通知后立即删除缓存

5. 时间旅行支持:
   - 支持基于Timestamp的历史查询
   - etcd支持Revision回溯

2.6.5 性能优化

优化策略

  1. 读写锁分离

    // 读操作(高频):RLock
    func (mt *MetaTable) GetCollection(...) {
        mt.RLock()
        defer mt.RUnlock()
        // ...
    }
    
    // 写操作(低频):Lock
    func (mt *MetaTable) AddCollection(...) {
        mt.Lock()
        defer mt.Unlock()
        // ...
    }
    
  2. 多级索引

    • collID2Meta:主索引(CollectionID)
    • collName2ID:名称索引(DBName + CollName)
    • collAlias2ID:别名索引(DBName + Alias)
    • 避免全表扫描,O(1)查询
  3. 批量操作

    // 批量查询Collection
    func (mt *MetaTable) ListCollections(ctx context.Context, dbName string) ([]*model.Collection, error) {
        // 一次锁,批量返回
    }
    
  4. 异步持久化(部分场景):

    • 低优先级更新可异步写入etcd
    • 高优先级更新(DDL)同步写入

性能指标

| 操作 | P50 | P99 | P999 | QPS |
| --- | --- | --- | --- | --- |
| GetCollectionByName | <1ms | <5ms | <10ms | 10K+ |
| GetCollectionByID | <1ms | <3ms | <8ms | 20K+ |
| AddCollection | 50ms | 200ms | 500ms | 10/s |
| ListCollections | 2ms | 10ms | 30ms | 1K/s |

3. 关键设计

3.1 DDL串行化

问题:并发DDL可能导致元数据不一致

解决:DDL Scheduler串行执行

关键机制

  • 单一taskChan保证串行处理
  • 每个任务分配单调递增的Timestamp
  • taskHeap维护任务完成顺序,用于MinDdlTs计算

3.2 MVCC与时间旅行

机制:基于TSO实现多版本并发控制

写入:Insert(data, ts=100)
查询:Search(ts=100)  → 查询≤100的所有数据
删除:Delete(id, ts=150) → 标记删除
查询:Search(ts=120)  → 仍可见(未删除)
查询:Search(ts=160)  → 不可见(已删除)
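
上述可见性规则可以用一个简单的判断函数来表达(示例代码,非源码):

// 示例(示意,非源码):基于TSO的可见性判断
// insertTS为写入时间戳;deleteTS为删除时间戳,0表示未删除
func visible(insertTS, deleteTS, queryTS uint64) bool {
    if insertTS > queryTS {
        return false // 数据在查询时间点之后写入,不可见
    }
    if deleteTS != 0 && deleteTS <= queryTS {
        return false // 数据在查询时间点之前已被删除,不可见
    }
    return true
}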

3.3 元数据缓存策略

两级缓存

  1. RootCoord内存缓存:MetaTable
  2. Proxy缓存:globalMetaCache

失效机制

sequenceDiagram
    participant RC as RootCoord
    participant ETCD as etcd
    participant P1 as Proxy1
    participant P2 as Proxy2
    
    RC->>ETCD: UpdateCollection(collectionID, newSchema)
    RC->>RC: 更新本地MetaTable
    
    par 广播失效通知
        RC->>P1: InvalidateCollectionMetaCache(collectionID, ts)
        P1->>P1: 移除缓存
    and
        RC->>P2: InvalidateCollectionMetaCache(collectionID, ts)
        P2->>P2: 移除缓存
    end

4. 性能与容量

4.1 性能指标

| 指标 | 数值 | 说明 |
| --- | --- | --- |
| AllocTimestamp QPS | >100万 | 单RootCoord |
| AllocID QPS | >10万 | 批量分配 |
| CreateCollection延迟 | P99: 200ms | 包含etcd写入 |
| DescribeCollection延迟 | P99: 10ms | 内存缓存 |

4.2 容量规划

| 维度 | 容量 | 限制因素 |
| --- | --- | --- |
| Database数量 | 1000 | 内存 |
| Collection数量 | 10000 | 内存+etcd |
| Field数量/Collection | 256 | Schema大小 |
| Partition数量/Collection | 4096 | 元数据量 |

5. 配置参数

rootCoord:
  dmlChannelNum: 16           # DML Channel数量
  maxPartitionNum: 4096       # 单Collection最大分区数
  minSegmentSizeToEnableIndex: 1024  # 最小索引大小(MB)
  
  # TSO配置
  tso:
    updateInterval: 50ms      # TSO更新间隔
    saveInterval: 3000ms      # TSO持久化间隔
    
  # GC配置
  gc:
    interval: 3600            # GC周期(秒)
    missingTolerance: 86400   # 数据丢失容忍时间

相关文档


Milvus-02-RootCoord-API

核心API列表

RootCoord作为元数据管理中心,提供以下类别的API:

| 类别 | API数量 | 主要功能 |
| --- | --- | --- |
| Collection管理 | 8个 | CreateCollection、DropCollection、DescribeCollection等 |
| Partition管理 | 4个 | CreatePartition、DropPartition、ShowPartitions等 |
| Database管理 | 4个 | CreateDatabase、DropDatabase、ListDatabases等 |
| 资源分配 | 2个 | AllocTimestamp、AllocID |
| 权限管理 | 10+个 | CreateCredential、CreateRole、GrantPrivilege等 |

1. CreateCollection

基本信息

  • 功能:创建Collection及其Schema
  • 协议:gRPC milvuspb.MilvusService/CreateCollection
  • 幂等性:否

请求参数

type CreateCollectionRequest struct {
    Base              *commonpb.MsgBase
    DbName            string                   // 数据库名
    CollectionName    string                   // 集合名
    Schema            []byte                   // 序列化的Schema
    ShardsNum         int32                    // Shard数量
    ConsistencyLevel  commonpb.ConsistencyLevel
    Properties        []*commonpb.KeyValuePair
    NumPartitions     int64                    // Partition Key分区数
}
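
Schema字段要求传入proto序列化后的CollectionSchema字节,一个构造示意如下(字段取值为示例,import路径以实际使用的milvus-proto版本为准):

// 示例(示意,非源码):构造并序列化Schema,填入CreateCollectionRequest.Schema
import (
    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "google.golang.org/protobuf/proto"
)

func buildSchemaBytes() ([]byte, error) {
    schema := &schemapb.CollectionSchema{
        Name: "demo_collection",
        Fields: []*schemapb.FieldSchema{
            {FieldID: 100, Name: "id", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
            {FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
                TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}},
        },
    }
    return proto.Marshal(schema) // 得到CreateCollectionRequest.Schema所需的[]byte
}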

核心执行流程

func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
        return merr.Status(err), nil
    }
    
    // 2. 创建任务
    t := &createCollectionTask{
        baseTask: newBaseTask(ctx, c),
        Req:      in,
    }
    
    // 3. 加入调度器(串行执行)
    if err := c.scheduler.AddTask(t); err != nil {
        return merr.Status(err), nil
    }
    
    // 4. 等待完成
    if err := t.WaitToFinish(); err != nil {
        return merr.Status(err), nil
    }
    
    return merr.Success(), nil
}

// 任务执行逻辑
func (t *createCollectionTask) Execute(ctx context.Context) error {
    // 1. 解析Schema
    schema := &schemapb.CollectionSchema{}
    if err := proto.Unmarshal(t.Req.Schema, schema); err != nil {
        return err
    }
    
    // 2. 参数校验
    if err := t.validate(ctx); err != nil {
        return err
    }
    
    // 3. 分配CollectionID
    collID, err := t.core.idAllocator.AllocOne()
    if err != nil {
        return err
    }
    t.collID = collID
    
    // 4. 创建VirtualChannels
    t.channels = t.core.chanTimeTick.getChannels(t.Req.ShardsNum)
    
    // 5. 持久化元数据
    if err := t.core.meta.AddCollection(ctx, &model.Collection{
        CollectionID:     t.collID,
        Name:             t.Req.CollectionName,
        Schema:           schema,
        VirtualChannels:  t.channels.virtualChannels,
        PhysicalChannels: t.channels.physicalChannels,
    }); err != nil {
        return err
    }
    
    // 6. 广播失效通知
    return t.core.broker.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{
        CollectionID: t.collID,
    })
}

时序图

sequenceDiagram
    participant C as Client
    participant RC as RootCoord
    participant SCH as Scheduler
    participant ID as IDAllocator
    participant Meta as MetaTable
    participant ETCD as etcd
    
    C->>RC: CreateCollection(name, schema)
    RC->>SCH: AddTask(createCollectionTask)
    SCH->>SCH: 串行执行(DDL锁)
    SCH->>ID: AllocOne()
    ID-->>SCH: collectionID
    SCH->>Meta: AddCollection(collection)
    Meta->>ETCD: Save(collection_meta)
    ETCD-->>Meta: OK
    Meta-->>SCH: Success
    SCH-->>RC: Task完成
    RC-->>C: Status(Success)

2. AllocTimestamp

基本信息

  • 功能:分配全局时间戳(TSO)
  • 调用频率:极高(每个DML/DQL操作)
  • 性能要求:P99 < 10ms

请求参数

type AllocTimestampRequest struct {
    Base           *commonpb.MsgBase
    Count          uint32  // 分配数量
    BlockTimestamp uint64  // 阻塞等待时间戳(可选)
}

响应参数

type AllocTimestampResponse struct {
    Status    *commonpb.Status
    Timestamp uint64  // 起始时间戳
    Count     uint32  // 分配数量
}

核心实现

func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
        return &rootcoordpb.AllocTimestampResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 2. BlockTimestamp处理(等待物理时间)
    if in.BlockTimestamp > 0 {
        blockTime, _ := tsoutil.ParseTS(in.BlockTimestamp)
        lastTime := c.tsoAllocator.GetLastSavedTime()
        deltaDuration := blockTime.Sub(lastTime)
        if deltaDuration > 0 {
            time.Sleep(deltaDuration + 200*time.Millisecond)
        }
    }
    
    // 3. 生成TSO
    ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
    if err != nil {
        return &rootcoordpb.AllocTimestampResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 4. 返回第一个可用时间戳
    ts = ts - uint64(in.GetCount()) + 1
    
    return &rootcoordpb.AllocTimestampResponse{
        Status:    merr.Success(),
        Timestamp: ts,
        Count:     in.GetCount(),
    }, nil
}

TSO生成机制

// GlobalTSOAllocator TSO分配器
type GlobalTSOAllocator struct {
    lastPhysical int64   // 上次物理时间(毫秒)
    lastLogical  int64   // 上次逻辑计数器
    maxLogical   int64   // 最大逻辑计数器(262144)
    
    kvBase kv.TxnKV      // etcd/TiKV
}

func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
    gta.mu.Lock()
    defer gta.mu.Unlock()
    
    // 1. 获取当前物理时间
    physical := time.Now().UnixMilli()
    
    // 2. 如果物理时间前进,重置逻辑计数器
    if physical > gta.lastPhysical {
        gta.lastPhysical = physical
        gta.lastLogical = 0
    }
    
    // 3. 检查逻辑计数器是否溢出
    if gta.lastLogical+int64(count) >= gta.maxLogical {
        // 等待下一毫秒
        time.Sleep(time.Millisecond)
        gta.lastPhysical = time.Now().UnixMilli()
        gta.lastLogical = 0
    }
    
    // 4. 生成TSO:先递增逻辑计数器,返回本批次最后一个TS(endTS)
    gta.lastLogical += int64(count)
    ts := uint64(gta.lastPhysical)<<18 | uint64(gta.lastLogical)
    
    // 5. 定期持久化到etcd(每3秒)
    if time.Since(gta.lastSaveTime) > 3*time.Second {
        gta.kvBase.Save(tsoKey, gta.lastPhysical)
        gta.lastSaveTime = time.Now()
    }
    
    return ts, nil
}

TSO格式

 64 bits TSO
|<----  Physical (46 bits)  ---->|<-- Logical (18 bits) -->|
|   毫秒级物理时间戳              |   逻辑计数器(0-262143) |

示例:
  Physical: 1704067200000 (2024-01-01 00:00:00)
  Logical:  12345
  TSO:      1704067200000 << 18 | 12345 = 446710992076812345

3. DescribeCollection

基本信息

  • 功能:查询Collection元信息
  • 调用频率:高
  • 性能:P99 < 10ms(内存缓存)

请求参数

type DescribeCollectionRequest struct {
    Base           *commonpb.MsgBase
    DbName         string
    CollectionName string
    CollectionID   int64  // 可选
    TimeStamp      uint64 // 时间旅行
}

响应参数

type DescribeCollectionResponse struct {
    Status              *commonpb.Status
    Schema              *schemapb.CollectionSchema
    CollectionID        int64
    VirtualChannelNames []string
    PhysicalChannelNames []string
    CreatedTimestamp    uint64
    CreatedUtcTimestamp uint64
    ShardsNum           int32
    ConsistencyLevel    commonpb.ConsistencyLevel
    CollectionName      string
    Properties          []*commonpb.KeyValuePair
    DbName              string
    NumPartitions       int64
}

核心实现

func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
        return &milvuspb.DescribeCollectionResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 2. 确定查询时间戳
    ts := typeutil.MaxTimestamp
    if in.TimeStamp != 0 {
        ts = in.TimeStamp
    }
    
    // 3. 从MetaTable查询
    var coll *model.Collection
    var err error
    
    if in.CollectionID != 0 {
        coll, err = c.meta.GetCollectionByID(ctx, in.DbName, in.CollectionID, ts, allowUnavailable)
    } else {
        coll, err = c.meta.GetCollectionByName(ctx, in.DbName, in.CollectionName, ts)
    }
    
    if err != nil {
        return &milvuspb.DescribeCollectionResponse{
            Status: merr.Status(err),
        }, nil
    }
    
    // 4. 构造响应
    return &milvuspb.DescribeCollectionResponse{
        Status:               merr.Success(),
        Schema:               coll.Schema,
        CollectionID:         coll.CollectionID,
        VirtualChannelNames:  coll.VirtualChannelNames,
        PhysicalChannelNames: coll.PhysicalChannelNames,
        CreatedTimestamp:     coll.CreateTime,
        ShardsNum:            coll.ShardsNum,
        ConsistencyLevel:     coll.ConsistencyLevel,
        CollectionName:       coll.Name,
        Properties:           coll.Properties,
        DbName:               in.DbName,
        NumPartitions:        coll.NumPartitions,
    }, nil
}

4. CreatePartition

基本信息

  • 功能:创建分区
  • 幂等性:否

请求参数

type CreatePartitionRequest struct {
    Base           *commonpb.MsgBase
    DbName         string
    CollectionName string
    PartitionName  string
}

核心流程

func (t *createPartitionTask) Execute(ctx context.Context) error {
    // 1. 获取Collection信息
    coll, err := t.core.meta.GetCollectionByName(ctx, t.Req.DbName, t.Req.CollectionName, typeutil.MaxTimestamp)
    if err != nil {
        return err
    }
    
    // 2. 检查分区数量限制
    if len(coll.Partitions) >= int(Params.RootCoordCfg.MaxPartitionNum.GetAsInt64()) {
        return fmt.Errorf("partition number (%d) exceeds max configuration (%d)",
            len(coll.Partitions), Params.RootCoordCfg.MaxPartitionNum.GetAsInt64())
    }
    
    // 3. 分配PartitionID
    partID, err := t.core.idAllocator.AllocOne()
    if err != nil {
        return err
    }
    
    // 4. 添加分区并持久化
    return t.core.meta.AddPartition(ctx, coll.CollectionID, &model.Partition{
        PartitionID:   partID,
        PartitionName: t.Req.PartitionName,
        CollectionID:  coll.CollectionID,
    })
}

5. 其他重要API(简要说明)

5.1 DropCollection

功能:删除Collection及其所有Partition、Segment

核心逻辑

  1. 标记Collection状态为CollectionDropping
  2. 通知DataCoord/QueryCoord释放资源
  3. 删除etcd中的元数据
  4. 触发垃圾回收
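
下面用一段简化代码对应上述四个步骤(接口与函数均为示意性定义,省略错误回滚与真实签名,并非dropCollectionTask的完整源码):

// 示意接口:仅为说明DropCollection的步骤,非真实定义
type collMeta interface {
	ChangeCollectionState(collID int64, state string) error // 修改Collection状态
	DropCollectionMeta(collID int64) error                  // 删除etcd元数据并清理缓存
}

type coordBroker interface {
	ReleaseCollection(collID int64) error // 通知QueryCoord/DataCoord释放资源
}

type collGC interface {
	GcCollectionData(collID int64) // 异步清理Binlog/索引文件
}

func dropCollectionSketch(meta collMeta, broker coordBroker, gc collGC, collID int64) error {
	// 1. 标记为Dropping,后续请求不可见
	if err := meta.ChangeCollectionState(collID, "CollectionDropping"); err != nil {
		return err
	}
	// 2. 通知QueryCoord/DataCoord释放资源
	if err := broker.ReleaseCollection(collID); err != nil {
		return err
	}
	// 3. 删除etcd中的元数据
	if err := meta.DropCollectionMeta(collID); err != nil {
		return err
	}
	// 4. 触发垃圾回收(异步)
	gc.GcCollectionData(collID)
	return nil
}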

5.2 ShowCollections

功能:列出指定Database的所有Collection

核心逻辑

func (t *showCollectionTask) Execute(ctx context.Context) error {
    colls, err := t.core.meta.ListCollections(ctx, t.Req.DbName, t.Req.TimeStamp, true)
    if err != nil {
        return err
    }
    
    t.Rsp.CollectionNames = make([]string, len(colls))
    t.Rsp.CollectionIds = make([]int64, len(colls))
    
    for i, coll := range colls {
        t.Rsp.CollectionNames[i] = coll.Name
        t.Rsp.CollectionIds[i] = coll.CollectionID
    }
    
    return nil
}

5.3 AllocID

功能:分配全局唯一ID(CollectionID、PartitionID、SegmentID等)

实现:基于etcd实现的GlobalIDAllocator

func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
    start, _, err := c.idAllocator.Alloc(in.Count)
    if err != nil {
        return &rootcoordpb.AllocIDResponse{Status: merr.Status(err)}, nil
    }
    
    return &rootcoordpb.AllocIDResponse{
        Status: merr.Success(),
        ID:     start,
        Count:  in.Count,
    }, nil
}

6. 性能优化

6.1 TSO性能优化

批量分配

  • 本地每50ms更新一次TSO状态,每次etcd持久化预留约3s的时间戳窗口
  • 时间戳在内存中生成,单个RootCoord支持>100万QPS

本地缓存

type tsoCache struct {
    physical int64
    logical  int64
    maxLogical int64
}

// 避免频繁访问etcd
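
在调用侧也可以按同样思路做批量摊销:一次RPC申请一段连续时间戳,然后在本地逐个消费。下面是一个与具体实现无关的简化示意(allocFn代表一次AllocTimestamp调用,并非真实SDK接口):

// tsBatchCache 本地缓存一批已申请的时间戳(简化示意)
type tsBatchCache struct {
	next  uint64 // 下一个可用TSO
	limit uint64 // 本批次上界(不含)
}

// allocFn 代表一次到RootCoord的AllocTimestamp调用,返回起始TSO
type allocFn func(count uint32) (uint64, error)

// Get 返回一个时间戳;本地批次耗尽时才发起一次RPC
func (c *tsBatchCache) Get(alloc allocFn, batchSize uint32) (uint64, error) {
	if c.next >= c.limit {
		start, err := alloc(batchSize)
		if err != nil {
			return 0, err
		}
		c.next, c.limit = start, start+uint64(batchSize)
	}
	ts := c.next
	c.next++
	return ts, nil
}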

6.2 MetaTable缓存

两级缓存

  1. RootCoord内存缓存(MetaTable)
  2. Proxy缓存(globalMetaCache)

失效策略

  • 主动失效:DDL操作后广播InvalidateCollectionMetaCache
  • 被动失效:Proxy定期刷新(可配置)
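
主动失效在Proxy侧的处理可以简化为:收到InvalidateCollectionMetaCache通知后,按dbName+collectionName删除本地缓存条目。以下为简化示意,并非globalMetaCache的真实实现:

import "sync"

// proxyMetaCache Proxy侧元数据缓存(简化示意)
type proxyMetaCache struct {
	mu    sync.RWMutex
	colls map[string]*collectionBasicInfo // key: dbName + "." + collectionName
}

// collectionBasicInfo 缓存的Collection基础信息(其余字段从略)
type collectionBasicInfo struct {
	collectionID int64
}

// InvalidateCollection 处理来自RootCoord的失效通知
func (c *proxyMetaCache) InvalidateCollection(dbName, collectionName string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.colls, dbName+"."+collectionName)
}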

7. 配置参数

rootCoord:
  dmlChannelNum: 16              # DML Channel数量
  maxPartitionNum: 4096          # 单Collection最大分区数
  minSegmentSizeToEnableIndex: 1024  # 最小索引大小(MB)
  
  # TSO配置
  tso:
    updateInterval: 50ms         # TSO更新间隔
    saveInterval: 3000ms         # TSO持久化间隔


Milvus-02-RootCoord-数据结构

1. 核心数据结构UML图

classDiagram
    class Core {
        +context.Context ctx
        +IMetaTable meta
        +IScheduler scheduler
        +IDAllocator idAllocator
        +TSOAllocator tsoAllocator
        +Broker broker
        +DdlTsLockManager ddlTsLockManager
        
        +CreateCollection(req) status
        +AllocTimestamp(req) response
        +AllocID(req) response
    }
    
    class IMetaTable {
        <<interface>>
        +AddCollection(coll) error
        +GetCollectionByName(name) collection
        +GetCollectionByID(id) collection
        +DropCollection(id) error
        +ListCollections() collections
    }
    
    class MetaTable {
        -sync.RWMutex mu
        -map collections
        -map partitions
        -map aliases
        -Catalog catalog
        
        +AddCollection(coll) error
        +GetCollectionByName(name) collection
    }
    
    class Collection {
        +int64 CollectionID
        +string Name
        +string Description
        +CollectionSchema Schema
        +uint64 CreateTime
        +[]string VirtualChannelNames
        +[]string PhysicalChannelNames
        +int32 ShardsNum
        +ConsistencyLevel ConsistencyLevel
        +[]Partition Partitions
        +[]*KeyValuePair Properties
        +int64 NumPartitions
        +CollectionState State
    }
    
    class CollectionSchema {
        +string Name
        +string Description
        +[]FieldSchema Fields
        +bool AutoID
        +bool EnableDynamicField
        +[]*KeyValuePair Properties
    }
    
    class FieldSchema {
        +int64 FieldID
        +string Name
        +DataType DataType
        +bool IsPrimaryKey
        +bool IsPartitionKey
        +bool AutoID
        +int64 Dimension
        +[]*KeyValuePair TypeParams
        +[]*KeyValuePair IndexParams
    }
    
    class IScheduler {
        <<interface>>
        +AddTask(task) error
        +Start()
        +Stop()
    }
    
    class Scheduler {
        -chan task taskQueue
        -atomic.Bool started
        -DdlTsLockManager ddlTsLockManager
        
        +AddTask(task) error
        +Start()
        +taskLoop()
    }
    
    class Task {
        <<interface>>
        +GetID() int64
        +SetID(id)
        +GetTs() Timestamp
        +Execute(ctx) error
        +WaitToFinish() error
        +Notify(error)
    }
    
    class createCollectionTask {
        +CreateCollectionRequest Req
        +CollectionSchema schema
        +int64 collID
        +[]int64 partIDs
        +[]string channels
        
        +Execute(ctx) error
    }
    
    class TSOAllocator {
        -int64 lastPhysical
        -int64 lastLogical
        -int64 maxLogical
        -kv.TxnKV kvBase
        
        +GenerateTSO(count) uint64
        +UpdateTSO() error
        +GetLastSavedTime() time
    }
    
    class IDAllocator {
        -int64 base
        -int64 end
        -kv.TxnKV kvBase
        
        +AllocOne() int64
        +Alloc(count) (start, end)
        +reloadFromKV() error
    }
    
    Core *-- IMetaTable
    Core *-- IScheduler
    Core *-- TSOAllocator
    Core *-- IDAllocator
    
    IMetaTable <|.. MetaTable
    MetaTable *-- Collection
    Collection *-- CollectionSchema
    CollectionSchema *-- FieldSchema
    
    IScheduler <|.. Scheduler
    Scheduler *-- Task
    Task <|.. createCollectionTask

2. Core结构体详解

// Core RootCoord核心结构体
type Core struct {
    // 上下文与生命周期
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    
    // etcd/TiKV客户端
    etcdCli *clientv3.Client
    tikvCli *txnkv.Client
    
    // 地址与Session
    address string
    session sessionutil.SessionInterface
    
    // 元数据管理
    meta IMetaTable  // MetaTable接口
    
    // 任务调度
    scheduler IScheduler  // DDL任务调度器
    
    // Broker(与其他组件通信)
    broker Broker
    
    // DDL时间戳锁管理器
    ddlTsLockManager DdlTsLockManager
    
    // 资源分配器
    idAllocator  allocator.Interface     // ID分配器
    tsoAllocator tso2.Allocator          // TSO分配器
    
    // Coordinator客户端
    mixCoord types.MixCoord
    
    // 配额中心
    quotaCenter *QuotaCenter
    
    // 垃圾回收
    garbageCollector GarbageCollector
    
    // Proxy管理
    proxyCreator       proxyutil.ProxyCreator
    proxyWatcher       *proxyutil.ProxyWatcher
    proxyClientManager proxyutil.ProxyClientManagerInterface
    
    // 时间同步
    chanTimeTick *timetickSync
    
    // 状态码
    stateCode atomic.Int32
}

3. MetaTable数据结构

3.1 MetaTable结构体

// MetaTable Collection元数据管理
type MetaTable struct {
    sync.RWMutex
    
    // Collection缓存:collectionID -> Collection
    collID2Meta map[typeutil.UniqueID]*model.Collection
    
    // Collection名称索引:dbID -> collectionName -> collectionID
    collName2ID map[typeutil.UniqueID]map[string]typeutil.UniqueID
    
    // Collection别名:dbID -> alias -> collectionID
    collAlias2ID map[typeutil.UniqueID]map[string]typeutil.UniqueID
    
    // Partition缓存:partitionID -> Partition
    partID2Meta map[typeutil.UniqueID]*model.Partition
    
    // Database缓存:dbName -> Database
    names2DatabaseID map[string]typeutil.UniqueID
    dbID2Meta        map[typeutil.UniqueID]*model.Database
    
    // 持久化层
    catalog metastore.RootCoordCatalog
}
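
按名称查询时,MetaTable先经名称索引(或别名索引)解析出CollectionID,再从collID2Meta取元数据。下面是一个省略Database解析、时间旅行与状态过滤的简化示意(getCollectionByNameSketch为示意方法名,错误构造亦做了简化):

func (mt *MetaTable) getCollectionByNameSketch(dbID typeutil.UniqueID, name string) (*model.Collection, error) {
	mt.RLock()
	defer mt.RUnlock()

	// 1. 先查正式名称索引
	collID, ok := mt.collName2ID[dbID][name]
	if !ok {
		// 2. 再查别名索引
		collID, ok = mt.collAlias2ID[dbID][name]
		if !ok {
			return nil, fmt.Errorf("collection %s not found", name)
		}
	}

	// 3. 按CollectionID取元数据
	coll, ok := mt.collID2Meta[collID]
	if !ok {
		return nil, fmt.Errorf("collection %s not found", name)
	}
	return coll, nil
}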

3.2 Collection模型

// Collection Collection元数据模型
type Collection struct {
    // 基础信息
    CollectionID      int64                      // Collection ID
    DBID              int64                      // Database ID
    Name              string                     // Collection名称
    Description       string                     // 描述
    AutoID            bool                       // 是否自动生成ID
    
    // Schema
    Schema            *schemapb.CollectionSchema // 字段Schema
    
    // 字段映射
    FieldIndexes      []*model.Index             // 字段索引
    
    // 虚拟通道
    VirtualChannelNames  []string                // Virtual Channel名称
    PhysicalChannelNames []string                // Physical Channel名称
    ShardsNum            int32                   // Shard数量
    
    // 分区
    Partitions        []*Partition               // 所有分区
    
    // 时间戳
    CreateTime        uint64                     // 创建时间戳
    StartPositions    []*commonpb.KeyDataPair    // 起始位置
    
    // 一致性
    ConsistencyLevel  commonpb.ConsistencyLevel  // 一致性级别
    
    // 状态
    State             pb.CollectionState         // Collection状态
    
    // 扩展属性
    Properties        []*commonpb.KeyValuePair   // 扩展属性
    
    // Partition Key
    NumPartitions     int64                      // Partition Key分区数
}

3.3 CollectionSchema

// CollectionSchema Schema定义
type CollectionSchema struct {
    Name              string                     // Schema名称
    Description       string                     // 描述
    AutoID            bool                       // 是否自动生成ID
    Fields            []*FieldSchema             // 字段列表
    EnableDynamicField bool                      // 是否启用动态字段
    Properties        []*commonpb.KeyValuePair   // 扩展属性
}

// FieldSchema 字段Schema
type FieldSchema struct {
    FieldID           int64                      // 字段ID
    Name              string                     // 字段名
    IsPrimaryKey      bool                       // 是否主键
    Description       string                     // 描述
    DataType          schemapb.DataType          // 数据类型
    TypeParams        []*commonpb.KeyValuePair   // 类型参数
    IndexParams       []*commonpb.KeyValuePair   // 索引参数
    AutoID            bool                       // 是否自动生成
    State             schemapb.FieldState        // 字段状态
    ElementType       schemapb.DataType          // 元素类型(Array)
    DefaultValue      *schemapb.ValueField       // 默认值
    IsDynamic         bool                       // 是否动态字段
    IsPartitionKey    bool                       // 是否Partition Key
    IsClusteringKey   bool                       // 是否Clustering Key
    Nullable          bool                       // 是否可空
}

3.4 CollectionState状态机

// CollectionState Collection状态
type CollectionState int32

const (
    CollectionCreated     CollectionState = 0  // 已创建
    CollectionCreating    CollectionState = 1  // 创建中
    CollectionDropping    CollectionState = 2  // 删除中
    CollectionDropped     CollectionState = 3  // 已删除
)

// 状态转换
// Created → Dropping → Dropped
// Creating → Created
// Creating → Dropping → Dropped (创建失败)
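
状态转换的合法性可以用一张邻接表校验,下面是基于上述常量的示意函数(非Milvus源码):

// validTransitions 合法的状态转换表(示意)
var validTransitions = map[CollectionState][]CollectionState{
	CollectionCreating: {CollectionCreated, CollectionDropping},
	CollectionCreated:  {CollectionDropping},
	CollectionDropping: {CollectionDropped},
}

// canTransit 判断from -> to是否为合法转换
func canTransit(from, to CollectionState) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}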

4. Scheduler调度器

4.1 Scheduler结构体

// scheduler DDL任务调度器
type scheduler struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    
    // 任务队列
    taskQueue chan task
    
    // 启动标志
    started atomic.Bool
    
    // 资源分配器
    idAllocator  allocator.Interface
    tsoAllocator tso2.Allocator
    
    // DDL时间戳锁
    ddlTsLockManager DdlTsLockManager
}
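
调度器启动后由单个goroutine串行消费taskQueue,这也是DDL串行化的来源。下面是一个省略时间戳分配与DDL锁细节的简化示意:

// taskLoop 串行消费任务队列(简化示意)
func (s *scheduler) taskLoop() {
	defer s.wg.Done()
	for {
		select {
		case <-s.ctx.Done():
			return
		case t := <-s.taskQueue:
			// 同一时刻只有一个DDL任务在执行
			err := t.Execute(s.ctx)
			// 唤醒调用方的WaitToFinish
			t.Notify(err)
		}
	}
}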

4.2 Task接口

// task DDL任务接口
type task interface {
    GetID() int64
    SetID(id int64)
    GetTs() Timestamp
    SetTs(ts Timestamp)
    GetType() commonpb.MsgType
    Execute(ctx context.Context) error
    WaitToFinish() error
    Notify(err error)
}

4.3 createCollectionTask

// createCollectionTask 创建Collection任务
type createCollectionTask struct {
    baseTask
    
    // 请求
    Req *milvuspb.CreateCollectionRequest
    
    // 解析后的Schema
    schema *schemapb.CollectionSchema
    
    // 分配的资源
    collID         UniqueID     // Collection ID
    partIDs        []UniqueID   // Partition IDs
    channels       collectionChannels  // Channels
    dbID           UniqueID     // Database ID
    partitionNames []string     // Partition名称
}

func (t *createCollectionTask) Execute(ctx context.Context) error {
    // 1. 解析Schema
    t.schema = &schemapb.CollectionSchema{}
    if err := proto.Unmarshal(t.Req.Schema, t.schema); err != nil {
        return err
    }
    
    // 2. 参数校验
    if err := t.validate(ctx); err != nil {
        return err
    }
    
    // 3. 分配CollectionID
    collID, err := t.core.idAllocator.AllocOne()
    if err != nil {
        return err
    }
    t.collID = collID
    
    // 4. 创建VirtualChannels
    t.channels = t.core.chanTimeTick.getChannels(t.Req.ShardsNum)
    
    // 5. 添加Collection
    coll := &model.Collection{
        CollectionID:         t.collID,
        Name:                 t.Req.CollectionName,
        Schema:               t.schema,
        VirtualChannelNames:  t.channels.virtualChannels,
        PhysicalChannelNames: t.channels.physicalChannels,
        ShardsNum:            t.Req.ShardsNum,
        ConsistencyLevel:     t.Req.ConsistencyLevel,
        CreateTime:           t.GetTs(),
        State:                pb.CollectionState_CollectionCreated,
    }
    
    return t.core.meta.AddCollection(ctx, coll)
}

5. TSO与ID分配器

5.1 TSOAllocator

// GlobalTSOAllocator 全局TSO分配器
type GlobalTSOAllocator struct {
    mu sync.Mutex
    
    // TSO状态
    lastPhysical int64   // 上次物理时间(毫秒)
    lastLogical  int64   // 上次逻辑计数器
    maxLogical   int64   // 最大逻辑计数器(262144 = 2^18)
    
    // 持久化
    kvBase kv.TxnKV      // etcd/TiKV
    key    string        // TSO存储key
    
    // 更新间隔
    updateInterval time.Duration  // 50ms
    saveInterval   time.Duration  // 3s
    
    lastSaveTime time.Time
}

// GenerateTSO 生成TSO
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
    gta.mu.Lock()
    defer gta.mu.Unlock()
    
    // 获取当前物理时间
    physical := time.Now().UnixMilli()
    
    // 物理时间前进,重置逻辑计数器
    if physical > gta.lastPhysical {
        gta.lastPhysical = physical
        gta.lastLogical = 0
    }
    
    // 检查逻辑计数器溢出
    if gta.lastLogical+int64(count) >= gta.maxLogical {
        // 等待下一毫秒
        time.Sleep(time.Millisecond)
        gta.lastPhysical = time.Now().UnixMilli()
        gta.lastLogical = 0
    }
    
    // 生成TSO
    ts := uint64(gta.lastPhysical)<<18 | uint64(gta.lastLogical)
    gta.lastLogical += int64(count)
    
    return ts, nil
}

5.2 IDAllocator

// GlobalIDAllocator 全局ID分配器
type GlobalIDAllocator struct {
    mu sync.Mutex
    
    // ID范围
    base int64   // 当前ID范围起始
    end  int64   // 当前ID范围结束
    
    // 持久化
    kvBase kv.TxnKV
    key    string
    
    // 批量分配大小
    allocSize int64  // 10000
}

// AllocOne 分配单个ID
func (gia *GlobalIDAllocator) AllocOne() (int64, error) {
    gia.mu.Lock()
    defer gia.mu.Unlock()
    
    // 检查是否需要重新加载
    if gia.base >= gia.end {
        if err := gia.reloadFromKV(); err != nil {
            return 0, err
        }
    }
    
    id := gia.base
    gia.base++
    
    return id, nil
}

// reloadFromKV 从etcd重新加载ID范围
func (gia *GlobalIDAllocator) reloadFromKV() error {
    // 1. 获取当前值
    val, err := gia.kvBase.Load(gia.key)
    if err != nil {
        return err
    }
    
    currentEnd, err := strconv.ParseInt(val, 10, 64)
    if err != nil {
        return err
    }
    
    // 2. 更新etcd(CAS操作)
    newEnd := currentEnd + gia.allocSize
    err = gia.kvBase.CompareAndSwap(gia.key, val, strconv.FormatInt(newEnd, 10))
    if err != nil {
        return err
    }
    
    // 3. 更新本地范围
    gia.base = currentEnd
    gia.end = newEnd
    
    return nil
}
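
除AllocOne外,分配器还提供批量接口Alloc(count),前文的AllocID即基于它实现。下面是一个简化示意(假设count不超过allocSize,返回区间为[start, end)):

// Alloc 批量分配count个连续ID,返回[start, end)(简化示意)
func (gia *GlobalIDAllocator) Alloc(count uint32) (int64, int64, error) {
	gia.mu.Lock()
	defer gia.mu.Unlock()

	// 本地剩余区间不足时,从etcd扩展一个新的区间
	if gia.end-gia.base < int64(count) {
		if err := gia.reloadFromKV(); err != nil {
			return 0, 0, err
		}
	}

	start := gia.base
	gia.base += int64(count)
	return start, gia.base, nil
}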

6. DdlTsLockManager

// ddlTsLockManager DDL时间戳锁管理器
type ddlTsLockManager struct {
    lastTs        atomic.Uint64   // 最后的DDL时间戳
    inProgressCnt atomic.Int32    // 正在进行的DDL数量
    tsoAllocator  tso.Allocator   // TSO分配器
    mu            sync.Mutex      // 互斥锁
}

// GetMinDdlTs 获取最小DDL时间戳
func (c *ddlTsLockManager) GetMinDdlTs() Timestamp {
    // 如果有正在进行的DDL,返回上次的时间戳
    if c.inProgressCnt.Load() > 0 {
        return c.lastTs.Load()
    }
    
    // 否则分配新的时间戳
    c.mu.Lock()
    defer c.mu.Unlock()
    
    ts, err := c.tsoAllocator.GenerateTSO(1)
    if err != nil {
        return c.lastTs.Load()
    }
    
    c.UpdateLastTs(ts)
    return ts
}

// 用途:保证DDL操作的时间戳顺序性
// - DDL开始:AddRefCnt(1)
// - DDL结束:AddRefCnt(-1)
// - TimeTick:GetMinDdlTs()
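
配合上面的用途说明,一个典型的包裹方式如下(executeDDLWithTsLock为示意函数,AddRefCnt/UpdateLastTs按上述原子字段实现):

// AddRefCnt 增减正在进行的DDL计数(简化示意)
func (c *ddlTsLockManager) AddRefCnt(delta int32) {
	c.inProgressCnt.Add(delta)
}

// UpdateLastTs 记录最近一次DDL使用的时间戳
func (c *ddlTsLockManager) UpdateLastTs(ts Timestamp) {
	c.lastTs.Store(uint64(ts))
}

// executeDDLWithTsLock 在DDL执行前后维护引用计数(示意)
func executeDDLWithTsLock(ctx context.Context, m *ddlTsLockManager, t task) error {
	m.AddRefCnt(1)        // DDL开始
	defer m.AddRefCnt(-1) // DDL结束
	m.UpdateLastTs(t.GetTs())
	return t.Execute(ctx)
}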


Milvus-02-RootCoord-时序图

1. CreateCollection时序图

sequenceDiagram
    autonumber
    participant C as Client
    participant P as Proxy
    participant RC as RootCoord
    participant SCH as Scheduler
    participant TSO as TSOAllocator
    participant ID as IDAllocator
    participant Meta as MetaTable
    participant ETCD as etcd
    participant DC as DataCoord
    participant QC as QueryCoord
    
    C->>P: CreateCollection(name, schema)
    P->>RC: CreateCollection RPC
    RC->>RC: 健康检查
    RC->>SCH: AddTask(createCollectionTask)
    
    SCH->>SCH: 等待DDL锁
    SCH->>SCH: 解析Schema
    SCH->>SCH: 参数校验
    
    SCH->>ID: AllocOne()
    ID->>ETCD: CompareAndSwap(idKey)
    ETCD-->>ID: OK
    ID-->>SCH: collectionID=123
    
    SCH->>TSO: GenerateTSO(1)
    TSO-->>SCH: timestamp
    
    SCH->>SCH: 创建VirtualChannels
    
    SCH->>Meta: AddCollection(collection)
    Meta->>ETCD: Save(/collection/123)
    ETCD-->>Meta: OK
    Meta->>Meta: 更新内存缓存
    Meta-->>SCH: Success
    
    par 异步广播
        SCH->>DC: BroadcastAlteredCollection
        DC->>DC: InvalidateCache
        DC-->>SCH: ACK
    and
        SCH->>QC: BroadcastAlteredCollection
        QC->>QC: InvalidateCache
        QC-->>SCH: ACK
    and
        SCH->>P: InvalidateCollectionMetaCache
        P->>P: RemoveCache
        P-->>SCH: ACK
    end
    
    SCH-->>RC: Task完成
    RC-->>P: Status(Success)
    P-->>C: Status(Success)

时序图说明

  1. 步骤1-4:Client通过Proxy发起CreateCollection请求
  2. 步骤5-7:任务进入DDL Scheduler,等待串行执行
  3. 步骤8-11:分配CollectionID(从etcd批量预分配)
  4. 步骤12-13:分配创建时间戳
  5. 步骤14:创建VirtualChannels(从Channel池分配)
  6. 步骤15-19:持久化元数据到etcd,更新内存缓存
  7. 步骤20-28:并发广播失效通知到所有组件
  8. 步骤29-31:返回成功状态

2. AllocTimestamp时序图

sequenceDiagram
    autonumber
    participant P as Proxy
    participant RC as RootCoord
    participant TSO as TSOAllocator
    participant ETCD as etcd
    
    loop 每3s(saveInterval)
        TSO->>ETCD: Save(tsoKey, lastPhysical)
        Note over TSO: 定期持久化物理时间,防止重启后TSO回退
    end
    
    P->>RC: AllocTimestamp(count=10)
    RC->>TSO: GenerateTSO(10)
    
    TSO->>TSO: 获取当前时间(UnixMilli)
    
    alt 物理时间前进
        TSO->>TSO: lastPhysical=currentTime
        TSO->>TSO: lastLogical=0
    end
    
    alt 逻辑计数器溢出
        TSO->>TSO: sleep(1ms)
        TSO->>TSO: lastPhysical=新时间
        TSO->>TSO: lastLogical=0
    end
    
    TSO->>TSO: ts = lastPhysical<<18 | lastLogical
    TSO->>TSO: lastLogical += 10
    
    TSO-->>RC: ts
    RC-->>P: Timestamp(ts, count=10)
    
    Note over P: 使用范围[ts, ts+9]

TSO生成机制

时间轴:
  t0: physical=1000, logical=0      → TSO=1000<<18|0   = 262144000
  t1: physical=1000, logical=10     → TSO=1000<<18|10  = 262144010
  t2: physical=1000, logical=262143 → TSO=1000<<18|262143
  t3: physical=1001, logical=0      → TSO=1001<<18|0   = 262406144

特性:
- 单调递增:保证分布式顺序
- 高精度:毫秒+逻辑计数器
- 高性能:本地生成,减少etcd访问

3. DescribeCollection时序图

sequenceDiagram
    autonumber
    participant P as Proxy
    participant RC as RootCoord
    participant Meta as MetaTable
    participant ETCD as etcd
    
    P->>RC: DescribeCollection(name="docs")
    RC->>RC: 健康检查
    
    RC->>Meta: GetCollectionByName("default", "docs", ts)
    
    alt 内存缓存命中
        Meta-->>RC: Collection(from cache)
    else 缓存未命中
        Meta->>ETCD: Load(/collection/*)
        ETCD-->>Meta: CollectionMeta
        Meta->>Meta: 反序列化+缓存
        Meta-->>RC: Collection
    end
    
    RC->>RC: 构造Response
    RC-->>P: DescribeCollectionResponse

缓存策略

Cache Key: dbID + collectionName
Cache Value: *model.Collection

失效时机:
1. DDL操作(CreateCollection/DropCollection/AlterCollection)
2. 接收到InvalidateCollectionMetaCache通知
3. 查询时发现版本不匹配(etcd比缓存新)

性能:
- Cache Hit: P99 < 5ms
- Cache Miss: P99 < 20ms (包含etcd查询)

4. CreatePartition时序图

sequenceDiagram
    autonumber
    participant C as Client
    participant RC as RootCoord
    participant SCH as Scheduler
    participant ID as IDAllocator
    participant Meta as MetaTable
    participant ETCD as etcd
    
    C->>RC: CreatePartition(collection, partition)
    RC->>SCH: AddTask(createPartitionTask)
    
    SCH->>Meta: GetCollectionByName(collection)
    Meta-->>SCH: Collection
    
    SCH->>SCH: 检查分区数量限制
    alt 超过限制
        SCH-->>RC: Error(MaxPartitionNum)
        RC-->>C: Status(Error)
    end
    
    SCH->>ID: AllocOne()
    ID-->>SCH: partitionID
    
    SCH->>Meta: AddPartition(partition)
    Meta->>Meta: 更新Collection.Partitions
    Meta->>ETCD: Save(/collection/123/partition/456)
    ETCD-->>Meta: OK
    Meta-->>SCH: Success
    
    SCH->>RC: 广播失效通知
    SCH-->>RC: Task完成
    RC-->>C: Status(Success)

5. DropCollection时序图

sequenceDiagram
    autonumber
    participant C as Client
    participant RC as RootCoord
    participant SCH as Scheduler
    participant Meta as MetaTable
    participant ETCD as etcd
    participant DC as DataCoord
    participant QC as QueryCoord
    
    C->>RC: DropCollection(collection)
    RC->>SCH: AddTask(dropCollectionTask)
    
    SCH->>Meta: GetCollectionByName(collection)
    Meta-->>SCH: Collection
    
    SCH->>Meta: ChangeCollectionState(Dropping)
    Meta->>ETCD: Update(state=Dropping)
    ETCD-->>Meta: OK
    
    par 通知Release
        SCH->>QC: ReleaseCollection(collectionID)
        QC->>QC: 卸载所有Segment
        QC-->>SCH: Success
    and
        SCH->>DC: ReleaseCollection(collectionID)
        DC->>DC: 标记Segment为Dropped
        DC-->>SCH: Success
    end
    
    SCH->>Meta: DropCollection(collectionID)
    Meta->>ETCD: Delete(/collection/123)
    ETCD-->>Meta: OK
    Meta->>Meta: 从缓存移除
    Meta-->>SCH: Success
    
    SCH->>SCH: 触发GC
    Note over SCH: 异步清理Binlog文件
    
    SCH-->>RC: Task完成
    RC-->>C: Status(Success)

状态转换

CollectionCreated → CollectionDropping → CollectionDropped

步骤:
1. 标记为Dropping(不可见)
2. 通知QueryCoord/DataCoord释放资源
3. 删除etcd元数据
4. 异步GC清理Object Storage文件

6. ShowCollections时序图

sequenceDiagram
    autonumber
    participant C as Client
    participant RC as RootCoord
    participant SCH as Scheduler
    participant Meta as MetaTable
    
    C->>RC: ShowCollections(dbName)
    RC->>SCH: AddTask(showCollectionTask)
    
    SCH->>Meta: ListCollections(dbName, ts)
    Meta->>Meta: 过滤:state==Created
    Meta-->>SCH: [coll1, coll2, ...]
    
    SCH->>SCH: 构造Response
    SCH-->>RC: [names, ids, timestamps]
    RC-->>C: ShowCollectionsResponse

7. 时间旅行(Time Travel)

sequenceDiagram
    participant C as Client
    participant RC as RootCoord
    participant Meta as MetaTable
    participant ETCD as etcd
    
    Note over C: 查询历史状态
    C->>RC: DescribeCollection(name, ts=100)
    RC->>Meta: GetCollectionByName(name, ts=100)
    
    Meta->>ETCD: LoadWithRevision(key, revision)
    ETCD-->>Meta: HistoricalData(ts=100)
    Meta-->>RC: Collection(历史状态)
    RC-->>C: DescribeCollectionResponse
    
    Note over C: 查询当前状态
    C->>RC: DescribeCollection(name, ts=MaxTimestamp)
    RC->>Meta: GetCollectionByName(name, ts=Max)
    Meta-->>RC: Collection(当前状态)
    RC-->>C: DescribeCollectionResponse

时间旅行用途

  1. MVCC查询:查询历史版本的Collection
  2. 一致性保证:使用特定Timestamp保证读一致性
  3. 调试与审计:回溯DDL操作历史

实现机制

// etcd支持基于Revision的查询
// Timestamp → Revision映射(单调递增)

func (m *MetaTable) GetCollectionByName(dbName, name string, ts Timestamp) (*Collection, error) {
    if ts == MaxTimestamp {
        // 查询最新版本(从缓存)
        return m.cache[dbName][name], nil
    }
    
    // 查询历史版本(从etcd)
    revision := m.timestampToRevision(ts)
    resp, err := m.etcdCli.Get(ctx, collectionKey, clientv3.WithRev(revision))
    if err != nil {
        return nil, err
    }
    
    return parseCollection(resp.Kvs[0].Value), nil
}

8. DDL串行化机制

sequenceDiagram
    participant T1 as DDL Task1
    participant T2 as DDL Task2
    participant T3 as DDL Task3
    participant SCH as Scheduler
    participant Lock as DdlTsLockManager
    
    par 并发提交
        T1->>SCH: AddTask(createCollection)
        T2->>SCH: AddTask(dropCollection)
        T3->>SCH: AddTask(createPartition)
    end
    
    SCH->>SCH: taskQueue接收任务
    
    loop 串行执行
        SCH->>Lock: Lock()
        Lock-->>SCH: Acquired
        
        SCH->>T1: Execute()
        T1->>T1: 执行DDL
        T1-->>SCH: Success
        
        SCH->>Lock: Unlock()
        
        SCH->>Lock: Lock()
        SCH->>T2: Execute()
        T2-->>SCH: Success
        SCH->>Lock: Unlock()
        
        SCH->>Lock: Lock()
        SCH->>T3: Execute()
        T3-->>SCH: Success
        SCH->>Lock: Unlock()
    end

串行化原因

  1. 元数据一致性:避免并发修改导致冲突
  2. Timestamp顺序性:保证DDL操作的全局顺序
  3. 简化实现:无需复杂的并发控制

性能影响

  • DDL操作频率低(通常<1/秒)
  • 串行化对系统吞吐影响小
  • DML/DQL操作不受影响(并发执行)