Milvus-05-DataNode-概览
1. 模块概述
1.1 职责定义
DataNode是Milvus分布式架构中的数据持久化组件,负责将流式数据以Binlog形式写入对象存储,并执行Compaction(数据整理)、索引构建和批量导入等任务。
核心职责:
- 数据同步与持久化
  - 通过SyncManager管理数据同步任务
  - 将内存数据序列化为Binlog格式
  - 上传到Object Storage(S3/MinIO/OSS)
  - 支持StorageV1和StorageV2两种存储格式
- Compaction执行
  - L0DeleteCompaction:应用Delete日志到Sealed Segment
  - MixCompaction:合并小Segment并清理删除数据
  - ClusteringCompaction:按Clustering Key重新组织数据
  - SortCompaction:按主键排序优化查询
- 索引构建
  - 执行向量索引构建(HNSW/IVF_FLAT/DiskANN)
  - 执行标量索引构建(B-Tree/Inverted)
  - 执行统计信息收集(Stats/Analyze)
  - 构建BM25统计信息(全文检索)
- 数据导入
  - 批量导入任务调度(ImportV2)
  - 多文件格式支持(Parquet/JSON/CSV)
  - L0 Import(仅Delete数据导入)
- 资源管理
  - Slot管理(控制并发任务数)
  - 内存控制(防止OOM)
  - 任务优先级调度
1.2 整体架构图
flowchart TB
subgraph DataNode["DataNode 核心组件"]
subgraph Core["核心服务"]
DN[DataNode Server<br/>生命周期管理]
Session[Session<br/>Etcd注册/心跳]
end
subgraph Managers["管理器层"]
SyncMgr[SyncManager<br/>数据同步管理]
CompactExec[CompactionExecutor<br/>Compaction执行器]
TaskSched[TaskScheduler<br/>索引/Stats任务调度]
ImportSched[ImportScheduler<br/>导入任务调度]
end
subgraph Workers["工作池"]
SyncPool[Sync Worker Pool<br/>并发同步任务]
CompactPool[Compaction Pool<br/>Compaction并发]
IndexPool[Index Builder Pool<br/>索引构建并发]
end
subgraph Storage["存储抽象"]
ChunkMgr[ChunkManager<br/>对象存储客户端]
BinlogIO[BinlogIO<br/>Binlog读写]
end
DN --> Session
DN --> SyncMgr
DN --> CompactExec
DN --> TaskSched
DN --> ImportSched
SyncMgr --> SyncPool
CompactExec --> CompactPool
TaskSched --> IndexPool
SyncPool --> BinlogIO
CompactPool --> BinlogIO
IndexPool --> BinlogIO
BinlogIO --> ChunkMgr
end
subgraph External["外部依赖"]
DC[DataCoord<br/>任务分配]
Etcd[Etcd<br/>服务注册/元数据]
OS[Object Storage<br/>S3/MinIO/OSS]
end
DN -.->|注册/心跳| Etcd
DC -.->|CompactionV2<br/>CreateTask| DN
ChunkMgr -.->|Upload/Download| OS
style DataNode fill:#e1f5ff,stroke:#333,stroke-width:2px
style Managers fill:#fff4e6,stroke:#333,stroke-width:1px
style Workers fill:#e6f7ff,stroke:#333,stroke-width:1px
架构说明
- 核心服务层
  - DataNode Server:整个服务的入口,管理生命周期(Init/Start/Stop)
  - Session:通过Etcd注册节点,定期发送心跳,维护节点在线状态
- 管理器层
  - SyncManager:管理所有数据同步任务,按SegmentID分配Worker,确保同一Segment的同步任务串行执行
  - CompactionExecutor:维护Compaction任务队列,管理Slot分配,执行各类Compaction任务
  - TaskScheduler:调度索引构建和统计信息任务,控制并发度
  - ImportScheduler:调度批量导入任务,管理导入任务的生命周期
- 工作池
  - Sync Worker Pool:并发执行数据同步任务,将WriteBuffer中的数据序列化并上传
  - Compaction Pool:并发执行Compaction,支持L0/Mix/Clustering/Sort四种类型
  - Index Builder Pool:并发构建索引,调用Knowhere/Segcore接口
- 存储抽象
  - ChunkManager:封装对象存储操作(S3/MinIO/OSS/本地),支持分片上传
  - BinlogIO:封装Binlog读写逻辑,处理压缩/解压缩
1.3 与DataCoord交互总览
sequenceDiagram
autonumber
participant DC as DataCoord
participant DN as DataNode
participant Etcd as Etcd
participant OS as Object Storage
Note over DN,Etcd: 节点注册与心跳
DN->>Etcd: 注册节点信息
DN->>Etcd: 定期更新心跳
Note over DC,DN: Compaction任务分配
DC->>DN: CompactionV2(CompactionPlan)
DN->>DN: 创建Compaction任务
DN->>DN: 加入ExecutorQueue
DN-->>DC: Status{Success}
Note over DN,OS: Compaction执行
DN->>OS: 读取Binlog数据
DN->>DN: 执行Compaction逻辑
DN->>OS: 写入新Segment Binlog
Note over DC,DN: 任务状态查询
DC->>DN: GetCompactionState(planID)
DN-->>DC: CompactionStateResponse
Note over DC,DN: 索引构建任务
DC->>DN: CreateTask(IndexTask)
DN->>DN: TaskScheduler调度
DN->>DN: IndexBuilder构建索引
DN->>OS: 上传索引文件
DN-->>DC: Status{Success}
Note over DC,DN: 导入任务
DC->>DN: CreateTask(ImportTask)
DN->>DN: ImportScheduler调度
DN->>OS: 读取导入文件
DN->>OS: 写入Segment Binlog
DN-->>DC: Status{Success}
Note over DC,DN: Slot查询
DC->>DN: QuerySlot()
DN-->>DC: QuerySlotResponse{availableSlots}
交互说明
- 节点注册与心跳
  - DataNode启动后向Etcd注册,包含ServerID、地址、版本等信息
  - 定期发送心跳(默认1秒),若心跳超时则DataNode进程退出(基于Etcd租约的简化示意见本节末尾的代码草图)
- Compaction任务分配
  - DataCoord通过CompactionV2 RPC下发任务
  - DataNode检查Slot可用性,创建对应类型的Compactor
  - 任务加入CompactionExecutor队列,异步执行
- 索引构建任务
  - DataCoord通过CreateTask RPC下发IndexTask
  - TaskScheduler根据Slot可用性调度
  - IndexBuilder调用Knowhere/Segcore接口构建索引
- 导入任务
  - DataCoord通过CreateTask RPC下发ImportTask
  - ImportScheduler管理PreImport和Import两阶段
  - PreImport验证文件格式和内容,Import执行实际写入
- Slot管理
  - DataCoord通过QuerySlot查询可用Slot数
  - DataNode根据总Slot数减去各类任务占用计算可用Slot
  - 用于任务调度和负载均衡
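心跳本质上是基于Etcd租约(Lease)的保活:注册时写入一个绑定租约的key,随后持续续约,一旦续约失败即认为节点掉线。下面是一个用etcd clientv3演示该机制的最小草图,仅示意原理;示例中的key、TTL与节点信息均为假设值,并非Milvus Session的真实实现:
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// 假设本地已运行etcd
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()
	// 1. 申请带TTL的租约(TTL为示意值)
	lease, err := cli.Grant(ctx, 10)
	if err != nil {
		log.Fatal(err)
	}
	// 2. 注册节点信息,key绑定到该租约
	if _, err = cli.Put(ctx, "session/datanode-1",
		`{"serverID":1,"address":"127.0.0.1:21124"}`, clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}
	// 3. 持续续约;KeepAlive的响应channel被关闭即视为心跳失败
	ch, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		log.Fatal(err)
	}
	for range ch {
		// 正常续约中
	}
	// 对应LivenessCheck回调中退出进程的逻辑
	log.Fatal("lease keepalive channel closed, exit process")
}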
2. 核心流程详解
2.1 DataNode启动流程
sequenceDiagram
autonumber
participant Main as main()
participant DN as DataNode
participant Session as Session
participant Etcd as Etcd
participant SyncMgr as SyncManager
participant CompactExec as CompactionExecutor
participant TaskSched as TaskScheduler
participant ImportSched as ImportScheduler
Main->>DN: NewDataNode(ctx)
activate DN
DN->>DN: 初始化各组件
DN->>CompactExec: NewExecutor()
DN->>TaskSched: NewTaskScheduler()
DN-->>Main: dataNode实例
deactivate DN
Main->>DN: SetEtcdClient(etcdCli)
Main->>DN: Init()
activate DN
DN->>DN: registerMetricsRequest()
DN->>Session: NewSession(ctx)
Session->>Session: Init(DataNodeRole)
Session->>Etcd: 预注册节点
DN->>SyncMgr: NewSyncManager()
DN->>DN: importTaskMgr = NewTaskManager()
DN->>ImportSched: NewScheduler(importTaskMgr)
DN->>DN: InitSegcore(serverID)
DN-->>Main: nil (成功)
deactivate DN
Main->>DN: Start()
activate DN
DN->>CompactExec: Start(ctx)
Note over CompactExec: 启动后台goroutine<br/>监听taskCh
DN->>ImportSched: Start()
Note over ImportSched: 启动导入调度器
DN->>TaskSched: Start()
Note over TaskSched: 启动索引构建loop
DN->>DN: UpdateStateCode(Healthy)
DN-->>Main: nil (成功)
deactivate DN
Main->>DN: Register()
activate DN
DN->>Session: Register()
Session->>Etcd: 正式注册节点信息
Session->>Session: LivenessCheck(ctx)
Note over Session: 启动心跳goroutine<br/>定期更新TTL
DN-->>Main: nil (成功)
deactivate DN
启动流程说明
第一阶段:NewDataNode - 创建实例
func NewDataNode(ctx context.Context) *DataNode {
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
ctx: ctx2,
cancel: cancel2,
Role: typeutil.DataNodeRole,
lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal),
compactionExecutor: compactor.NewExecutor(),
reportImportRetryTimes: 10,
metricsRequest: metricsinfo.NewMetricsRequest(),
totalSlot: index.CalculateNodeSlots(), // 根据CPU和内存计算
}
sc := index.NewTaskScheduler(ctx2)
node.storageFactory = NewChunkMgrFactory()
node.taskScheduler = sc
node.taskManager = index.NewTaskManager(ctx2)
node.UpdateStateCode(commonpb.StateCode_Abnormal) // 初始为Abnormal
return node
}
第二阶段:Init - 初始化组件
func (node *DataNode) Init() error {
	var initError error
	node.initOnce.Do(func() {
// 1. 注册Metrics处理器
node.registerMetricsRequest()
// 2. 初始化Session(Etcd注册)
if err := node.initSession(); err != nil {
initError = err
return
}
serverID := node.GetNodeID()
// 3. 创建SyncManager(数据同步管理器)
syncMgr := syncmgr.NewSyncManager(nil)
node.syncMgr = syncMgr
// 4. 创建ImportTaskManager和Scheduler
node.importTaskMgr = importv2.NewTaskManager()
node.importScheduler = importv2.NewScheduler(node.importTaskMgr)
// 5. 初始化Segcore(C++索引引擎)
err := index.InitSegcore(serverID)
if err != nil {
initError = err
}
})
return initError
}
第三阶段:Start - 启动后台服务
func (node *DataNode) Start() error {
	var startErr error
	node.startOnce.Do(func() {
// 1. 启动Compaction执行器
go node.compactionExecutor.Start(node.ctx)
// 2. 启动Import调度器
go node.importScheduler.Start()
// 3. 启动Task调度器(索引构建)
err := node.taskScheduler.Start()
if err != nil {
startErr = err
return
}
// 4. 更新状态为Healthy
node.UpdateStateCode(commonpb.StateCode_Healthy)
})
return startErr
}
第四阶段:Register - 注册到Etcd
func (node *DataNode) Register() error {
// 正式注册到Etcd
node.session.Register()
// 启动心跳检测
node.session.LivenessCheck(node.ctx, func() {
log.Error("Data Node disconnected from etcd, process will exit")
os.Exit(1) // 心跳失败则退出进程
})
return nil
}
关键设计点
- 分阶段启动:Init阶段准备资源,Start阶段启动后台任务,Register阶段注册到集群
- Once保护:使用sync.Once确保Init/Start/Stop只执行一次
- Lifetime管理:通过Lifetime管理状态码(Abnormal→Healthy→Stopping→Stopped)
- 心跳机制:心跳失败直接退出进程,避免脑裂
- 优雅关闭:Stop先设置状态为Abnormal,等待所有请求完成后再清理资源(简化示意见下方代码草图)
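下面用一个独立的小例子示意「先拒新、等在途、再清理」的关闭顺序;其中的类型与字段均为示意假设,并非DataNode源码:
package main

import (
	"fmt"
	"sync"
)

// gracefulStopper 演示与DataNode Stop相同的关闭顺序:
// 状态置为非健康(拒绝新请求)→ 等待在途请求结束 → 清理资源。
type gracefulStopper struct {
	stopOnce sync.Once
	healthy  bool
	inflight sync.WaitGroup // 在途请求计数,对应Lifetime的计数语义
}

func (s *gracefulStopper) Stop(cleanup func()) {
	s.stopOnce.Do(func() {
		s.healthy = false // 1. 状态置为Abnormal,新请求在健康检查处被拒绝
		s.inflight.Wait() // 2. 等待所有在途请求完成
		cleanup()         // 3. 清理资源:停止调度器、取消context、注销Session
	})
}

func main() {
	s := &gracefulStopper{healthy: true}
	s.Stop(func() { fmt.Println("resources released") })
}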
2.2 SyncManager数据同步流程
v2.6.0之后,DataNode采用WriteBuffer + SyncManager架构,不再使用FlowGraph,数据同步由SyncManager统一管理。
sequenceDiagram
autonumber
participant WB as WriteBuffer
participant SM as SyncManager
participant ST as SyncTask
participant Writer as BinlogWriter
participant CM as ChunkManager
participant OS as Object Storage
participant MC as MetaCache
Note over WB: 数据累积触发Sync
WB->>WB: getSegmentsToSync(policies)
WB->>WB: getSyncTask(segmentID)
WB->>WB: yieldBuffer(segmentID)
Note over WB: 生成InsertData、DeleteData<br/>Schema、TimeRange
WB->>SM: SyncData(ctx, syncTask, callbacks)
SM->>SM: safeSubmitTask(segmentID, task)
Note over SM: 按SegmentID分配Worker<br/>保证同Segment串行
SM->>ST: Submit to WorkerPool
activate ST
ST->>MC: GetSegmentByID(segmentID)
MC-->>ST: SegmentInfo
alt StorageV2
ST->>Writer: NewBulkPackWriterV2()
Writer->>Writer: 创建BinlogRecordWriter
else StorageV1
ST->>Writer: NewBulkPackWriter()
end
loop 写入每个字段
ST->>Writer: Write(insertData)
Writer->>Writer: Serialize字段数据
Writer->>Writer: 累积到Chunk
alt Chunk满
Writer->>Writer: FlushChunk()
Writer->>CM: Upload(binlog_path, data)
CM->>OS: PutObject()
end
end
ST->>Writer: WriteDelta(deleteData)
Writer->>CM: Upload(deltalog_path, deletes)
ST->>Writer: WriteStats(pkStats, bm25Stats)
Writer->>CM: Upload(statslog_path, stats)
ST->>MC: UpdateSegments(FinishSyncing)
alt isFlush
ST->>MC: UpdateState(Flushed)
end
ST-->>SM: Success
deactivate ST
SM->>WB: callback(nil)
WB->>WB: syncCheckpoint.Remove(segmentID)
关键代码:WriteBuffer触发Sync
// syncSegments 触发多个Segment的同步
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] {
result := make([]*conc.Future[struct{}], 0, len(segmentIDs))
for _, segmentID := range segmentIDs {
// 1. 构造SyncTask
syncTask, err := wb.getSyncTask(ctx, segmentID)
if err != nil {
if errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
continue
} else {
log.Fatal("failed to get sync task", zap.Int64("segmentID", segmentID), zap.Error(err))
}
}
// 2. 提交到SyncManager
future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
// 3. 成功后清理Checkpoint
if syncTask.StartPosition() != nil {
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
// 4. 如果是Flush,从MetaCache移除
if syncTask.IsFlush() {
wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(syncTask.SegmentID()))
log.Info("flushed segment removed", zap.Int64("segmentID", syncTask.SegmentID()))
}
return nil
})
if err != nil {
log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
}
result = append(result, future)
}
return result
}
// getSyncTask 从WriteBuffer构造SyncTask
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID)
if !ok {
return nil, merr.WrapErrSegmentNotFound(segmentID)
}
// Yield数据(从Buffer取出并清空)
insert, bm25, delta, schema, timeRange, startPos := wb.yieldBuffer(segmentID)
if startPos != nil {
wb.syncCheckpoint.Add(segmentID, startPos, "syncing task")
}
	// 构造SyncPack(示例中tsFrom/tsTo取自timeRange,batchSize为本次插入数据的行数,相关推导代码此处省略)
pack := &syncmgr.SyncPack{}
pack.WithInsertData(insert).
WithDeleteData(delta).
WithCollectionID(wb.collectionID).
WithPartitionID(segmentInfo.PartitionID()).
WithChannelName(wb.channelName).
WithSegmentID(segmentID).
WithStartPosition(startPos).
WithTimeRange(tsFrom, tsTo).
WithLevel(segmentInfo.Level()).
WithBatchRows(batchSize).
WithErrorHandler(wb.errHandler)
if len(bm25) != 0 {
pack.WithBM25Stats(bm25)
}
// 判断是否Flush
if segmentInfo.State() == commonpb.SegmentState_Flushing ||
segmentInfo.Level() == datapb.SegmentLevel_L0 {
pack.WithFlush()
}
return syncmgr.NewSyncTask(pack), nil
}
关键代码:SyncTask执行
func (t *SyncTask) Run(ctx context.Context) (err error) {
t.tr = timerecord.NewTimeRecorder("syncTask")
segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID)
if !has {
if t.pack.isDrop {
log.Info("segment dropped, discard sync task")
return nil
}
log.Warn("segment not found in metacache")
return nil
}
columnGroups := t.getColumnGroups(segmentInfo)
// 根据StorageVersion选择Writer
switch segmentInfo.GetStorageVersion() {
case storage.StorageV2:
writer := NewBulkPackWriterV2(t.metacache, t.schema, t.chunkManager,
t.allocator, 0, packed.DefaultMultiPartUploadSize, t.storageConfig,
columnGroups, t.writeRetryOpts...)
t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err =
writer.Write(ctx, t.pack)
if err != nil {
log.Warn("failed to write sync data with storage v2 format", zap.Error(err))
return err
}
default:
writer := NewBulkPackWriter(t.metacache, t.schema, t.chunkManager,
t.allocator, t.writeRetryOpts...)
t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err =
writer.Write(ctx, t.pack)
if err != nil {
log.Warn("failed to write sync data", zap.Error(err))
return err
}
}
// 更新Metrics
metrics.DataNodeWriteDataCount.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()), t.dataSource,
metrics.InsertLabel, fmt.Sprint(t.collectionID)).Add(float64(t.batchRows))
// 写入Meta(调用StreamingNode SaveBinlog)
if t.metaWriter != nil {
err = t.writeMeta(ctx)
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
return err
}
}
// 释放数据
t.pack.ReleaseData()
// 更新MetaCache
actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
if t.pack.isFlush {
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
}
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...),
metacache.WithSegmentIDs(t.segmentID))
return nil
}
Binlog存储格式
StorageV1格式:
{collection_id}/{partition_id}/{segment_id}/{field_id}/
├── 0_insert.binlog # Chunk 0
├── 1_insert.binlog # Chunk 1
└── ...
{collection_id}/{partition_id}/{segment_id}/delta/
└── delta.binlog
{collection_id}/{partition_id}/{segment_id}/stats/
├── stats.binlog # PK统计信息
└── bm25_stats.binlog # BM25统计信息
StorageV2格式(列式存储):
{collection_id}/{partition_id}/{segment_id}/
├── record.parquet # 主数据文件(Parquet格式)
├── delta.parquet # Delete数据
└── stats.json # 统计信息
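按上面StorageV1的目录布局,可以拼出某个字段某个Chunk的insert binlog路径。下面是一个仅依据该布局的拼接示意,根路径与函数名均为假设,实际路径规则以源码为准:
package main

import (
	"fmt"
	"path"
)

// insertBinlogPath 按上文StorageV1布局拼接指定字段、指定Chunk的insert binlog路径。
func insertBinlogPath(rootPath string, collectionID, partitionID, segmentID, fieldID int64, chunkIdx int) string {
	return path.Join(rootPath, fmt.Sprintf("%d/%d/%d/%d/%d_insert.binlog",
		collectionID, partitionID, segmentID, fieldID, chunkIdx))
}

func main() {
	// 示例:collection=100, partition=101, segment=102, field=103 的第0个Chunk
	fmt.Println(insertBinlogPath("files", 100, 101, 102, 103, 0))
	// 输出:files/100/101/102/103/0_insert.binlog
}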
Sync触发策略
WriteBuffer使用多种策略触发Sync:
- SyncPolicyV1(行数或大小)
  - 累积行数 >= insertBufSize(默认16MB对应的行数)
  - 或 Buffer大小 >= insertBufSize(该策略的简化草图见本列表之后)
- TimedFlushPolicy(超时)
  - 数据停留时间 >= flushDuration(默认10秒)
- FlushNotifyPolicy(显式Flush)
  - MetaCache中Segment状态变为Flushing
- DropPartitionPolicy(分区删除)
  - 分区被删除,强制Flush所有数据
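下面给出一个按「行数或大小」判定的策略函数草图,用来说明SyncPolicyV1的触发逻辑;结构体与函数签名为示意假设,并非WriteBuffer的真实接口:
package main

import "fmt"

// segmentBuffer 是示意用的内存缓冲视图(字段为假设)。
type segmentBuffer struct {
	SegmentID int64
	Rows      int64 // 已累积行数
	Size      int64 // 已累积字节数
}

// sizeOrRowPolicy 返回需要触发Sync的SegmentID:
// 行数达到rowLimit,或缓冲大小达到sizeLimit(对应上文insertBufSize)即命中。
func sizeOrRowPolicy(buffers []segmentBuffer, rowLimit, sizeLimit int64) []int64 {
	var toSync []int64
	for _, buf := range buffers {
		if buf.Rows >= rowLimit || buf.Size >= sizeLimit {
			toSync = append(toSync, buf.SegmentID)
		}
	}
	return toSync
}

func main() {
	buffers := []segmentBuffer{
		{SegmentID: 1, Rows: 100, Size: 1 << 20},     // 未达到阈值
		{SegmentID: 2, Rows: 500000, Size: 20 << 20}, // 大小超过16MB,触发
	}
	fmt.Println(sizeOrRowPolicy(buffers, 1_000_000, 16<<20)) // 输出:[2]
}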
2.3 Compaction执行流程
Compaction是DataNode的核心功能之一,支持4种类型:L0Delete、Mix、Clustering、Sort。
2.3.1 Compaction调度流程
sequenceDiagram
autonumber
participant DC as DataCoord
participant DN as DataNode
participant CE as CompactionExecutor
participant Task as CompactionTask
participant Pool as CompactionPool
participant OS as Object Storage
DC->>DN: CompactionV2(CompactionPlan)
DN->>DN: 检查健康状态
DN->>DN: 解析CompactionParams
DN->>DN: 创建ChunkManager
alt L0DeleteCompaction
DN->>Task: NewLevelZeroCompactionTask()
else MixCompaction
DN->>Task: NewMixCompactionTask()
else ClusteringCompaction
DN->>Task: NewClusteringCompactionTask()
else SortCompaction
DN->>Task: NewSortCompactionTask()
end
DN->>CE: Enqueue(task)
CE->>CE: 检查Slot可用性
CE->>CE: 计算Slot占用
CE->>CE: taskCh <- task
DN-->>DC: Status{Success}
Note over CE: 后台goroutine
CE->>Pool: Submit(executeTask)
activate Task
Pool->>Task: Compact()
Task->>OS: 读取源Segment Binlog
Task->>Task: 执行Compaction逻辑
Task->>OS: 写入新Segment Binlog
Task-->>Pool: CompactionPlanResult
deactivate Task
Pool->>CE: completed.Insert(result)
CE->>CE: 释放Slot
DC->>DN: GetCompactionState(planID)
DN->>CE: GetResults(planID)
CE-->>DN: CompactionPlanResult
DN-->>DC: CompactionStateResponse
2.3.2 MixCompaction详细流程
MixCompaction合并多个小Segment并清理已删除数据。
sequenceDiagram
autonumber
participant Task as MixCompactionTask
participant Reader as BinlogReader
participant DM as DeltaMerger
participant Writer as SegmentWriter
participant OS as Object Storage
Task->>Task: init()
Task->>Task: 加载Schema和PKField
loop 每个输入Segment
Task->>Reader: LoadBinlogs(segmentID)
Reader->>OS: Download Binlog
Reader->>Reader: DecompressBinlog
Reader-->>Task: InsertData
Task->>Reader: LoadDeltaBinlogs(segmentID)
Reader->>OS: Download DeltaLog
Reader-->>Task: DeleteData
end
Task->>DM: Merge所有DeleteData
DM->>DM: 构建PK -> Timestamp映射
Task->>Task: 分配新SegmentID
Task->>Writer: NewMultiSegmentWriter()
loop 读取每行数据
Task->>Task: 获取PK
Task->>DM: IsDeleted(pk, ts)
alt 未删除
Task->>Writer: Write(value)
alt Writer满
Writer->>Writer: rotateWriter()
Writer->>OS: Upload Binlog
Task->>Task: 分配新SegmentID
end
else 已删除
Task->>Task: 跳过该行
end
end
Task->>Writer: Close()
Writer->>OS: Upload最后一批Binlog
Task->>Task: 构造CompactionPlanResult
Task-->>Task: segments、binlogs、statslogs
关键代码:CompactionExecutor
// executor 管理Compaction任务队列和并发控制
type executor struct {
executing *typeutil.ConcurrentMap[int64, Compactor]
completedCompactor *typeutil.ConcurrentMap[int64, Compactor]
completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult]
taskCh chan Compactor
slotMu sync.RWMutex
usingSlots int64
}
// Enqueue 将任务加入队列
func (e *executor) Enqueue(task Compactor) (bool, error) {
e.slotMu.Lock()
defer e.slotMu.Unlock()
newSlotUsage := task.GetSlotUsage()
// 兼容旧版本,计算默认Slot占用
if task.GetSlotUsage() <= 0 {
switch task.GetCompactionType() {
case datapb.CompactionType_ClusteringCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}
log.Warn("illegal task slot usage, change it to a default value",
zap.Int64("newSlotUsage", newSlotUsage))
}
e.usingSlots = e.usingSlots + newSlotUsage
_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
if ok {
return false, merr.WrapErrDuplicatedCompactionTask()
}
e.taskCh <- task // 加入Channel
return true, nil
}
// Start 启动后台执行循环
func (e *executor) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case task := <-e.taskCh:
// 提交到Worker Pool执行
GetExecPool().Submit(func() (any, error) {
e.executeTask(task)
return nil, nil
})
}
}
}
// executeTask 执行单个Compaction任务
func (e *executor) executeTask(task Compactor) {
log := log.With(
zap.Int64("planID", task.GetPlanID()),
zap.Int64("Collection", task.GetCollection()),
zap.String("channel", task.GetChannelName()),
)
defer func() {
e.toCompleteState(task) // 完成后释放Slot
}()
log.Info("start to execute compaction")
// 调用任务的Compact方法
result, err := task.Compact()
if err != nil {
log.Warn("compaction task failed", zap.Error(err))
return
}
// 保存结果
e.completed.Insert(result.GetPlanID(), result)
e.completedCompactor.Insert(result.GetPlanID(), task)
// 更新Metrics
var entityCount int64
var deleteCount int64
lo.ForEach(result.Segments, func(seg *datapb.CompactionSegment, _ int) {
entityCount += seg.GetNumOfRows()
deleteCount += getDataCount(seg.GetDeltalogs())
})
metrics.DataNodeWriteDataCount.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
metrics.CompactionDataSourceLabel,
metrics.InsertLabel,
fmt.Sprint(task.GetCollection())).Add(float64(entityCount))
log.Info("end to execute compaction")
}
关键代码:MixCompaction执行
func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "MixCompaction")
defer span.End()
// 1. 加载所有Segment的Binlog
allValues, err := t.loadSegments(ctx)
if err != nil {
return nil, err
}
// 2. 合并所有Delete数据,构建PK->Timestamp映射
deltaMerger := storage.NewDeltaMerge(t.primaryKeyIDs)
for _, segment := range t.plan.SegmentBinlogs {
deltaData := t.loadDeltaLogs(ctx, segment)
deltaMerger.Add(deltaData)
}
// 3. 创建MultiSegmentWriter(支持自动分段)
writer := compactor.NewMultiSegmentWriter(
t.collectionID, t.partitionID, t.plan.GetSchema(),
t.segmentIDAlloc, t.logIDAlloc, t.binlogIO,
maxRows, binLogMaxSize, storageVersion,
)
// 4. 迭代所有数据,过滤已删除行
for _, value := range allValues {
pk := value.PK
ts := value.Timestamp
// 检查是否被删除
if deltaMerger.IsDeleted(pk, ts) {
continue // 跳过已删除行
}
// 写入数据
err = writer.WriteValue(value)
if err != nil {
return nil, err
}
// Writer内部会自动判断是否需要Rotate(创建新Segment)
}
// 5. 关闭Writer,上传最后一批数据
err = writer.Close()
if err != nil {
return nil, err
}
// 6. 构造CompactionPlanResult
segments := writer.GetCompactionSegments()
planResult := &datapb.CompactionPlanResult{
PlanID: t.plan.PlanID,
State: datapb.CompactionTaskState_completed,
Segments: segments,
Type: t.plan.Type,
}
return planResult, nil
}
2.3.3 L0DeleteCompaction流程
L0DeleteCompaction将L0 Segment的Delete数据应用到Sealed Segment。
sequenceDiagram
autonumber
participant Task as L0CompactionTask
participant Reader as BinlogReader
participant Writer as DeltaWriter
participant OS as Object Storage
Task->>Task: init()
Note over Task: 获取L0 Segments和<br/>目标Sealed Segments
loop 每个L0 Segment
Task->>Reader: LoadDeltaBinlogs(l0SegmentID)
Reader->>OS: Download DeltaLog
Reader-->>Task: DeleteData (PK list)
end
Task->>Task: 按PK分组DeleteData
loop 每个目标Segment
Task->>Writer: NewSegmentDeltaWriter(segmentID)
Task->>Task: 筛选属于该Segment的PK
Task->>Writer: WriteBatch(pks, timestamps)
Writer->>OS: Upload DeltaLog
Writer-->>Task: DeltaBinlog paths
end
Task->>Task: 构造CompactionPlanResult
Task-->>Task: 返回每个Segment的新DeltaLog路径
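上图中「筛选属于该Segment的PK」一步,实际实现通常借助目标Segment的主键统计(如Bloom Filter或主键范围)判断归属;下面用一个简化的集合判断来示意分发逻辑,类型与函数名均为假设:
package main

import "fmt"

// deleteRecord 表示一条来自L0 Segment的删除记录。
type deleteRecord struct {
	PK int64
	Ts uint64
}

// sealedSegment 用显式的PK集合示意归属判断;真实实现中通常是Bloom Filter/主键范围。
type sealedSegment struct {
	ID  int64
	PKs map[int64]struct{}
}

func (s *sealedSegment) mayContain(pk int64) bool {
	_, ok := s.PKs[pk]
	return ok
}

// distributeDeletes 把L0的删除记录分发到可能包含该主键的Sealed Segment,
// 对应上图「筛选属于该Segment的PK → WriteBatch」的步骤。
func distributeDeletes(deletes []deleteRecord, targets []*sealedSegment) map[int64][]deleteRecord {
	out := make(map[int64][]deleteRecord)
	for _, d := range deletes {
		for _, seg := range targets {
			if seg.mayContain(d.PK) {
				out[seg.ID] = append(out[seg.ID], d)
			}
		}
	}
	return out
}

func main() {
	targets := []*sealedSegment{
		{ID: 10, PKs: map[int64]struct{}{1: {}, 2: {}}},
		{ID: 11, PKs: map[int64]struct{}{3: {}}},
	}
	deletes := []deleteRecord{{PK: 2, Ts: 100}, {PK: 3, Ts: 101}}
	fmt.Println(distributeDeletes(deletes, targets)) // map[10:[{2 100}] 11:[{3 101}]]
}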
2.3.4 ClusteringCompaction流程
ClusteringCompaction按Clustering Key重新组织数据,优化范围查询性能。
流程概览:
- Analyze阶段:分析Clustering Key分布,计算分区边界(Scalar)或聚类中心(Vector)
- Mapping阶段:根据分区边界将数据分配到不同Buffer(路由逻辑的简化示意见本节末尾)
- Flush阶段:将每个Buffer写入独立的Segment
适用场景:
- Scalar Clustering Key:时间戳、地理位置等范围查询频繁的字段
- Vector Clustering Key:相似度搜索性能优化
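以标量Clustering Key为例,Mapping阶段可以理解为「按Analyze得到的分区边界,把每行数据路由到对应Buffer」。下面是一个用二分查找做路由的简化示意,边界值与函数名均为假设:
package main

import (
	"fmt"
	"sort"
)

// bucketIndex 根据升序排列的分区边界boundaries,返回key应落入的Buffer下标。
// 例如boundaries = [100, 200]把数据划分为(-inf,100)、[100,200)、[200,+inf)三个Buffer。
func bucketIndex(key int64, boundaries []int64) int {
	return sort.Search(len(boundaries), func(i int) bool { return key < boundaries[i] })
}

func main() {
	boundaries := []int64{100, 200}
	for _, key := range []int64{5, 150, 300} {
		fmt.Printf("key=%d -> buffer %d\n", key, bucketIndex(key, boundaries))
	}
	// key=5 -> buffer 0;key=150 -> buffer 1;key=300 -> buffer 2
}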
2.4 索引构建流程
DataNode通过TaskScheduler调度索引构建任务,支持向量索引、标量索引和统计信息收集。
sequenceDiagram
autonumber
participant DC as DataCoord
participant DN as DataNode
participant TS as TaskScheduler
participant TM as TaskManager
participant IB as IndexBuildTask
participant Knowhere as Knowhere/Segcore
participant OS as Object Storage
DC->>DN: CreateTask(IndexTask)
DN->>DN: createIndexTask(req)
DN->>TM: LoadOrStoreIndexTask(taskCtx, req)
alt 任务已存在
TM-->>DN: 已有任务
DN-->>DC: Status{Duplicate}
else 新任务
TM->>TM: 保存任务信息
DN->>TS: AddTask(indexBuildTask)
TS->>TS: utChan <- task
DN-->>DC: Status{Success}
end
Note over TS: 后台indexBuildLoop
TS->>TS: PopUnissuedTask()
TS->>TS: 检查Slot可用性
alt Slot充足
TS->>IB: processTask(task)
activate IB
IB->>IB: PreExecute()
IB->>OS: 下载Binlog数据
IB->>IB: 解压Binlog
IB->>IB: Execute()
IB->>Knowhere: CreateIndex(buildParams)
Note over Knowhere: C++索引引擎<br/>HNSW/IVF_FLAT/DiskANN
Knowhere->>Knowhere: 构建索引
Knowhere-->>IB: IndexHandle
IB->>IB: PostExecute()
IB->>Knowhere: index.UpLoad()
Knowhere->>OS: 上传索引文件
IB->>TM: StoreIndexFilesAndStatistic()
IB->>IB: SetState(Finished)
deactivate IB
else Slot不足
TS->>TS: sleep(50ms)
TS->>TS: 重新检查
end
DC->>DN: QueryTask(IndexTask)
DN->>TM: GetIndexTaskInfo(taskID)
TM-->>DN: JobState, FileKeys, Size
DN-->>DC: QueryTaskResponse
关键代码:TaskScheduler
type TaskScheduler struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
TaskQueue TaskQueue // 管理任务队列和Slot
}
// indexBuildLoop 后台索引构建循环
func (sched *TaskScheduler) indexBuildLoop() {
log.Ctx(sched.ctx).Debug("TaskScheduler start build loop ...")
defer sched.wg.Done()
for {
select {
case <-sched.ctx.Done():
return
case <-sched.TaskQueue.utChan(): // 有新任务
t := sched.TaskQueue.PopUnissuedTask()
// 等待Slot可用
for {
totalSlot := CalculateNodeSlots()
availableSlot := totalSlot - sched.TaskQueue.GetActiveSlot()
if availableSlot >= t.GetSlot() || totalSlot == availableSlot {
// Slot充足,启动goroutine执行
go func(t Task) {
sched.processTask(t, sched.TaskQueue)
}(t)
break
}
time.Sleep(time.Millisecond * 50) // Slot不足,等待
}
}
}
}
// processTask 执行单个任务(索引/Stats/Analyze)
func (sched *TaskScheduler) processTask(t Task, q TaskQueue) {
defer func() {
t.Reset()
debug.FreeOSMemory() // 强制GC释放内存
}()
sched.TaskQueue.AddActiveTask(t)
defer sched.TaskQueue.PopActiveTask(t.Name())
log.Ctx(t.Ctx()).Debug("process task", zap.String("task", t.Name()))
// 依次执行三个阶段
pipelines := []func(context.Context) error{t.PreExecute, t.Execute, t.PostExecute}
for _, fn := range pipelines {
if err := fn(t.Ctx()); err != nil {
log.Ctx(t.Ctx()).Warn("process task failed", zap.Error(err))
t.SetState(getStateFromError(err), err.Error())
return
}
}
t.SetState(indexpb.JobState_JobStateFinished, "")
}
关键代码:IndexBuildTask执行
// PreExecute 下载Binlog并准备数据
func (it *indexBuildTask) PreExecute(ctx context.Context) error {
// 1. 下载Insert Binlog
err := binlog.DecompressBinLogWithRootPath(
it.req.GetStorageConfig().GetRootPath(),
storage.InsertBinlog,
it.req.GetCollectionID(),
it.req.GetPartitionID(),
it.req.GetSegmentID(),
it.req.GetDataPaths(),
)
if err != nil {
return err
}
// 2. 下载Stats Binlog(包含PK统计信息)
err = binlog.DecompressBinLogWithRootPath(
it.req.GetStorageConfig().GetRootPath(),
storage.StatsBinlog,
it.req.GetCollectionID(),
it.req.GetPartitionID(),
it.req.GetSegmentID(),
it.req.GetStatsLogs(),
)
if err != nil {
return err
}
return nil
}
// Execute 调用Knowhere构建索引
func (it *indexBuildTask) Execute(ctx context.Context) error {
	var err error
	// 构造BuildIndexParams
buildIndexParams := &indexcgowrapper.BuildIndexParams{
IndexType: it.req.GetIndexType(),
MetricType: it.req.GetMetricType(),
FieldSchema: it.fieldSchema,
InsertFiles: it.req.GetDataPaths(),
Dim: it.req.GetDim(),
NumRows: it.req.GetNumRows(),
StorageConfig: it.req.GetStorageConfig(),
IndexParams: it.req.GetIndexParams(),
TypeParams: it.req.GetTypeParams(),
}
// 调用C++ Knowhere构建索引
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Warn("failed to clean cached data on disk after build index failed")
}
return err
}
buildIndexLatency := it.tr.RecordSpan()
metrics.DataNodeKnowhereBuildIndexLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(buildIndexLatency.Seconds())
return nil
}
// PostExecute 上传索引文件
func (it *indexBuildTask) PostExecute(ctx context.Context) error {
// 1. 上传索引到对象存储
indexStats, err := it.index.UpLoad()
if err != nil {
it.index.Delete() // 清理本地缓存
return err
}
encodeIndexFileDur := it.tr.Record("index serialize and upload done")
metrics.DataNodeEncodeIndexFileLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(encodeIndexFileDur.Seconds())
// 2. 提前释放索引(GC)
it.index.Delete()
// 3. 记录索引文件信息到TaskManager
var serializedSize uint64
saveFileKeys := make([]string, 0)
for _, indexInfo := range indexStats.GetSerializedIndexInfos() {
serializedSize += uint64(indexInfo.FileSize)
parts := strings.Split(indexInfo.FileName, "/")
fileKey := parts[len(parts)-1]
saveFileKeys = append(saveFileKeys, fileKey)
}
it.manager.StoreIndexFilesAndStatistic(
it.req.GetClusterID(),
it.req.GetBuildID(),
saveFileKeys,
serializedSize,
uint64(indexStats.MemSize),
it.req.GetCurrentIndexVersion(),
it.req.GetCurrentScalarIndexVersion(),
)
return nil
}
Slot计算
// CalculateNodeSlots 根据CPU和内存计算可用Slot数
func CalculateNodeSlots() int64 {
cpuCores := hardware.GetCPUNum()
	memGB := int(hardware.GetMemoryCount() / 1024 / 1024 / 1024) // 转为int,便于与CPU核数取较小值
// 每个Slot需要至少4GB内存和2个CPU核心
slotByCPU := cpuCores / 2
slotByMem := memGB / 4
// 取较小值
totalSlot := slotByCPU
if slotByMem < slotByCPU {
totalSlot = slotByMem
}
// 至少保留1个Slot
if totalSlot < 1 {
totalSlot = 1
}
return int64(totalSlot)
}
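以16核32GB的节点为例:slotByCPU = 16/2 = 8,slotByMem = 32/4 = 8,取较小值得TotalSlot = 8;8核16GB的节点则为min(4, 4) = 4,与下文5.2节的容量规划表一致。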
2.5 数据导入流程
DataNode的ImportV2支持两阶段导入:PreImport(验证)和Import(执行)。
sequenceDiagram
autonumber
participant DC as DataCoord
participant DN as DataNode
participant IS as ImportScheduler
participant PreTask as PreImportTask
participant ImpTask as ImportTask
participant SM as SyncManager
participant OS as Object Storage
Note over DC,DN: 第一阶段:PreImport
DC->>DN: CreateTask(PreImportTask)
DN->>PreTask: NewPreImportTask(req)
DN->>IS: Add(preTask)
DN-->>DC: Status{Success}
IS->>PreTask: Execute()
activate PreTask
PreTask->>OS: 读取导入文件
PreTask->>PreTask: 解析文件格式(Parquet/JSON/CSV)
PreTask->>PreTask: 验证Schema匹配
PreTask->>PreTask: 统计行数和大小
PreTask->>PreTask: SetState(Completed)
deactivate PreTask
DC->>DN: QueryTask(PreImportTask)
DN->>DN: importTaskMgr.Get(taskID)
DN-->>DC: FileStats、State、Reason
Note over DC,DN: 第二阶段:Import
DC->>DN: CreateTask(ImportTask)
DN->>ImpTask: NewImportTask(req)
DN->>IS: Add(impTask)
DN-->>DC: Status{Success}
IS->>ImpTask: Execute()
activate ImpTask
ImpTask->>OS: 读取导入文件
loop 每个VChannel
ImpTask->>ImpTask: 按Hash分配数据到Segment
ImpTask->>ImpTask: 构造InsertData
ImpTask->>SM: SyncData(syncTask)
SM->>OS: 写入Binlog
end
ImpTask->>ImpTask: SetState(Completed)
deactivate ImpTask
DC->>DN: QueryTask(ImportTask)
DN-->>DC: SegmentsInfo、State、Reason
关键代码:ImportTask执行
func (t *ImportTask) Execute() error {
ctx := t.ctx
// 1. 创建Reader(根据文件格式)
reader, err := importutilv2.NewReader(ctx, t.req, t.chunkManager)
if err != nil {
return err
}
defer reader.Close()
// 2. 为每个VChannel创建HashManager
hashers := make(map[string]*importutilv2.HashManager)
for _, vchannel := range t.req.GetVchannels() {
hasher := importutilv2.NewHashManager(vchannel, t.req.GetRequestSegments())
hashers[vchannel] = hasher
}
// 3. 读取数据并分配到Segment
for {
data, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return err
}
// 按主键Hash分配到VChannel和Segment
for vchannel, hasher := range hashers {
segmentData := hasher.Distribute(data)
// 为每个Segment构造SyncTask
for segmentID, insertData := range segmentData {
syncTask := t.buildSyncTask(vchannel, segmentID, insertData)
_, err := t.syncMgr.SyncData(ctx, syncTask)
if err != nil {
return err
}
}
}
}
// 4. Flush所有Segment
for vchannel, hasher := range hashers {
segments := hasher.GetSegments()
for _, segmentID := range segments {
syncTask := t.buildSyncTask(vchannel, segmentID, nil)
syncTask.WithFlush() // 标记为Flush
_, err := t.syncMgr.SyncData(ctx, syncTask)
if err != nil {
return err
}
}
}
return nil
}
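上面代码中「按主键Hash分配到VChannel和Segment」可以概括为两级路由:先按hash(pk)选VChannel,再在该VChannel的目标Segment列表中取模选Segment。下面是一个示意实现,哈希函数与真实实现未必一致,仅说明路由思路:
package main

import (
	"fmt"
	"hash/fnv"
)

// routeByPK 先按主键哈希选择VChannel,再在该VChannel的候选Segment中取模选择一个。
func routeByPK(pk string, vchannels []string, segments map[string][]int64) (string, int64) {
	h := fnv.New32a()
	h.Write([]byte(pk))
	sum := h.Sum32()

	vch := vchannels[sum%uint32(len(vchannels))]
	segs := segments[vch]
	return vch, segs[sum%uint32(len(segs))]
}

func main() {
	vchannels := []string{"vch-0", "vch-1"}
	segments := map[string][]int64{
		"vch-0": {100, 101},
		"vch-1": {200},
	}
	vch, seg := routeByPK("user-42", vchannels, segments)
	fmt.Println(vch, seg)
}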
3. 关键设计点
3.1 SyncManager按SegmentID串行化设计
问题:多个SyncTask同时操作同一Segment会导致数据竞争
解决方案:SyncManager按SegmentID分配Worker,保证同一Segment的Task串行执行
type syncManager struct {
workerPool *conc.Pool[struct{}]
keyLock *typeutil.KeyLock[int64] // 按SegmentID加锁
tasks *typeutil.ConcurrentMap[string, Task]
}
func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
handler := func(err error) error {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
defer mgr.tasks.Remove(taskKey)
if err == nil {
return nil
}
task.HandleError(err)
return err
}
callbacks = append([]func(error) error{handler}, callbacks...)
// 按SegmentID提交到WorkerPool,相同SegmentID的Task串行执行
return mgr.Submit(ctx, key, task, callbacks...)
}
3.2 CompactionExecutor Slot管理设计
问题:Compaction任务资源占用大,需要限制并发数
解决方案:通过Slot机制控制并发,不同类型任务占用不同Slot数
type executor struct {
executing *typeutil.ConcurrentMap[int64, Compactor]
completedCompactor *typeutil.ConcurrentMap[int64, Compactor]
completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult]
taskCh chan Compactor
slotMu sync.RWMutex
usingSlots int64 // 当前占用的Slot数
}
// Slot占用的默认值(可通过dataCoord.compaction配置调整,见5.3节)
const (
ClusteringCompactionSlot = 8 // Clustering最耗资源
MixCompactionSlot = 2
L0DeleteCompactionSlot = 1
)
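以TotalSlot = 8(16核32GB)为例:一个ClusteringCompaction任务(占8个Slot)即可占满节点;只跑MixCompaction(每个占2)时最多并发4个;L0DeleteCompaction(每个占1)则最多并发8个。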
3.3 TaskScheduler三阶段执行模型
设计目标:清晰分离任务生命周期,便于错误处理和资源清理
// 三阶段:PreExecute → Execute → PostExecute
pipelines := []func(context.Context) error{
t.PreExecute, // 准备:下载数据、验证参数
t.Execute, // 执行:核心逻辑(索引构建/统计计算)
t.PostExecute, // 清理:上传结果、释放资源
}
for _, fn := range pipelines {
if err := fn(t.Ctx()); err != nil {
// 记录失败状态
t.SetState(getStateFromError(err), err.Error())
return
}
}
t.SetState(indexpb.JobState_JobStateFinished, "")
3.4 MultiSegmentWriter自动分段设计
问题:Compaction输出Segment大小不可预测,可能超过配置限制
解决方案:Writer内部自动Rotate,创建多个Segment
type MultiSegmentWriter struct {
writer *storage.BinlogValueWriter
currentSegmentID int64
maxRows int64
segmentSize int64
res []*datapb.CompactionSegment
}
func (w *MultiSegmentWriter) WriteValue(v *storage.Value) error {
// 检查是否需要Rotate
if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
if err := w.rotateWriter(); err != nil {
return err
}
}
return w.writer.WriteValue(v)
}
func (w *MultiSegmentWriter) rotateWriter() error {
// 1. 关闭当前Writer,上传Binlog
if err := w.closeWriter(); err != nil {
return err
}
// 2. 分配新SegmentID
newSegmentID, err := w.allocator.allocSegmentID()
if err != nil {
return err
}
w.currentSegmentID = newSegmentID
// 3. 创建新Writer
rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID,
newSegmentID, w.schema, w.allocator.logIDAlloc, chunkSize, w.maxRows, w.rwOption...)
if err != nil {
return err
}
w.writer = storage.NewBinlogValueWriter(rw, w.batchSize)
return nil
}
4. 关键RPC接口规格
4.1 CompactionV2 - Compaction任务下发
接口定义:
rpc CompactionV2(CompactionPlan) returns (common.Status) {}
message CompactionPlan {
int64 planID = 1;
repeated CompactionSegmentBinlogs segmentBinlogs = 2;
int64 timeoutInSeconds = 3;
CompactionType type = 4;
string channel = 5;
int64 collectionID = 6;
int64 partitionID = 7;
schema.CollectionSchema schema = 8;
int64 beginLogID = 9;
IDRange preAllocatedLogIDs = 10;
IDRange preAllocatedSegmentIDs = 11;
int64 slotUsage = 12;
string jsonParams = 13;
}
调用链路:
DataCoord → DataNode.CompactionV2() → CompactionExecutor.Enqueue()
关键代码:
func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
// 1. 健康检查
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return merr.Status(err), nil
}
// 2. 参数验证
if len(req.GetSegmentBinlogs()) == 0 {
return merr.Success(), nil
}
if req.GetBeginLogID() == 0 {
return merr.Status(merr.WrapErrParameterInvalidMsg("invalid beginLogID")), nil
}
// 3. 创建ChunkManager
taskCtx := tracer.Propagate(ctx, node.ctx)
compactionParams, err := compaction.ParseParamsFromJSON(req.GetJsonParams())
cm, err := node.storageFactory.NewChunkManager(node.ctx, compactionParams.StorageConfig)
	// 4. 根据类型创建Compactor(pkFieldIDs由schema提取,错误处理此处省略)
var task compactor.Compactor
binlogIO := io.NewBinlogIO(cm)
switch req.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
task = compactor.NewLevelZeroCompactionTask(taskCtx, binlogIO, cm, req, compactionParams)
case datapb.CompactionType_MixCompaction:
task = compactor.NewMixCompactionTask(taskCtx, binlogIO, req, compactionParams, pkFieldIDs)
case datapb.CompactionType_ClusteringCompaction:
task = compactor.NewClusteringCompactionTask(taskCtx, binlogIO, req, compactionParams)
case datapb.CompactionType_SortCompaction:
task = compactor.NewSortCompactionTask(taskCtx, binlogIO, req, compactionParams, pkFieldIDs)
}
// 5. 加入执行队列
succeed, err := node.compactionExecutor.Enqueue(task)
if succeed {
return merr.Success(), nil
} else {
return merr.Status(err), nil
}
}
错误处理:
- ErrParameterInvalid:参数校验失败
- ErrDuplicatedCompactionTask:重复的PlanID
- ErrInsufficientSlots:Slot不足(返回成功但任务排队)
4.2 CreateTask - 统一任务创建接口
接口定义:
rpc CreateTask(CreateTaskRequest) returns (common.Status) {}
message CreateTaskRequest {
map<string, string> properties = 1; // 任务属性(类型、ID等)
bytes payload = 2; // 任务详细参数(Protobuf序列化)
}
支持的任务类型:
- taskcommon.PreImport:预导入验证
- taskcommon.Import:批量导入
- taskcommon.Compaction:Compaction任务
- taskcommon.Index:索引构建
- taskcommon.Stats:统计信息收集
- taskcommon.Analyze:数据分析
调用链路:
DataCoord → DataNode.CreateTask() →
- importTaskMgr.Add() (PreImport/Import)
- compactionExecutor.Enqueue() (Compaction)
- taskScheduler.AddTask() (Index/Stats/Analyze)
关键代码:
func (node *DataNode) CreateTask(ctx context.Context, request *workerpb.CreateTaskRequest) (*commonpb.Status, error) {
properties := taskcommon.NewProperties(request.GetProperties())
taskType, err := properties.GetTaskType()
switch taskType {
case taskcommon.PreImport:
req := &datapb.PreImportRequest{}
proto.Unmarshal(request.GetPayload(), req)
return node.PreImport(ctx, req)
case taskcommon.Import:
req := &datapb.ImportRequest{}
proto.Unmarshal(request.GetPayload(), req)
return node.ImportV2(ctx, req)
case taskcommon.Compaction:
req := &datapb.CompactionPlan{}
proto.Unmarshal(request.GetPayload(), req)
return node.CompactionV2(ctx, req)
case taskcommon.Index:
req := &workerpb.CreateJobRequest{}
proto.Unmarshal(request.GetPayload(), req)
return node.createIndexTask(ctx, req)
case taskcommon.Stats:
req := &workerpb.CreateStatsRequest{}
proto.Unmarshal(request.GetPayload(), req)
return node.createStatsTask(ctx, req)
case taskcommon.Analyze:
req := &workerpb.AnalyzeRequest{}
proto.Unmarshal(request.GetPayload(), req)
return node.createAnalyzeTask(ctx, req)
	// 注:各分支的错误处理与default分支此处省略
	}
}
}
4.3 QuerySlot - 资源查询
接口定义:
rpc QuerySlot(QuerySlotRequest) returns (QuerySlotResponse) {}
message QuerySlotResponse {
common.Status status = 1;
int64 availableSlots = 2;
}
Slot计算:
totalSlots = min(CPU核数/2, 内存GB/4)
availableSlots = totalSlots - indexStatsUsed - compactionUsed - importUsed
关键代码:
func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*datapb.QuerySlotResponse, error) {
var (
totalSlots = node.totalSlot
indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot()
compactionUsed = node.compactionExecutor.Slots()
importUsed = node.importScheduler.Slots()
)
availableSlots := totalSlots - indexStatsUsed - compactionUsed - importUsed
if availableSlots < 0 {
availableSlots = 0
}
// 更新Metrics
metrics.DataNodeSlot.WithLabelValues(fmt.Sprint(node.GetNodeID()), "available").Set(float64(availableSlots))
metrics.DataNodeSlot.WithLabelValues(fmt.Sprint(node.GetNodeID()), "total").Set(float64(totalSlots))
return &datapb.QuerySlotResponse{
Status: merr.Success(),
AvailableSlots: availableSlots,
}, nil
}
4.4 GetCompactionState - 任务状态查询
接口定义:
rpc GetCompactionState(CompactionStateRequest) returns (CompactionStateResponse) {}
message CompactionStateRequest {
int64 planID = 1;
}
message CompactionStateResponse {
common.Status status = 1;
repeated CompactionPlanResult results = 2;
}
关键代码:
func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
results := node.compactionExecutor.GetResults(req.GetPlanID())
return &datapb.CompactionStateResponse{
Status: merr.Success(),
Results: results,
}, nil
}
5. 性能与容量规划
5.1 性能指标
| 操作类型 | 吞吐量 | 延迟 | 资源消耗 | 说明 |
|---|---|---|---|---|
| Sync任务 | 20-100MB/s | 100-500ms | CPU:中等,内存:低 | 取决于字段数和压缩算法 |
| L0Compaction | 200-500MB/s | 1-3s/GB | CPU:低,内存:低 | 仅处理Delete数据 |
| MixCompaction | 50-200MB/s | 5-20s/GB | CPU:高,内存:中等 | 需读取和过滤所有数据 |
| ClusteringCompaction | 30-100MB/s | 10-60s/GB | CPU:高,内存:高 | 需要Analyze和重组数据 |
| 索引构建(HNSW) | 10-50万行/分钟 | 取决于维度和参数 | CPU:高,内存:高 | 向量维度越高越慢 |
| 索引构建(IVF_FLAT) | 50-200万行/分钟 | 取决于nlist | CPU:中等,内存:中等 | 训练阶段较慢 |
| Import任务 | 100-500MB/s | 取决于文件大小 | CPU:中等,内存:中等 | Parquet最快,JSON最慢 |
5.2 容量规划
Slot配置
| 资源配置 | TotalSlot | 建议任务分配 |
|---|---|---|
| 8核16GB | 4 | Index: 2, Compaction: 1, Import: 1 |
| 16核32GB | 8 | Index: 4, Compaction: 2, Import: 2 |
| 32核64GB | 16 | Index: 8, Compaction: 4, Import: 4 |
| 64核128GB | 32 | Index: 16, Compaction: 8, Import: 8 |
内存规划
| 组件 | 内存占用 | 说明 |
|---|---|---|
| SyncManager | <1GB | 固定开销 |
| SyncTask(单个) | 数据大小*1.2 | 包含序列化缓冲 |
| Compaction(Mix) | 输入Segment大小总和 | 需全部加载到内存 |
| Compaction(Clustering) | 输入大小*1.5 | 需额外Buffer |
| IndexBuild(HNSW) | NumRows * Dim * 4 * 2 | 原始数据+索引 |
| IndexBuild(IVF) | NumRows * Dim * 4 * 1.2 | 原始数据+倒排表 |
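按上表中HNSW的经验公式(约为NumRows × Dim × 4 × 2字节,即原始float32数据与索引各占一份),可以粗略估算单个构建任务的内存需求;以下计算草图仅用于容量规划参考:
package main

import "fmt"

// estimateHNSWBuildMemGB 按上表经验公式估算HNSW构建内存(GB):
// float32向量每维4字节,原始数据与索引各一份,约为2倍原始数据大小。
func estimateHNSWBuildMemGB(numRows, dim int64) float64 {
	bytes := float64(numRows) * float64(dim) * 4 * 2
	return bytes / (1 << 30)
}

func main() {
	// 例:1000万行、768维向量,约需57 GB构建内存
	fmt.Printf("%.1f GB\n", estimateHNSWBuildMemGB(10_000_000, 768))
}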
5.3 关键配置参数
dataNode:
# Sync配置
dataSync:
workerPoolSize: 16
writeRetry:
maxRetries: 3
initialBackoff: 100ms
# StorageV2配置
storageV2:
enabled: true
multiPartUploadSize: 5MB
# DataCoord配置(影响DataNode)
dataCoord:
compaction:
clusteringCompactionSlotUsage: 8
mixCompactionSlotUsage: 2
level0DeleteCompactionSlotUsage: 1
5.4 监控指标
# Slot使用
datanode_slot{nodeID="1", type="available"}
datanode_slot{nodeID="1", type="total"}
# Sync任务
datanode_flush_buffer_count{nodeID="1", status="success"}
datanode_save_to_storage_latency_seconds{nodeID="1"}
# Compaction任务
datanode_write_data_count{nodeID="1", data_source="compaction"}
# 索引构建
datanode_knowhere_build_index_latency_seconds{nodeID="1"}
5.5 性能调优建议
- Sync性能优化:增加workerPoolSize,启用StorageV2格式(提升30-50%)
- Compaction优化:合理设置Slot占用,避免OOM;对大Segment优先使用SortCompaction
- 索引构建优化:HNSW增加buildThreads;DiskANN使用SSD;分批构建大规模索引
- Import优化:使用Parquet格式(比JSON快3-5倍);增加maxConcurrentReadFiles