Milvus-05-DataNode-概览

1. 模块概述

1.1 职责定义

DataNode是Milvus分布式架构中的数据持久化组件,负责将流式数据持久化到对象存储,并执行Compaction、索引构建与数据导入等任务。

核心职责

  1. 数据同步与持久化

    • 通过SyncManager管理数据同步任务
    • 将内存数据序列化为Binlog格式
    • 上传到Object Storage(S3/MinIO/OSS)
    • 支持StorageV1和StorageV2两种存储格式
  2. Compaction执行

    • L0DeleteCompaction:应用Delete日志到Sealed Segment
    • MixCompaction:合并小Segment并清理删除数据
    • ClusteringCompaction:按Clustering Key重新组织数据
    • SortCompaction:按主键排序优化查询
  3. 索引构建

    • 执行向量索引构建(HNSW/IVF_FLAT/DiskANN)
    • 执行标量索引构建(B-Tree/Inverted)
    • 执行统计信息收集(Stats/Analyze)
    • 构建BM25统计信息(全文检索)
  4. 数据导入

    • 批量导入任务调度(ImportV2)
    • 多文件格式支持(Parquet/JSON/CSV)
    • L0 Import(仅Delete数据导入)
  5. 资源管理

    • Slot管理(控制并发任务数)
    • 内存控制(防止OOM)
    • 任务优先级调度

1.2 整体架构图

flowchart TB
    subgraph DataNode["DataNode 核心组件"]
        subgraph Core["核心服务"]
            DN[DataNode Server<br/>生命周期管理]
            Session[Session<br/>Etcd注册/心跳]
        end
        
        subgraph Managers["管理器层"]
            SyncMgr[SyncManager<br/>数据同步管理]
            CompactExec[CompactionExecutor<br/>Compaction执行器]
            TaskSched[TaskScheduler<br/>索引/Stats任务调度]
            ImportSched[ImportScheduler<br/>导入任务调度]
        end
        
        subgraph Workers["工作池"]
            SyncPool[Sync Worker Pool<br/>并发同步任务]
            CompactPool[Compaction Pool<br/>Compaction并发]
            IndexPool[Index Builder Pool<br/>索引构建并发]
        end
        
        subgraph Storage["存储抽象"]
            ChunkMgr[ChunkManager<br/>对象存储客户端]
            BinlogIO[BinlogIO<br/>Binlog读写]
        end
        
        DN --> Session
        DN --> SyncMgr
        DN --> CompactExec
        DN --> TaskSched
        DN --> ImportSched
        
        SyncMgr --> SyncPool
        CompactExec --> CompactPool
        TaskSched --> IndexPool
        
        SyncPool --> BinlogIO
        CompactPool --> BinlogIO
        IndexPool --> BinlogIO
        
        BinlogIO --> ChunkMgr
    end
    
    subgraph External["外部依赖"]
        DC[DataCoord<br/>任务分配]
        Etcd[Etcd<br/>服务注册/元数据]
        OS[Object Storage<br/>S3/MinIO/OSS]
    end
    
    DN -.->|注册/心跳| Etcd
    DC -.->|CompactionV2<br/>CreateTask| DN
    ChunkMgr -.->|Upload/Download| OS
    
    style DataNode fill:#e1f5ff,stroke:#333,stroke-width:2px
    style Managers fill:#fff4e6,stroke:#333,stroke-width:1px
    style Workers fill:#e6f7ff,stroke:#333,stroke-width:1px

架构说明

  1. 核心服务层

    • DataNode Server:整个服务的入口,管理生命周期(Init/Start/Stop)
    • Session:通过Etcd注册节点,定期发送心跳,维护节点在线状态
  2. 管理器层

    • SyncManager:管理所有数据同步任务,按SegmentID分配Worker,确保同一Segment的同步任务串行执行
    • CompactionExecutor:维护Compaction任务队列,管理Slot分配,执行各类Compaction任务
    • TaskScheduler:调度索引构建和统计信息任务,控制并发度
    • ImportScheduler:调度批量导入任务,管理导入任务的生命周期
  3. 工作池

    • Sync Worker Pool:并发执行数据同步任务,将WriteBuffer中的数据序列化并上传
    • Compaction Pool:并发执行Compaction,支持L0/Mix/Clustering/Sort四种类型
    • Index Builder Pool:并发构建索引,调用Knowhere/Segcore接口
  4. 存储抽象

    • ChunkManager:封装对象存储操作(S3/MinIO/OSS/本地),支持分片上传
    • BinlogIO:封装Binlog读写逻辑,处理压缩/解压缩
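
上述组件在代码中对应DataNode结构体的一组字段。下面是一个对照示意(字段名取自本文后续代码片段,类型为近似写法,省略无关成员):

type DataNode struct {
    ctx    context.Context
    cancel context.CancelFunc
    Role   string

    session  *sessionutil.Session                  // Etcd注册与心跳
    lifetime lifetime.Lifetime[commonpb.StateCode] // 状态码管理

    syncMgr            syncmgr.SyncManager  // 数据同步管理
    compactionExecutor compactor.Executor   // Compaction执行器
    taskScheduler      *index.TaskScheduler // 索引/Stats任务调度
    taskManager        *index.TaskManager   // 索引任务状态管理
    importTaskMgr      importv2.TaskManager // 导入任务管理
    importScheduler    importv2.Scheduler   // 导入任务调度

    storageFactory ChunkManagerFactory // 对象存储客户端工厂
    totalSlot      int64               // 总Slot数(按CPU/内存计算)
}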

1.3 与DataCoord交互总览

sequenceDiagram
    autonumber
    participant DC as DataCoord
    participant DN as DataNode
    participant Etcd as Etcd
    participant OS as Object Storage
    
    Note over DN,Etcd: 节点注册与心跳
    DN->>Etcd: 注册节点信息
    DN->>Etcd: 定期更新心跳
    
    Note over DC,DN: Compaction任务分配
    DC->>DN: CompactionV2(CompactionPlan)
    DN->>DN: 创建Compaction任务
    DN->>DN: 加入ExecutorQueue
    DN-->>DC: Status{Success}
    
    Note over DN,OS: Compaction执行
    DN->>OS: 读取Binlog数据
    DN->>DN: 执行Compaction逻辑
    DN->>OS: 写入新Segment Binlog
    
    Note over DC,DN: 任务状态查询
    DC->>DN: GetCompactionState(planID)
    DN-->>DC: CompactionStateResponse
    
    Note over DC,DN: 索引构建任务
    DC->>DN: CreateTask(IndexTask)
    DN->>DN: TaskScheduler调度
    DN->>DN: IndexBuilder构建索引
    DN->>OS: 上传索引文件
    DN-->>DC: Status{Success}
    
    Note over DC,DN: 导入任务
    DC->>DN: CreateTask(ImportTask)
    DN->>DN: ImportScheduler调度
    DN->>OS: 读取导入文件
    DN->>OS: 写入Segment Binlog
    DN-->>DC: Status{Success}
    
    Note over DC,DN: Slot查询
    DC->>DN: QuerySlot()
    DN-->>DC: QuerySlotResponse{availableSlots}

交互说明

  1. 节点注册与心跳

    • DataNode启动后向Etcd注册,包含ServerID、地址、版本等信息
    • 定期发送心跳(默认1秒),若心跳超时则DataNode进程退出
  2. Compaction任务分配

    • DataCoord通过CompactionV2 RPC下发任务
    • DataNode检查Slot可用性,创建对应类型的Compactor
    • 任务加入CompactionExecutor队列,异步执行
  3. 索引构建任务

    • DataCoord通过CreateTask RPC下发IndexTask
    • TaskScheduler根据Slot可用性调度
    • IndexBuilder调用Knowhere/Segcore接口构建索引
  4. 导入任务

    • DataCoord通过CreateTask RPC下发ImportTask
    • ImportScheduler管理PreImport和Import两阶段
    • PreImport验证文件格式和内容,Import执行实际写入
  5. Slot管理

    • DataCoord通过QuerySlot查询可用Slot数
    • DataNode根据总Slot数减去各类任务占用计算可用Slot
    • 用于任务调度和负载均衡

2. 核心流程详解

2.1 DataNode启动流程

sequenceDiagram
    autonumber
    participant Main as main()
    participant DN as DataNode
    participant Session as Session
    participant Etcd as Etcd
    participant SyncMgr as SyncManager
    participant CompactExec as CompactionExecutor
    participant TaskSched as TaskScheduler
    participant ImportSched as ImportScheduler
    
    Main->>DN: NewDataNode(ctx)
    activate DN
    DN->>DN: 初始化各组件
    DN->>CompactExec: NewExecutor()
    DN->>TaskSched: NewTaskScheduler()
    DN-->>Main: dataNode实例
    deactivate DN
    
    Main->>DN: SetEtcdClient(etcdCli)
    
    Main->>DN: Init()
    activate DN
    DN->>DN: registerMetricsRequest()
    DN->>Session: NewSession(ctx)
    Session->>Session: Init(DataNodeRole)
    Session->>Etcd: 预注册节点
    DN->>SyncMgr: NewSyncManager()
    DN->>DN: importTaskMgr = NewTaskManager()
    DN->>ImportSched: NewScheduler(importTaskMgr)
    DN->>DN: InitSegcore(serverID)
    DN-->>Main: nil (成功)
    deactivate DN
    
    Main->>DN: Start()
    activate DN
    DN->>CompactExec: Start(ctx)
    Note over CompactExec: 启动后台goroutine<br/>监听taskCh
    DN->>ImportSched: Start()
    Note over ImportSched: 启动导入调度器
    DN->>TaskSched: Start()
    Note over TaskSched: 启动索引构建loop
    DN->>DN: UpdateStateCode(Healthy)
    DN-->>Main: nil (成功)
    deactivate DN
    
    Main->>DN: Register()
    activate DN
    DN->>Session: Register()
    Session->>Etcd: 正式注册节点信息
    Session->>Session: LivenessCheck(ctx)
    Note over Session: 启动心跳goroutine<br/>定期更新TTL
    DN-->>Main: nil (成功)
    deactivate DN

启动流程说明

第一阶段:NewDataNode - 创建实例

func NewDataNode(ctx context.Context) *DataNode {
    ctx2, cancel2 := context.WithCancel(ctx)
    node := &DataNode{
        ctx:                    ctx2,
        cancel:                 cancel2,
        Role:                   typeutil.DataNodeRole,
        lifetime:               lifetime.NewLifetime(commonpb.StateCode_Abnormal),
        compactionExecutor:     compactor.NewExecutor(),
        reportImportRetryTimes: 10,
        metricsRequest:         metricsinfo.NewMetricsRequest(),
        totalSlot:              index.CalculateNodeSlots(), // 根据CPU和内存计算
    }
    sc := index.NewTaskScheduler(ctx2)
    node.storageFactory = NewChunkMgrFactory()
    node.taskScheduler = sc
    node.taskManager = index.NewTaskManager(ctx2)
    node.UpdateStateCode(commonpb.StateCode_Abnormal) // 初始为Abnormal
    return node
}

第二阶段:Init - 初始化组件

func (node *DataNode) Init() error {
    var initError error
    node.initOnce.Do(func() {
        // 1. 注册Metrics处理器
        node.registerMetricsRequest()
        
        // 2. 初始化Session(Etcd注册)
        if err := node.initSession(); err != nil {
            initError = err
            return
        }
        
        serverID := node.GetNodeID()
        
        // 3. 创建SyncManager(数据同步管理器)
        syncMgr := syncmgr.NewSyncManager(nil)
        node.syncMgr = syncMgr
        
        // 4. 创建ImportTaskManager和Scheduler
        node.importTaskMgr = importv2.NewTaskManager()
        node.importScheduler = importv2.NewScheduler(node.importTaskMgr)
        
        // 5. 初始化Segcore(C++索引引擎)
        err := index.InitSegcore(serverID)
        if err != nil {
            initError = err
        }
    })
    return initError
}

第三阶段:Start - 启动后台服务

func (node *DataNode) Start() error {
    var startErr error
    node.startOnce.Do(func() {
        // 1. 启动Compaction执行器
        go node.compactionExecutor.Start(node.ctx)
        
        // 2. 启动Import调度器
        go node.importScheduler.Start()
        
        // 3. 启动Task调度器(索引构建)
        err := node.taskScheduler.Start()
        if err != nil {
            startErr = err
            return
        }
        
        // 4. 更新状态为Healthy
        node.UpdateStateCode(commonpb.StateCode_Healthy)
    })
    return startErr
}

第四阶段:Register - 注册到Etcd

func (node *DataNode) Register() error {
    // 正式注册到Etcd
    node.session.Register()
    
    // 启动心跳检测
    node.session.LivenessCheck(node.ctx, func() {
        log.Error("Data Node disconnected from etcd, process will exit")
        os.Exit(1) // 心跳失败则退出进程
    })
    
    return nil
}

关键设计点

  1. 分阶段启动:Init阶段准备资源,Start阶段启动后台任务,Register阶段注册到集群
  2. Once保护:使用sync.Once确保Init/Start/Stop只执行一次
  3. Lifetime管理:通过Lifetime管理状态码(Abnormal→Healthy→Stopping→Stopped)
  4. 心跳机制:心跳失败直接退出进程,避免脑裂
  5. 优雅关闭:Stop先设置状态为Abnormal,等待所有请求完成后再清理资源(流程示意见下)
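
与第5点对应的Stop流程示意(stopOnce、taskScheduler.Close、session.Stop等命名为依据上文代码的推断,非逐字源码):

func (node *DataNode) Stop() error {
    node.stopOnce.Do(func() {
        // 1. 先置为Abnormal,停止接收新请求
        node.UpdateStateCode(commonpb.StateCode_Abnormal)
        // 2. 等待存量请求执行完毕
        node.lifetime.Wait()
        // 3. 停止调度器、注销Session,并取消ctx以结束各后台goroutine
        node.taskScheduler.Close()
        if node.session != nil {
            node.session.Stop()
        }
        node.cancel()
    })
    return nil
}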

2.2 SyncManager数据同步流程

DataNode v2.6.0之后采用WriteBuffer + SyncManager架构,不再使用FlowGraph,数据同步由SyncManager统一管理。

sequenceDiagram
    autonumber
    participant WB as WriteBuffer
    participant SM as SyncManager
    participant ST as SyncTask
    participant Writer as BinlogWriter
    participant CM as ChunkManager
    participant OS as Object Storage
    participant MC as MetaCache
    
    Note over WB: 数据累积触发Sync
    WB->>WB: getSegmentsToSync(policies)
    WB->>WB: getSyncTask(segmentID)
    WB->>WB: yieldBuffer(segmentID)
    Note over WB: 生成InsertData/DeleteData<br/>Schema/TimeRange
    
    WB->>SM: SyncData(ctx, syncTask, callbacks)
    SM->>SM: safeSubmitTask(segmentID, task)
    Note over SM: 按SegmentID分配Worker<br/>保证同Segment串行
    SM->>ST: Submit to WorkerPool
    
    activate ST
    ST->>MC: GetSegmentByID(segmentID)
    MC-->>ST: SegmentInfo
    
    alt StorageV2
        ST->>Writer: NewBulkPackWriterV2()
        Writer->>Writer: 创建BinlogRecordWriter
    else StorageV1
        ST->>Writer: NewBulkPackWriter()
    end
    
    loop 写入每个字段
        ST->>Writer: Write(insertData)
        Writer->>Writer: Serialize字段数据
        Writer->>Writer: 累积到Chunk
        
        alt Chunk满
            Writer->>Writer: FlushChunk()
            Writer->>CM: Upload(binlog_path, data)
            CM->>OS: PutObject()
        end
    end
    
    ST->>Writer: WriteDelta(deleteData)
    Writer->>CM: Upload(deltalog_path, deletes)
    
    ST->>Writer: WriteStats(pkStats, bm25Stats)
    Writer->>CM: Upload(statslog_path, stats)
    
    ST->>MC: UpdateSegments(FinishSyncing)
    alt isFlush
        ST->>MC: UpdateState(Flushed)
    end
    
    ST-->>SM: Success
    deactivate ST
    SM->>WB: callback(nil)
    WB->>WB: syncCheckpoint.Remove(segmentID)

关键代码:WriteBuffer触发Sync

// syncSegments 触发多个Segment的同步
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] {
    result := make([]*conc.Future[struct{}], 0, len(segmentIDs))
    for _, segmentID := range segmentIDs {
        // 1. 构造SyncTask
        syncTask, err := wb.getSyncTask(ctx, segmentID)
        if err != nil {
            if errors.Is(err, merr.ErrSegmentNotFound) {
                log.Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
                continue
            } else {
                log.Fatal("failed to get sync task", zap.Int64("segmentID", segmentID), zap.Error(err))
            }
        }
        
        // 2. 提交到SyncManager
        future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
            if err != nil {
                return err
            }
            
            // 3. 成功后清理Checkpoint
            if syncTask.StartPosition() != nil {
                wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
            }
            
            // 4. 如果是Flush,从MetaCache移除
            if syncTask.IsFlush() {
                wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID()))
                log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()))
            }
            return nil
        })
        
        if err != nil {
            log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
        }
        result = append(result, future)
    }
    return result
}

// getSyncTask 从WriteBuffer构造SyncTask
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
    segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID)
    if !ok {
        return nil, merr.WrapErrSegmentNotFound(segmentID)
    }
    
    // Yield数据(从Buffer取出并清空)
    insert, bm25, delta, _, timeRange, startPos := wb.yieldBuffer(segmentID)
    // 时间范围与批量行数由yield结果推导(示意,省略具体统计逻辑)
    tsFrom, tsTo := timeRange.timestampMin, timeRange.timestampMax
    var batchSize int64
    for _, chunk := range insert {
        batchSize += int64(chunk.GetRowNum())
    }
    
    if startPos != nil {
        wb.syncCheckpoint.Add(segmentID, startPos, "syncing task")
    }
    
    // 构造SyncPack
    pack := &syncmgr.SyncPack{}
    pack.WithInsertData(insert).
        WithDeleteData(delta).
        WithCollectionID(wb.collectionID).
        WithPartitionID(segmentInfo.PartitionID()).
        WithChannelName(wb.channelName).
        WithSegmentID(segmentID).
        WithStartPosition(startPos).
        WithTimeRange(tsFrom, tsTo).
        WithLevel(segmentInfo.Level()).
        WithBatchRows(batchSize).
        WithErrorHandler(wb.errHandler)
    
    if len(bm25) != 0 {
        pack.WithBM25Stats(bm25)
    }
    
    // 判断是否Flush
    if segmentInfo.State() == commonpb.SegmentState_Flushing ||
        segmentInfo.Level() == datapb.SegmentLevel_L0 {
        pack.WithFlush()
    }
    
    return syncmgr.NewSyncTask(pack), nil
}
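
syncSegments返回的是[]*conc.Future[struct{}],调用方可按需等待全部同步完成。一个等待示意(此处假设使用pkg/util/conc提供的AwaitAll,具体API以实际代码为准):

// 等待一批SyncTask全部结束,任一失败则返回对应error
futures := wb.syncSegments(ctx, segmentIDs)
if err := conc.AwaitAll(futures...); err != nil {
    log.Warn("some sync tasks failed", zap.Error(err))
}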

关键代码:SyncTask执行

func (t *SyncTask) Run(ctx context.Context) (err error) {
    t.tr = timerecord.NewTimeRecorder("syncTask")
    
    segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID)
    if !has {
        if t.pack.isDrop {
            log.Info("segment dropped, discard sync task")
            return nil
        }
        log.Warn("segment not found in metacache")
        return nil
    }
    
    columnGroups := t.getColumnGroups(segmentInfo)
    
    // 根据StorageVersion选择Writer
    switch segmentInfo.GetStorageVersion() {
    case storage.StorageV2:
        writer := NewBulkPackWriterV2(t.metacache, t.schema, t.chunkManager, 
            t.allocator, 0, packed.DefaultMultiPartUploadSize, t.storageConfig, 
            columnGroups, t.writeRetryOpts...)
        t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err = 
            writer.Write(ctx, t.pack)
        if err != nil {
            log.Warn("failed to write sync data with storage v2 format", zap.Error(err))
            return err
        }
    default:
        writer := NewBulkPackWriter(t.metacache, t.schema, t.chunkManager, 
            t.allocator, t.writeRetryOpts...)
        t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err = 
            writer.Write(ctx, t.pack)
        if err != nil {
            log.Warn("failed to write sync data", zap.Error(err))
            return err
        }
    }
    
    // 更新Metrics
    metrics.DataNodeWriteDataCount.WithLabelValues(
        fmt.Sprint(paramtable.GetNodeID()), t.dataSource, 
        metrics.InsertLabel, fmt.Sprint(t.collectionID)).Add(float64(t.batchRows))
    
    // 写入Meta(调用StreamingNode SaveBinlog)
    if t.metaWriter != nil {
        err = t.writeMeta(ctx)
        if err != nil {
            log.Warn("failed to save serialized data into storage", zap.Error(err))
            return err
        }
    }
    
    // 释放数据
    t.pack.ReleaseData()
    
    // 更新MetaCache
    actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
    if t.pack.isFlush {
        actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
    }
    t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), 
        metacache.WithSegmentIDs(t.segmentID))
    
    return nil
}

Binlog存储格式

StorageV1格式

{collection_id}/{partition_id}/{segment_id}/{field_id}/
  ├── 0_insert.binlog       # Chunk 0
  ├── 1_insert.binlog       # Chunk 1
  └── ...

{collection_id}/{partition_id}/{segment_id}/delta/
  └── delta.binlog

{collection_id}/{partition_id}/{segment_id}/stats/
  ├── stats.binlog          # PK统计信息
  └── bm25_stats.binlog     # BM25统计信息

StorageV2格式(列式存储)

{collection_id}/{partition_id}/{segment_id}/
  ├── record.parquet        # 主数据文件(Parquet格式)
  ├── delta.parquet         # Delete数据
  └── stats.json            # 统计信息
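
按上面StorageV1的目录布局,单个insert binlog的对象Key可以如下拼接(insertBinlogKey为本文自拟的示意函数,实际路径规则以storage包实现为准):

import "fmt"

// insertBinlogKey 按StorageV1布局拼接insert binlog的对象Key
func insertBinlogKey(collectionID, partitionID, segmentID, fieldID int64, chunk int) string {
    return fmt.Sprintf("%d/%d/%d/%d/%d_insert.binlog",
        collectionID, partitionID, segmentID, fieldID, chunk)
}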

Sync触发策略

WriteBuffer使用多种策略触发Sync(统一的策略接口示意见列表后):

  1. SyncPolicyV1(行数或大小)

    • Buffer大小 >= insertBufSize(默认16MB)
    • 或累积行数 >= insertBufSize折算出的行数上限
  2. TimedFlushPolicy(超时)

    • 数据停留时间 >= flushDuration(默认10秒)
  3. FlushNotifyPolicy(显式Flush)

    • MetaCache中Segment状态变为Flushing
  4. DropPartitionPolicy(分区删除)

    • 分区被删除,强制Flush所有数据
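
这些触发策略可以归纳为同一个接口:输入各Segment的Buffer状态,输出需要Sync的SegmentID列表。下面是一个基于Buffer大小的策略示意(接口与字段命名为本文自拟,实际实现见writebuffer包):

// SyncPolicy 选出需要触发Sync的Segment
type SyncPolicy interface {
    SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
}

// sizeBasedPolicy 对应SyncPolicyV1中按大小触发的分支
type sizeBasedPolicy struct {
    sizeLimit int64 // 对应insertBufSize
}

func (p *sizeBasedPolicy) SelectSegments(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
    var result []int64
    for _, buf := range buffers {
        if buf.MemorySize() >= p.sizeLimit { // Buffer达到大小阈值
            result = append(result, buf.segmentID)
        }
    }
    return result
}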

2.3 Compaction执行流程

Compaction是DataNode的核心功能之一,支持4种类型:L0Delete、Mix、Clustering、Sort。

2.3.1 Compaction调度流程

sequenceDiagram
    autonumber
    participant DC as DataCoord
    participant DN as DataNode
    participant CE as CompactionExecutor
    participant Task as CompactionTask
    participant Pool as CompactionPool
    participant OS as Object Storage
    
    DC->>DN: CompactionV2(CompactionPlan)
    DN->>DN: 检查健康状态
    DN->>DN: 解析CompactionParams
    DN->>DN: 创建ChunkManager
    
    alt L0DeleteCompaction
        DN->>Task: NewLevelZeroCompactionTask()
    else MixCompaction
        DN->>Task: NewMixCompactionTask()
    else ClusteringCompaction
        DN->>Task: NewClusteringCompactionTask()
    else SortCompaction
        DN->>Task: NewSortCompactionTask()
    end
    
    DN->>CE: Enqueue(task)
    CE->>CE: 检查Slot可用性
    CE->>CE: 计算Slot占用
    CE->>CE: taskCh <- task
    DN-->>DC: Status{Success}
    
    Note over CE: 后台goroutine
    CE->>Pool: Submit(executeTask)
    
    activate Task
    Pool->>Task: Compact()
    Task->>OS: 读取源Segment Binlog
    Task->>Task: 执行Compaction逻辑
    Task->>OS: 写入新Segment Binlog
    Task-->>Pool: CompactionPlanResult
    deactivate Task
    
    Pool->>CE: completed.Insert(result)
    CE->>CE: 释放Slot
    
    DC->>DN: GetCompactionState(planID)
    DN->>CE: GetResults(planID)
    CE-->>DN: CompactionPlanResult
    DN-->>DC: CompactionStateResponse

2.3.2 MixCompaction详细流程

MixCompaction合并多个小Segment并清理已删除数据。

sequenceDiagram
    autonumber
    participant Task as MixCompactionTask
    participant Reader as BinlogReader
    participant DM as DeltaMerger
    participant Writer as SegmentWriter
    participant OS as Object Storage
    
    Task->>Task: init()
    Task->>Task: 加载Schema和PKField
    
    loop 每个输入Segment
        Task->>Reader: LoadBinlogs(segmentID)
        Reader->>OS: Download Binlog
        Reader->>Reader: DecompressBinlog
        Reader-->>Task: InsertData
        
        Task->>Reader: LoadDeltaBinlogs(segmentID)
        Reader->>OS: Download DeltaLog
        Reader-->>Task: DeleteData
    end
    
    Task->>DM: Merge所有DeleteData
    DM->>DM: 构建PK -> Timestamp映射
    
    Task->>Task: 分配新SegmentID
    Task->>Writer: NewMultiSegmentWriter()
    
    loop 读取每行数据
        Task->>Task: 获取PK
        Task->>DM: IsDeleted(pk, ts)
        
        alt 未删除
            Task->>Writer: Write(value)
            
            alt Writer满
                Writer->>Writer: rotateWriter()
                Writer->>OS: Upload Binlog
                Task->>Task: 分配新SegmentID
            end
        else 已删除
            Task->>Task: 跳过该行
        end
    end
    
    Task->>Writer: Close()
    Writer->>OS: Upload最后一批Binlog
    
    Task->>Task: 构造CompactionPlanResult
    Task-->>Task: 返回segments/binlogs/statslogs

关键代码:CompactionExecutor

// executor 管理Compaction任务队列和并发控制
type executor struct {
    executing          *typeutil.ConcurrentMap[int64, Compactor]
    completedCompactor *typeutil.ConcurrentMap[int64, Compactor]
    completed          *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult]
    taskCh             chan Compactor
    slotMu             sync.RWMutex
    usingSlots         int64
}

// Enqueue 将任务加入队列
func (e *executor) Enqueue(task Compactor) (bool, error) {
    e.slotMu.Lock()
    defer e.slotMu.Unlock()
    
    newSlotUsage := task.GetSlotUsage()
    // 兼容旧版本,计算默认Slot占用
    if task.GetSlotUsage() <= 0 {
        switch task.GetCompactionType() {
        case datapb.CompactionType_ClusteringCompaction:
            newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
        case datapb.CompactionType_MixCompaction:
            newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
        case datapb.CompactionType_Level0DeleteCompaction:
            newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
        }
        log.Warn("illegal task slot usage, change it to a default value", 
            zap.Int64("newSlotUsage", newSlotUsage))
    }
    
    // 先检查重复任务,再占用Slot,避免重复任务造成Slot泄漏
    if _, ok := e.executing.GetOrInsert(task.GetPlanID(), task); ok {
        return false, merr.WrapErrDuplicatedCompactionTask()
    }
    e.usingSlots = e.usingSlots + newSlotUsage
    
    e.taskCh <- task  // 加入Channel
    return true, nil
}

// Start 启动后台执行循环
func (e *executor) Start(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case task := <-e.taskCh:
            // 提交到Worker Pool执行
            GetExecPool().Submit(func() (any, error) {
                e.executeTask(task)
                return nil, nil
            })
        }
    }
}

// executeTask 执行单个Compaction任务
func (e *executor) executeTask(task Compactor) {
    log := log.With(
        zap.Int64("planID", task.GetPlanID()),
        zap.Int64("Collection", task.GetCollection()),
        zap.String("channel", task.GetChannelName()),
    )
    
    defer func() {
        e.toCompleteState(task)  // 完成后释放Slot
    }()
    
    log.Info("start to execute compaction")
    
    // 调用任务的Compact方法
    result, err := task.Compact()
    if err != nil {
        log.Warn("compaction task failed", zap.Error(err))
        return
    }
    
    // 保存结果
    e.completed.Insert(result.GetPlanID(), result)
    e.completedCompactor.Insert(result.GetPlanID(), task)
    
    // 更新Metrics
    var entityCount int64
    var deleteCount int64
    lo.ForEach(result.Segments, func(seg *datapb.CompactionSegment, _ int) {
        entityCount += seg.GetNumOfRows()
        deleteCount += getDataCount(seg.GetDeltalogs())
    })
    metrics.DataNodeWriteDataCount.WithLabelValues(
        fmt.Sprint(paramtable.GetNodeID()),
        metrics.CompactionDataSourceLabel,
        metrics.InsertLabel,
        fmt.Sprint(task.GetCollection())).Add(float64(entityCount))
    
    log.Info("end to execute compaction")
}

关键代码:MixCompaction执行

func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
    ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "MixCompaction")
    defer span.End()
    
    // 1. 加载所有Segment的Binlog
    allValues, err := t.loadSegments(ctx)
    if err != nil {
        return nil, err
    }
    
    // 2. 合并所有Delete数据,构建PK->Timestamp映射
    deltaMerger := storage.NewDeltaMerge(t.primaryKeyIDs)
    for _, segment := range t.plan.SegmentBinlogs {
        deltaData := t.loadDeltaLogs(ctx, segment)
        deltaMerger.Add(deltaData)
    }
    
    // 3. 创建MultiSegmentWriter(支持自动分段)
    // maxRows、binLogMaxSize、storageVersion来自plan与配置,此处省略推导
    writer := compactor.NewMultiSegmentWriter(
        t.collectionID, t.partitionID, t.plan.GetSchema(), 
        t.segmentIDAlloc, t.logIDAlloc, t.binlogIO,
        maxRows, binLogMaxSize, storageVersion,
    )
    
    // 4. 迭代所有数据,过滤已删除行
    for _, value := range allValues {
        pk := value.PK
        ts := value.Timestamp
        
        // 检查是否被删除
        if deltaMerger.IsDeleted(pk, ts) {
            continue  // 跳过已删除行
        }
        
        // 写入数据
        err = writer.WriteValue(value)
        if err != nil {
            return nil, err
        }
        
        // Writer内部会自动判断是否需要Rotate(创建新Segment)
    }
    
    // 5. 关闭Writer,上传最后一批数据
    err = writer.Close()
    if err != nil {
        return nil, err
    }
    
    // 6. 构造CompactionPlanResult
    segments := writer.GetCompactionSegments()
    planResult := &datapb.CompactionPlanResult{
        PlanID:   t.plan.PlanID,
        State:    datapb.CompactionTaskState_completed,
        Segments: segments,
        Type:     t.plan.Type,
    }
    
    return planResult, nil
}

2.3.3 L0DeleteCompaction流程

L0DeleteCompaction将L0 Segment的Delete数据应用到Sealed Segment。

sequenceDiagram
    autonumber
    participant Task as L0CompactionTask
    participant Reader as BinlogReader
    participant Writer as DeltaWriter
    participant OS as Object Storage
    
    Task->>Task: init()
    Note over Task: 获取L0 Segments和<br/>目标Sealed Segments
    
    loop 每个L0 Segment
        Task->>Reader: LoadDeltaBinlogs(l0SegmentID)
        Reader->>OS: Download DeltaLog
        Reader-->>Task: DeleteData (PK list)
    end
    
    Task->>Task: PK分组DeleteData
    
    loop 每个目标Segment
        Task->>Writer: NewSegmentDeltaWriter(segmentID)
        
        Task->>Task: 筛选属于该Segment的PK
        Task->>Writer: WriteBatch(pks, timestamps)
        
        Writer->>OS: Upload DeltaLog
        Writer-->>Task: DeltaBinlog paths
    end
    
    Task->>Task: 构造CompactionPlanResult
    Task-->>Task: 返回每个Segment的新DeltaLog路径

2.3.4 ClusteringCompaction流程

ClusteringCompaction按Clustering Key重新组织数据,优化范围查询性能。

流程概览

  1. Analyze阶段:分析Clustering Key分布,计算分区边界(Scalar)或聚类中心(Vector)
  2. Mapping阶段:根据分区边界将数据分配到不同Buffer(分配逻辑见下方示意)
  3. Flush阶段:将每个Buffer写入独立的Segment

适用场景

  • Scalar Clustering Key:时间戳、地理位置等范围查询频繁的字段
  • Vector Clustering Key:相似度搜索性能优化
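
以Scalar Clustering Key为例,Mapping阶段按Analyze得到的升序分区边界把每行分配到对应Buffer,核心是一次二分查找(纯示意代码,bounds来自Analyze阶段):

import "sort"

// assignBuffer 返回key所属的Buffer下标:
// buffers[i] 接收 clusteringKey ∈ [bounds[i], bounds[i+1]) 的行
func assignBuffer(bounds []int64, key int64) int {
    // 找到第一个大于key的边界
    i := sort.Search(len(bounds), func(i int) bool { return bounds[i] > key })
    if i == 0 {
        return 0 // 小于最小边界的行归入第一个Buffer
    }
    return i - 1
}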

2.4 索引构建流程

DataNode通过TaskScheduler调度索引构建任务,支持向量索引、标量索引和统计信息收集。

sequenceDiagram
    autonumber
    participant DC as DataCoord
    participant DN as DataNode
    participant TS as TaskScheduler
    participant TM as TaskManager
    participant IB as IndexBuildTask
    participant Knowhere as Knowhere/Segcore
    participant OS as Object Storage
    
    DC->>DN: CreateTask(IndexTask)
    DN->>DN: createIndexTask(req)
    DN->>TM: LoadOrStoreIndexTask(taskCtx, req)
    
    alt 任务已存在
        TM-->>DN: 已有任务
        DN-->>DC: Status{Duplicate}
    else 新任务
        TM->>TM: 保存任务信息
        DN->>TS: AddTask(indexBuildTask)
        TS->>TS: utChan <- task
        DN-->>DC: Status{Success}
    end
    
    Note over TS: 后台indexBuildLoop
    TS->>TS: PopUnissuedTask()
    TS->>TS: 检查Slot可用性
    
    alt Slot充足
        TS->>IB: processTask(task)
        activate IB
        
        IB->>IB: PreExecute()
        IB->>OS: 下载Binlog数据
        IB->>IB: 解压Binlog
        
        IB->>IB: Execute()
        IB->>Knowhere: CreateIndex(buildParams)
        Note over Knowhere: C++索引引擎<br/>HNSW/IVF_FLAT/DiskANN
        Knowhere->>Knowhere: 构建索引
        Knowhere-->>IB: IndexHandle
        
        IB->>IB: PostExecute()
        IB->>Knowhere: index.UpLoad()
        Knowhere->>OS: 上传索引文件
        IB->>TM: StoreIndexFilesAndStatistic()
        
        IB->>IB: SetState(Finished)
        deactivate IB
    else Slot不足
        TS->>TS: sleep(50ms)
        TS->>TS: 重新检查
    end
    
    DC->>DN: QueryTask(IndexTask)
    DN->>TM: GetIndexTaskInfo(taskID)
    TM-->>DN: JobState, FileKeys, Size
    DN-->>DC: QueryTaskResponse

关键代码:TaskScheduler

type TaskScheduler struct {
    ctx       context.Context
    cancel    context.CancelFunc
    wg        sync.WaitGroup
    TaskQueue TaskQueue  // 管理任务队列和Slot
}

// indexBuildLoop 后台索引构建循环
func (sched *TaskScheduler) indexBuildLoop() {
    log.Ctx(sched.ctx).Debug("TaskScheduler start build loop ...")
    defer sched.wg.Done()
    
    for {
        select {
        case <-sched.ctx.Done():
            return
        case <-sched.TaskQueue.utChan():  // 有新任务
            t := sched.TaskQueue.PopUnissuedTask()
            
            // 等待Slot可用
            for {
                totalSlot := CalculateNodeSlots()
                availableSlot := totalSlot - sched.TaskQueue.GetActiveSlot()
                
                if availableSlot >= t.GetSlot() || totalSlot == availableSlot {
                    // Slot充足,启动goroutine执行
                    go func(t Task) {
                        sched.processTask(t, sched.TaskQueue)
                    }(t)
                    break
                }
                time.Sleep(time.Millisecond * 50)  // Slot不足,等待
            }
        }
    }
}

// processTask 执行单个任务(索引/Stats/Analyze)
func (sched *TaskScheduler) processTask(t Task, q TaskQueue) {
    defer func() {
        t.Reset()
        debug.FreeOSMemory()  // 强制GC释放内存
    }()
    
    sched.TaskQueue.AddActiveTask(t)
    defer sched.TaskQueue.PopActiveTask(t.Name())
    
    log.Ctx(t.Ctx()).Debug("process task", zap.String("task", t.Name()))
    
    // 依次执行三个阶段
    pipelines := []func(context.Context) error{t.PreExecute, t.Execute, t.PostExecute}
    for _, fn := range pipelines {
        if err := fn(t.Ctx()); err != nil {
            log.Ctx(t.Ctx()).Warn("process task failed", zap.Error(err))
            t.SetState(getStateFromError(err), err.Error())
            return
        }
    }
    
    t.SetState(indexpb.JobState_JobStateFinished, "")
}

关键代码:IndexBuildTask执行

// PreExecute 下载Binlog并准备数据
func (it *indexBuildTask) PreExecute(ctx context.Context) error {
    // 1. 下载Insert Binlog
    err := binlog.DecompressBinLogWithRootPath(
        it.req.GetStorageConfig().GetRootPath(),
        storage.InsertBinlog,
        it.req.GetCollectionID(),
        it.req.GetPartitionID(),
        it.req.GetSegmentID(),
        it.req.GetDataPaths(),
    )
    if err != nil {
        return err
    }
    
    // 2. 下载Stats Binlog(包含PK统计信息)
    err = binlog.DecompressBinLogWithRootPath(
        it.req.GetStorageConfig().GetRootPath(),
        storage.StatsBinlog,
        it.req.GetCollectionID(),
        it.req.GetPartitionID(),
        it.req.GetSegmentID(),
        it.req.GetStatsLogs(),
    )
    if err != nil {
        return err
    }
    
    return nil
}

// Execute 调用Knowhere构建索引
func (it *indexBuildTask) Execute(ctx context.Context) error {
    // 构造BuildIndexParams
    buildIndexParams := &indexcgowrapper.BuildIndexParams{
        IndexType:       it.req.GetIndexType(),
        MetricType:      it.req.GetMetricType(),
        FieldSchema:     it.fieldSchema,
        InsertFiles:     it.req.GetDataPaths(),
        Dim:             it.req.GetDim(),
        NumRows:         it.req.GetNumRows(),
        StorageConfig:   it.req.GetStorageConfig(),
        IndexParams:     it.req.GetIndexParams(),
        TypeParams:      it.req.GetTypeParams(),
    }
    
    // 调用C++ Knowhere构建索引
    var err error
    it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
    if err != nil {
        if it.index != nil && it.index.CleanLocalData() != nil {
            log.Warn("failed to clean cached data on disk after build index failed")
        }
        return err
    }
    
    buildIndexLatency := it.tr.RecordSpan()
    metrics.DataNodeKnowhereBuildIndexLatency.WithLabelValues(
        strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(buildIndexLatency.Seconds())
    
    return nil
}

// PostExecute 上传索引文件
func (it *indexBuildTask) PostExecute(ctx context.Context) error {
    // 1. 上传索引到对象存储
    indexStats, err := it.index.UpLoad()
    if err != nil {
        it.index.Delete()  // 清理本地缓存
        return err
    }
    
    encodeIndexFileDur := it.tr.Record("index serialize and upload done")
    metrics.DataNodeEncodeIndexFileLatency.WithLabelValues(
        strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(encodeIndexFileDur.Seconds())
    
    // 2. 提前释放索引(GC)
    it.index.Delete()
    
    // 3. 记录索引文件信息到TaskManager
    var serializedSize uint64
    saveFileKeys := make([]string, 0)
    for _, indexInfo := range indexStats.GetSerializedIndexInfos() {
        serializedSize += uint64(indexInfo.FileSize)
        parts := strings.Split(indexInfo.FileName, "/")
        fileKey := parts[len(parts)-1]
        saveFileKeys = append(saveFileKeys, fileKey)
    }
    
    it.manager.StoreIndexFilesAndStatistic(
        it.req.GetClusterID(),
        it.req.GetBuildID(),
        saveFileKeys,
        serializedSize,
        uint64(indexStats.MemSize),
        it.req.GetCurrentIndexVersion(),
        it.req.GetCurrentScalarIndexVersion(),
    )
    
    return nil
}

Slot计算

// CalculateNodeSlots 根据CPU和内存计算可用Slot数
func CalculateNodeSlots() int64 {
    cpuCores := hardware.GetCPUNum()
    memGB := int(hardware.GetMemoryCount() / 1024 / 1024 / 1024)
    
    // 每个Slot需要至少4GB内存和2个CPU核心
    slotByCPU := cpuCores / 2
    slotByMem := memGB / 4
    
    // 取较小值
    totalSlot := slotByCPU
    if slotByMem < slotByCPU {
        totalSlot = slotByMem
    }
    
    // 至少保留1个Slot
    if totalSlot < 1 {
        totalSlot = 1
    }
    
    return int64(totalSlot)
}

2.5 数据导入流程

DataNode的ImportV2支持两阶段导入:PreImport(验证)和Import(执行)。

sequenceDiagram
    autonumber
    participant DC as DataCoord
    participant DN as DataNode
    participant IS as ImportScheduler
    participant PreTask as PreImportTask
    participant ImpTask as ImportTask
    participant SM as SyncManager
    participant OS as Object Storage
    
    Note over DC,DN: 第一阶段:PreImport
    DC->>DN: CreateTask(PreImportTask)
    DN->>PreTask: NewPreImportTask(req)
    DN->>IS: Add(preTask)
    DN-->>DC: Status{Success}
    
    IS->>PreTask: Execute()
    activate PreTask
    PreTask->>OS: 读取导入文件
    PreTask->>PreTask: 解析文件格式(Parquet/JSON/CSV)
    PreTask->>PreTask: 验证Schema匹配
    PreTask->>PreTask: 统计行数和大小
    PreTask->>PreTask: SetState(Completed)
    deactivate PreTask
    
    DC->>DN: QueryTask(PreImportTask)
    DN->>DN: importTaskMgr.Get(taskID)
    DN-->>DC: FileStats、State、Reason
    
    Note over DC,DN: 第二阶段:Import
    DC->>DN: CreateTask(ImportTask)
    DN->>ImpTask: NewImportTask(req)
    DN->>IS: Add(impTask)
    DN-->>DC: Status{Success}
    
    IS->>ImpTask: Execute()
    activate ImpTask
    ImpTask->>OS: 读取导入文件
    
    loop 每个VChannel
        ImpTask->>ImpTask: 按Hash分配数据到Segment
        ImpTask->>ImpTask: 构造InsertData
        ImpTask->>SM: SyncData(syncTask)
        SM->>OS: 写入Binlog
    end
    
    ImpTask->>ImpTask: SetState(Completed)
    deactivate ImpTask
    
    DC->>DN: QueryTask(ImportTask)
    DN-->>DC: SegmentsInfo、State、Reason

关键代码:ImportTask执行

func (t *ImportTask) Execute() error {
    ctx := t.ctx
    
    // 1. 创建Reader(根据文件格式)
    reader, err := importutilv2.NewReader(ctx, t.req, t.chunkManager)
    if err != nil {
        return err
    }
    defer reader.Close()
    
    // 2. 为每个VChannel创建HashManager
    hashers := make(map[string]*importutilv2.HashManager)
    for _, vchannel := range t.req.GetVchannels() {
        hasher := importutilv2.NewHashManager(vchannel, t.req.GetRequestSegments())
        hashers[vchannel] = hasher
    }
    
    // 3. 读取数据并分配到Segment
    for {
        data, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        
        // 按主键Hash分配到VChannel和Segment
        for vchannel, hasher := range hashers {
            segmentData := hasher.Distribute(data)
            
            // 为每个Segment构造SyncTask
            for segmentID, insertData := range segmentData {
                syncTask := t.buildSyncTask(vchannel, segmentID, insertData)
                _, err := t.syncMgr.SyncData(ctx, syncTask)
                if err != nil {
                    return err
                }
            }
        }
    }
    
    // 4. Flush所有Segment
    for vchannel, hasher := range hashers {
        segments := hasher.GetSegments()
        for _, segmentID := range segments {
            syncTask := t.buildSyncTask(vchannel, segmentID, nil)
            syncTask.WithFlush()  // 标记为Flush
            _, err := t.syncMgr.SyncData(ctx, syncTask)
            if err != nil {
                return err
            }
        }
    }
    
    return nil
}

3. 关键设计点

3.1 SyncManager按SegmentID串行化设计

问题:多个SyncTask同时操作同一Segment会导致数据竞争

解决方案:SyncManager按SegmentID分配Worker,保证同一Segment的Task串行执行

type syncManager struct {
    workerPool *conc.Pool[struct{}]
    keyLock    *typeutil.KeyLock[int64]  // 按SegmentID加锁
    tasks      *typeutil.ConcurrentMap[string, Task]
}

func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
    handler := func(err error) error {
        taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
        defer mgr.tasks.Remove(taskKey)
        
        if err == nil {
            return nil
        }
        task.HandleError(err)
        return err
    }
    callbacks = append([]func(error) error{handler}, callbacks...)
    
    // 按SegmentID提交到WorkerPool,相同SegmentID的Task串行执行
    return mgr.Submit(ctx, key, task, callbacks...)
}
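
submit最终调用的Submit中,以SegmentID为key加KeyLock后才真正执行任务,这是同Segment串行的关键。简化示意(非逐字源码):

func (mgr *syncManager) Submit(ctx context.Context, key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
    return mgr.workerPool.Submit(func() (struct{}, error) {
        // 同一SegmentID的任务在此互斥,保证串行执行
        mgr.keyLock.Lock(key)
        defer mgr.keyLock.Unlock(key)

        err := task.Run(ctx)
        for _, cb := range callbacks {
            err = cb(err)
        }
        return struct{}{}, err
    })
}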

3.2 CompactionExecutor Slot管理设计

问题:Compaction任务资源占用大,需要限制并发数

解决方案:通过Slot机制控制并发,不同类型任务占用不同Slot数

type executor struct {
    executing          *typeutil.ConcurrentMap[int64, Compactor]
    completedCompactor *typeutil.ConcurrentMap[int64, Compactor]
    completed          *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult]
    taskCh             chan Compactor
    slotMu             sync.RWMutex
    usingSlots         int64  // 当前占用的Slot数
}

// Slot占用配置(可配置)
const (
    ClusteringCompactionSlot = 8   // Clustering最耗资源
    MixCompactionSlot        = 2
    L0DeleteCompactionSlot   = 1
)
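
executor通过Slots()对外暴露当前占用,供QuerySlot计算可用Slot(见4.3节)。实现示意:

// Slots 返回当前Compaction任务占用的Slot总数
func (e *executor) Slots() int64 {
    e.slotMu.RLock()
    defer e.slotMu.RUnlock()
    return e.usingSlots
}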

3.3 TaskScheduler三阶段执行模型

设计目标:清晰分离任务生命周期,便于错误处理和资源清理

// 三阶段:PreExecute → Execute → PostExecute
pipelines := []func(context.Context) error{
    t.PreExecute,   // 准备:下载数据、验证参数
    t.Execute,      // 执行:核心逻辑(索引构建/统计计算)
    t.PostExecute,  // 清理:上传结果、释放资源
}

for _, fn := range pipelines {
    if err := fn(t.Ctx()); err != nil {
        // 记录失败状态
        t.SetState(getStateFromError(err), err.Error())
        return
    }
}

t.SetState(indexpb.JobState_JobStateFinished, "")
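
从上述调度代码可以归纳出任务需要实现的最小接口(方法名均出现在前文processTask/indexBuildLoop中,接口定义为示意归纳):

type Task interface {
    Ctx() context.Context
    Name() string
    GetSlot() int64

    PreExecute(ctx context.Context) error
    Execute(ctx context.Context) error
    PostExecute(ctx context.Context) error

    SetState(state indexpb.JobState, failReason string)
    Reset()
}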

3.4 MultiSegmentWriter自动分段设计

问题:Compaction输出Segment大小不可预测,可能超过配置限制

解决方案:Writer内部自动Rotate,创建多个Segment

type MultiSegmentWriter struct {
    writer           *storage.BinlogValueWriter
    currentSegmentID int64
    maxRows          int64
    segmentSize      int64
    res              []*datapb.CompactionSegment
}

func (w *MultiSegmentWriter) WriteValue(v *storage.Value) error {
    // 检查是否需要Rotate
    if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
        if err := w.rotateWriter(); err != nil {
            return err
        }
    }
    
    return w.writer.WriteValue(v)
}

func (w *MultiSegmentWriter) rotateWriter() error {
    // 1. 关闭当前Writer,上传Binlog
    if err := w.closeWriter(); err != nil {
        return err
    }
    
    // 2. 分配新SegmentID
    newSegmentID, err := w.allocator.allocSegmentID()
    if err != nil {
        return err
    }
    w.currentSegmentID = newSegmentID
    
    // 3. 创建新Writer
    rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID, 
        newSegmentID, w.schema, w.allocator.logIDAlloc, chunkSize, w.maxRows, w.rwOption...)
    if err != nil {
        return err
    }
    
    w.writer = storage.NewBinlogValueWriter(rw, w.batchSize)
    return nil
}

4. 关键RPC接口规格

4.1 CompactionV2 - Compaction任务下发

接口定义

rpc CompactionV2(CompactionPlan) returns (common.Status) {}

message CompactionPlan {
  int64 planID = 1;
  repeated CompactionSegmentBinlogs segmentBinlogs = 2;
  int64 timeoutInSeconds = 3;
  CompactionType type = 4;
  string channel = 5;
  int64 collectionID = 6;
  int64 partitionID = 7;
  schema.CollectionSchema schema = 8;
  int64 beginLogID = 9;
  IDRange preAllocatedLogIDs = 10;
  IDRange preAllocatedSegmentIDs = 11;
  int64 slotUsage = 12;
  string jsonParams = 13;
}

调用链路

DataCoord → DataNode.CompactionV2() → CompactionExecutor.Enqueue()

关键代码

func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
    // 1. 健康检查
    if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
        return merr.Status(err), nil
    }
    
    // 2. 参数验证
    if len(req.GetSegmentBinlogs()) == 0 {
        return merr.Success(), nil
    }
    if req.GetBeginLogID() == 0 {
        return merr.Status(merr.WrapErrParameterInvalidMsg("invalid beginLogID")), nil
    }
    
    // 3. 创建ChunkManager
    taskCtx := tracer.Propagate(ctx, node.ctx)
    compactionParams, err := compaction.ParseParamsFromJSON(req.GetJsonParams())
    if err != nil {
        return merr.Status(err), nil
    }
    cm, err := node.storageFactory.NewChunkManager(node.ctx, compactionParams.StorageConfig)
    if err != nil {
        return merr.Status(err), nil
    }
    
    // 4. 根据类型创建Compactor(pkFieldIDs从schema解析,此处省略)
    var task compactor.Compactor
    binlogIO := io.NewBinlogIO(cm)
    switch req.GetType() {
    case datapb.CompactionType_Level0DeleteCompaction:
        task = compactor.NewLevelZeroCompactionTask(taskCtx, binlogIO, cm, req, compactionParams)
    case datapb.CompactionType_MixCompaction:
        task = compactor.NewMixCompactionTask(taskCtx, binlogIO, req, compactionParams, pkFieldIDs)
    case datapb.CompactionType_ClusteringCompaction:
        task = compactor.NewClusteringCompactionTask(taskCtx, binlogIO, req, compactionParams)
    case datapb.CompactionType_SortCompaction:
        task = compactor.NewSortCompactionTask(taskCtx, binlogIO, req, compactionParams, pkFieldIDs)
    default:
        return merr.Status(merr.WrapErrParameterInvalidMsg("unknown compaction type")), nil
    }
    
    // 5. 加入执行队列
    succeed, err := node.compactionExecutor.Enqueue(task)
    if succeed {
        return merr.Success(), nil
    } else {
        return merr.Status(err), nil
    }
}

错误处理

  • ErrParameterInvalid:参数校验失败
  • ErrDuplicatedCompactionTask:重复的PlanID
  • ErrInsufficientSlots:Slot不足(返回成功但任务排队)

4.2 CreateTask - 统一任务创建接口

接口定义

rpc CreateTask(CreateTaskRequest) returns (common.Status) {}

message CreateTaskRequest {
  map<string, string> properties = 1;  // 任务属性(类型、ID等)
  bytes payload = 2;                   // 任务详细参数(Protobuf序列化)
}

支持的任务类型

  1. taskcommon.PreImport:预导入验证
  2. taskcommon.Import:批量导入
  3. taskcommon.Compaction:Compaction任务
  4. taskcommon.Index:索引构建
  5. taskcommon.Stats:统计信息收集
  6. taskcommon.Analyze:数据分析

调用链路

DataCoord → DataNode.CreateTask() → 
  - importTaskMgr.Add() (PreImport/Import)
  - compactionExecutor.Enqueue() (Compaction)
  - taskScheduler.AddTask() (Index/Stats/Analyze)

关键代码

func (node *DataNode) CreateTask(ctx context.Context, request *workerpb.CreateTaskRequest) (*commonpb.Status, error) {
    properties := taskcommon.NewProperties(request.GetProperties())
    taskType, err := properties.GetTaskType()
    if err != nil {
        return merr.Status(err), nil
    }
    
    switch taskType {
    case taskcommon.PreImport:
        req := &datapb.PreImportRequest{}
        proto.Unmarshal(request.GetPayload(), req)
        return node.PreImport(ctx, req)
        
    case taskcommon.Import:
        req := &datapb.ImportRequest{}
        proto.Unmarshal(request.GetPayload(), req)
        return node.ImportV2(ctx, req)
        
    case taskcommon.Compaction:
        req := &datapb.CompactionPlan{}
        proto.Unmarshal(request.GetPayload(), req)
        return node.CompactionV2(ctx, req)
        
    case taskcommon.Index:
        req := &workerpb.CreateJobRequest{}
        proto.Unmarshal(request.GetPayload(), req)
        return node.createIndexTask(ctx, req)
        
    case taskcommon.Stats:
        req := &workerpb.CreateStatsRequest{}
        proto.Unmarshal(request.GetPayload(), req)
        return node.createStatsTask(ctx, req)
        
    case taskcommon.Analyze:
        req := &workerpb.AnalyzeRequest{}
        proto.Unmarshal(request.GetPayload(), req)
        return node.createAnalyzeTask(ctx, req)
    default:
        return merr.Status(merr.WrapErrParameterInvalidMsg("unsupported task type")), nil
    }
}

4.3 QuerySlot - 资源查询

接口定义

rpc QuerySlot(QuerySlotRequest) returns (QuerySlotResponse) {}

message QuerySlotResponse {
  common.Status status = 1;
  int64 availableSlots = 2;
}

Slot计算

totalSlots = min(CPU核数/2, 内存GB/4)
availableSlots = totalSlots - indexStatsUsed - compactionUsed - importUsed

关键代码

func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*datapb.QuerySlotResponse, error) {
    var (
        totalSlots     = node.totalSlot
        indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot()
        compactionUsed = node.compactionExecutor.Slots()
        importUsed     = node.importScheduler.Slots()
    )
    
    availableSlots := totalSlots - indexStatsUsed - compactionUsed - importUsed
    if availableSlots < 0 {
        availableSlots = 0
    }
    
    // 更新Metrics
    metrics.DataNodeSlot.WithLabelValues(fmt.Sprint(node.GetNodeID()), "available").Set(float64(availableSlots))
    metrics.DataNodeSlot.WithLabelValues(fmt.Sprint(node.GetNodeID()), "total").Set(float64(totalSlots))
    
    return &datapb.QuerySlotResponse{
        Status:         merr.Success(),
        AvailableSlots: availableSlots,
    }, nil
}

4.4 GetCompactionState - 任务状态查询

接口定义

rpc GetCompactionState(CompactionStateRequest) returns (CompactionStateResponse) {}

message CompactionStateRequest {
  int64 planID = 1;
}

message CompactionStateResponse {
  common.Status status = 1;
  repeated CompactionPlanResult results = 2;
}

关键代码

func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
    results := node.compactionExecutor.GetResults(req.GetPlanID())
    return &datapb.CompactionStateResponse{
        Status:  merr.Success(),
        Results: results,
    }, nil
}

5. 性能与容量规划

5.1 性能指标

| 操作类型 | 吞吐量 | 延迟 | 资源消耗 | 说明 |
|---|---|---|---|---|
| Sync任务 | 20-100MB/s | 100-500ms | CPU:中等,内存:低 | 取决于字段数和压缩算法 |
| L0Compaction | 200-500MB/s | 1-3s/GB | CPU:低,内存:低 | 仅处理Delete数据 |
| MixCompaction | 50-200MB/s | 5-20s/GB | CPU:高,内存:中等 | 需读取和过滤所有数据 |
| ClusteringCompaction | 30-100MB/s | 10-60s/GB | CPU:高,内存:高 | 需要Analyze和重组数据 |
| 索引构建(HNSW) | 10-50万行/分钟 | 取决于维度和参数 | CPU:高,内存:高 | 向量维度越高越慢 |
| 索引构建(IVF_FLAT) | 50-200万行/分钟 | 取决于nlist | CPU:中等,内存:中等 | 训练阶段较慢 |
| Import任务 | 100-500MB/s | 取决于文件大小 | CPU:中等,内存:中等 | Parquet最快,JSON最慢 |

5.2 容量规划

Slot配置

| 资源配置 | TotalSlot | 建议任务分配 |
|---|---|---|
| 8核16GB | 4 | Index: 2, Compaction: 1, Import: 1 |
| 16核32GB | 8 | Index: 4, Compaction: 2, Import: 2 |
| 32核64GB | 16 | Index: 8, Compaction: 4, Import: 4 |
| 64核128GB | 32 | Index: 16, Compaction: 8, Import: 8 |

内存规划

| 组件 | 内存占用 | 说明 |
|---|---|---|
| SyncManager | <1GB | 固定开销 |
| SyncTask(单个) | 数据大小*1.2 | 包含序列化缓冲 |
| Compaction(Mix) | 输入Segment大小总和 | 需全部加载到内存 |
| Compaction(Clustering) | 输入大小*1.5 | 需额外Buffer |
| IndexBuild(HNSW) | NumRows*Dim*4*2 | 原始数据+索引 |
| IndexBuild(IVF) | NumRows*Dim*4*1.2 | 原始数据+倒排表 |

5.3 关键配置参数

dataNode:
  # Sync配置
  dataSync:
    workerPoolSize: 16
    writeRetry:
      maxRetries: 3
      initialBackoff: 100ms
      
  # StorageV2配置
  storageV2:
    enabled: true
    multiPartUploadSize: 5MB
    
# DataCoord配置(影响DataNode)
dataCoord:
  compaction:
    clusteringCompactionSlotUsage: 8
    mixCompactionSlotUsage: 2
    level0DeleteCompactionSlotUsage: 1
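
这些Slot占用参数在DataNode侧通过paramtable读取,与前文Enqueue中的兼容逻辑一致,例如:

// 读取MixCompaction的Slot占用配置
slot := paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()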

5.4 监控指标

# Slot使用
datanode_slot{nodeID="1", type="available"}
datanode_slot{nodeID="1", type="total"}

# Sync任务
datanode_flush_buffer_count{nodeID="1", status="success"}
datanode_save_to_storage_latency_seconds{nodeID="1"}

# Compaction任务
datanode_write_data_count{nodeID="1", data_source="compaction"}

# 索引构建
datanode_knowhere_build_index_latency_seconds{nodeID="1"}

5.5 性能调优建议

  1. Sync性能优化:增加workerPoolSize,启用StorageV2格式(提升30-50%)
  2. Compaction优化:合理设置Slot占用,避免OOM;对大Segment优先使用SortCompaction
  3. 索引构建优化:HNSW增加buildThreads;DiskANN使用SSD;分批构建大规模索引
  4. Import优化:使用Parquet格式(比JSON快3-5倍);增加maxConcurrentReadFiles