Milvus-06-QueryNode-概览

1. 模块概述

1.1 职责定义

QueryNode负责向量检索和标量查询的执行,是Milvus查询路径的核心执行引擎。每个QueryNode管理一个或多个Collection的Shard,通过ShardDelegator协调查询请求和数据管理。

核心职责

  1. Segment加载与管理

    • 从Object Storage加载Sealed Segment
    • 加载索引文件(HNSW/IVF/DiskANN等)
    • 管理内存Segment缓存(LRU策略)
    • 支持Lazy Load和Mmap模式
  2. Growing Segment管理

    • 订阅DML Channel消费实时数据
    • 维护增量数据(未Flush的数据)
    • 实时应用Delete操作
    • 管理Delete Buffer
  3. 向量检索

    • 调用向量索引(Knowhere)
    • TopK合并(Sealed + Growing)
    • 距离计算与排序
    • 支持多种索引类型(HNSW/IVF/DiskANN/FLAT)
  4. 标量过滤

    • 执行表达式过滤(expr: "age>18 and city=='Beijing'")
    • Bitmap索引加速
    • 结果集合并与去重
  5. 查询执行

    • Query(标量查询/根据ID查询)
    • Search(向量检索)
    • QueryStream(流式查询)
    • GetStatistics(统计信息查询)
  6. 分布式协调

    • ShardDelegator:管理单个Shard的数据和查询
    • Worker模式:本地Worker和远程Worker
    • Segment分布管理:跨QueryNode的Segment分发

1.2 整体架构图

flowchart TB
    subgraph Proxy["Proxy"]
        P[Proxy Service]
    end
    
    subgraph QueryNode["QueryNode"]
        Server[QueryNode Server<br/>gRPC服务入口]
        
        subgraph DelegatorMgr["ShardDelegator管理"]
            D1[ShardDelegator-1<br/>vChannel_1]
            D2[ShardDelegator-2<br/>vChannel_2]
            DN[ShardDelegator-N<br/>vChannel_N]
        end
        
        subgraph Scheduler["任务调度器"]
            Sched[Scheduler<br/>任务队列与调度]
            TaskQueue[Task Queue]
        end
        
        subgraph Pipeline["Pipeline管理器"]
            PM[Pipeline Manager]
            P1[Pipeline-1<br/>FilterNode→InsertNode→DeleteNode]
            P2[Pipeline-2<br/>FilterNode→InsertNode→DeleteNode]
        end
        
        subgraph SegmentMgr["Segment管理"]
            SM[Segment Manager<br/>Segment生命周期管理]
            Loader[Segment Loader<br/>Object Storage加载]
            Cache[Segment Cache<br/>LRU缓存策略]
        end
        
        subgraph Worker["Worker"]
            LW[Local Worker<br/>本地执行]
            CM[Cluster Manager<br/>Worker管理]
        end
        
        Server -->|路由到Channel| D1
        Server -->|路由到Channel| D2
        Server -->|路由到Channel| DN
        
        D1 -->|提交任务| Sched
        D2 -->|提交任务| Sched
        DN -->|提交任务| Sched
        
        Sched --> TaskQueue
        TaskQueue -->|分发| LW
        TaskQueue -->|分发| CM
        
        D1 -->|消费DML| P1
        D2 -->|消费DML| P2
        
        P1 -->|Insert/Delete| D1
        P2 -->|Insert/Delete| D2
        
        D1 -->|LoadSegment| Loader
        D2 -->|LoadSegment| Loader
        
        Loader -->|管理| SM
        Loader -->|缓存| Cache
        
        LW -->|查询| SM
        CM -->|查询| SM
    end
    
    subgraph External["外部组件"]
        OS[Object Storage<br/>MinIO/S3]
        MQ[MessageQueue<br/>Pulsar/Kafka]
        QC[QueryCoord<br/>协调器]
        RemoteQN[Remote QueryNode<br/>其他QueryNode]
    end
    
    P -.->|Search/Query| Server
    Loader -.->|Download Binlog/Index| OS
    P1 -.->|Subscribe| MQ
    P2 -.->|Subscribe| MQ
    Server -.->|HeartBeat/Report| QC
    QC -.->|LoadSegment/WatchDmChannel| Server
    CM -.->|Remote Call| RemoteQN
    
    style QueryNode fill:#e1f5ff,stroke:#333,stroke-width:2px
    style DelegatorMgr fill:#fff4e6,stroke:#333,stroke-width:1px
    style Scheduler fill:#e6ffe6,stroke:#333,stroke-width:1px
    style Pipeline fill:#ffe6e6,stroke:#333,stroke-width:1px
    style SegmentMgr fill:#f0e6ff,stroke:#333,stroke-width:1px

架构图说明

  1. QueryNode Server

    • gRPC服务入口,接收Proxy的Search/Query请求
    • 接收QueryCoord的LoadSegment/WatchDmChannel等管理请求
    • 按vChannel路由请求到对应的ShardDelegator
  2. ShardDelegator(核心组件)

    • 每个vChannel对应一个ShardDelegator
    • 管理该Channel的所有Segment(Sealed + Growing)
    • 维护Delete Buffer缓存删除数据
    • 协调查询任务分发到Worker
    • 管理Segment Distribution(Segment在哪些Worker上)
  3. Pipeline(DML消费)

    • 每个vChannel对应一个Pipeline
    • FilterNode:过滤和验证DML消息
    • InsertNode:处理Insert消息,写入Growing Segment
    • DeleteNode:处理Delete消息,写入Delete Buffer
  4. Scheduler(任务调度)

    • 管理Search/Query任务队列
    • 按优先级调度任务
    • 限流和并发控制
  5. Segment Manager & Loader

    • 管理所有Segment的元数据
    • 从Object Storage加载Segment
    • LRU缓存策略,自动淘汰
  6. Worker

    • Local Worker:本地执行查询
    • Remote Worker:通过gRPC调用其他QueryNode
    • Cluster Manager:管理Worker池

1.3 Segment类型

Historical Segment(Sealed)

  • 已Flush到Object Storage
  • 只读,不再变更
  • 有索引(HNSW/IVF等)

Growing Segment

  • 内存中,未Flush
  • 持续接收新数据
  • 使用Brute Force检索或简单索引
flowchart LR
    Growing[Growing Segment<br/>内存,可写] -->|Flush| Sealed[Sealed Segment<br/>Object Storage,只读]
    Sealed -->|Build Index| Indexed[Indexed Segment<br/>带索引]
    
    style Growing fill:#ffe6e6
    style Sealed fill:#fff4e6
    style Indexed fill:#e6ffe6

2. 上游接口与调用链路

2.1 对外接口(gRPC)

QueryNode对外提供以下gRPC接口:

// gRPC接口定义(节选;对应实现位于 internal/querynodev2/services.go)
service QueryNode {
    // 数据加载
    rpc WatchDmChannels(WatchDmChannelsRequest) returns (Status);
    rpc UnsubDmChannel(UnsubDmChannelRequest) returns (Status);
    rpc LoadSegments(LoadSegmentsRequest) returns (Status);
    rpc ReleaseSegments(ReleaseSegmentsRequest) returns (Status);
    
    // 查询服务
    rpc Search(SearchRequest) returns (SearchResults);
    rpc Query(QueryRequest) returns (RetrieveResults);
    rpc SearchSegments(SearchRequest) returns (SearchResults);  // Worker调用
    rpc QuerySegments(QueryRequest) returns (RetrieveResults);   // Worker调用
    
    // 统计与管理
    rpc GetStatistics(GetStatisticsRequest) returns (GetStatisticsResponse);
    rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse);
}

接口分类

  1. 管理接口(QueryCoord调用)

    • WatchDmChannels:订阅DML Channel
    • LoadSegments:加载Sealed Segment
    • ReleaseSegments:释放Segment
  2. 查询接口(Proxy调用)

    • Search:向量检索
    • Query:标量查询
  3. Worker接口(QueryNode之间调用)

    • SearchSegments:在指定Segment上检索
    • QuerySegments:在指定Segment上查询
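下面给出一个简化示意(假设性结构,字段名参考上文代码片段中的 GetDmlChannels()/GetReq()/GetGuaranteeTimestamp() 等访问器,并非querypb.SearchRequest的真实定义),用于说明Proxy发往QueryNode的Search请求大致携带哪些信息:

// 简化示意(假设性结构,非querypb.SearchRequest的真实定义):
// 演示Proxy发往QueryNode的Search请求大致携带哪些信息
package main

import "fmt"

type searchRequestSketch struct {
	CollectionID       int64    // 目标Collection
	PartitionIDs       []int64  // 限定查询的Partition,为空表示全部
	DmlChannels        []string // 本次请求针对的vChannel(QueryNode每次只处理一个)
	TopK               int64    // 返回的近邻数量
	Expr               string   // 标量过滤表达式
	GuaranteeTimestamp uint64   // 一致性保证时间戳,QueryNode会waitTSafe到该时间点
}

func main() {
	req := searchRequestSketch{
		CollectionID:       1001,
		DmlChannels:        []string{"by-dev-rootcoord-dml_0_1001v0"},
		TopK:               10,
		Expr:               "age>18 and city=='Beijing'",
		GuaranteeTimestamp: 449988899001,
	}
	fmt.Printf("search on channel %s, topK=%d, expr=%q\n", req.DmlChannels[0], req.TopK, req.Expr)
}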

3. 核心流程详解

3.1 Search 流程(完整调用链)

3.1.1 Search 完整时序图

sequenceDiagram
    autonumber
    participant Proxy as Proxy
    participant QN as QueryNode.Server
    participant Delegator as ShardDelegator
    participant Dist as Distribution
    participant Scheduler as Scheduler
    participant Task as SearchTask
    participant Worker as LocalWorker
    participant SegMgr as SegmentManager
    participant Seg as Segment
    participant Knowhere as Knowhere索引
    
    Note over Proxy,Knowhere: 阶段1:请求路由
    Proxy->>QN: Search(vectors, topK, channel)
    QN->>QN: 获取Collection信息
    QN->>Delegator: searchChannel(req, vChannel)
    
    Note over Proxy,Knowhere: 阶段2:等待TSafe
    Delegator->>Delegator: waitTSafe(guaranteeTS)
    Delegator->>Dist: PinReadableSegments(partitions)
    Dist-->>Delegator: sealed[], growing[], version
    
    Note over Proxy,Knowhere: 阶段3:任务组织
    Delegator->>Delegator: organizeSubTask(sealed, growing)
    Delegator->>Delegator: 按Worker分组Segment
    
    Note over Proxy,Knowhere: 阶段4:任务执行(并发)
    par 并发执行多个Worker任务
        Delegator->>Worker: SearchSegments(segmentIDs_1)
        Worker->>Scheduler: 提交SearchTask
        Scheduler->>Task: Execute()
        
        Task->>SegMgr: GetSegments(segmentIDs)
        SegMgr-->>Task: segments[]
        
        loop 对每个Segment
            Task->>Seg: Search(vector, topK, filter)
            Seg->>Knowhere: VectorIndex.Search()
            Knowhere-->>Seg: results
            Seg->>Seg: ApplyScalarFilter()
            Seg-->>Task: SearchResult
        end
        
        Task->>Task: MergeResults(allResults)
        Task-->>Worker: SearchResults_1
        Worker-->>Delegator: SearchResults_1
    and
        Delegator->>Worker: SearchSegments(segmentIDs_2)
        Note over Worker: 同上流程
        Worker-->>Delegator: SearchResults_2
    end
    
    Note over Proxy,Knowhere: 阶段5:结果合并
    Delegator->>Delegator: MergeAllResults(results[])
    Delegator-->>QN: SearchResults
    
    Note over Proxy,Knowhere: 阶段6:Reduce
    QN->>QN: ReduceSearchResults(results)
    QN-->>Proxy: Final SearchResults
    
    Delegator->>Dist: Unpin(version)

时序图详细说明

阶段1:请求路由(步骤1-3)

  • Proxy根据Collection的Shard分配,将请求发送到对应的QueryNode
  • QueryNode.Server根据vChannel路由到对应的ShardDelegator
  • 每个vChannel在QueryNode上只有一个ShardDelegator实例

阶段2:等待TSafe(步骤4-6)

  • waitTSafe:等待数据可见性,确保guaranteeTS之前的数据已消费完成
  • PinReadableSegments:从Distribution中获取可读的Segment列表
    • sealed:Sealed Segment列表(已Flush到Object Storage)
    • growing:Growing Segment列表(内存中,实时数据)
    • version:Distribution版本号,用于并发控制

阶段3:任务组织(步骤7-8)

  • organizeSubTask:将Segment按Worker分组
    • 本地Segment:分配给LocalWorker
    • 远程Segment:分配给RemoteWorker(其他QueryNode)
  • 形成多个SubTask,每个SubTask包含一组Segment

阶段4:任务执行(步骤9-20)

  • 并发执行:多个SubTask并发执行
  • Scheduler调度:按优先级调度任务
  • Segment检索
    • 从SegmentManager获取Segment实例
    • 调用Segment.Search()执行向量检索
    • Knowhere执行向量索引检索(HNSW/IVF等)
    • 应用标量过滤(expr)
    • 每个Segment返回TopK结果
  • 结果合并:Task内部合并多个Segment的结果

阶段5:结果合并(步骤21-22)

  • Delegator合并所有Worker返回的结果
  • 全局TopK排序

阶段6:Reduce(步骤23-24)

  • QueryNode.Server执行Reduce操作
  • 返回给Proxy

3.1.2 Search 关键代码分析

(1)QueryNode.Search 入口
// internal/querynodev2/services.go:827
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
    // 1. 健康检查
    if err := node.lifetime.Add(merr.IsHealthy); err != nil {
        return &internalpb.SearchResults{Status: merr.Status(err)}, nil
    }
    defer node.lifetime.Done()
    
    // 2. 获取Collection
    collection := node.manager.Collection.Get(req.GetReq().GetCollectionID())
    if collection == nil {
        return &internalpb.SearchResults{
            Status: merr.Status(merr.WrapErrCollectionNotLoaded(req.GetReq().GetCollectionID())),
        }, nil
    }
    
    // 3. 路由到Channel
    ch := req.GetDmlChannels()[0]  // 每次Search只针对一个Channel
    ret, err := node.searchChannel(ctx, req, ch)
    if err != nil {
        return &internalpb.SearchResults{Status: merr.Status(err)}, nil
    }
    
    return ret, nil
}

关键点

  • Search请求中的DmlChannels只包含一个Channel
  • Proxy会将一个Collection的多个Shard分别发送到不同的QueryNode
  • 每个QueryNode只处理自己负责的Channel
(2)searchChannel:路由到Delegator
// internal/querynodev2/handlers.go:382
func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchRequest, channel string) (*internalpb.SearchResults, error) {
    // 1. 获取ShardDelegator
    sd, ok := node.delegators.Get(channel)
    if !ok {
        err := merr.WrapErrChannelNotFound(channel)
        log.Warn("Query failed, failed to get shard delegator for search", zap.Error(err))
        return nil, err
    }
    
    // 2. 调用Delegator执行Search
    results, err := sd.Search(ctx, req)
    if err != nil {
        log.Warn("failed to search on delegator", zap.Error(err))
        return nil, err
    }
    
    // 3. Reduce结果
    resp, err := segments.ReduceSearchOnQueryNode(ctx, results, req, node.manager)
    return resp, err
}
(3)ShardDelegator.Search:核心协调逻辑
// internal/querynodev2/delegator/delegator.go:373
func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) {
    // 1. 等待TSafe
    waitTr := timerecord.NewTimeRecorder("wait tSafe")
    tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp())
    if err != nil {
        return nil, err
    }
    if req.GetReq().GetMvccTimestamp() == 0 {
        req.Req.MvccTimestamp = tSafe
    }
    
    // 2. Pin Segment Distribution
    sealed, growing, sealedRowCount, version, err := sd.distribution.PinReadableSegments(
        partialResultRequiredDataRatio,
        req.GetReq().GetPartitionIDs()...,
    )
    if err != nil {
        return nil, err
    }
    defer sd.distribution.Unpin(version)
    
    // 3. Segment裁剪(可选)
    if paramtable.Get().QueryNodeCfg.EnableSegmentPrune.GetAsBool() {
        PruneSegments(ctx, sd.partitionStats, nil, req.GetReq(), sd.collection.Schema(), sealed, pruneInfo)
    }
    
    // 4. 组织SubTask
    tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, false, sd.modifySearchRequest)
    if err != nil {
        return nil, err
    }
    
    // 5. 执行SubTask
    results, err := executeSubTasks(ctx, tasks, NewRowCountBasedEvaluator(sealedRowCount), 
        func(ctx context.Context, req *querypb.SearchRequest, worker cluster.Worker) (*internalpb.SearchResults, error) {
            resp, err := worker.SearchSegments(ctx, req)
            // 处理Worker不可用
            status, ok := status.FromError(err)
            if ok && status.Code() == codes.Unavailable {
                sd.markSegmentOffline(req.GetSegmentIDs()...)
            }
            return resp, err
        }, "Search", log)
    
    return results, err
}

关键机制

  1. TSafe机制

    • waitTSafe:阻塞等待,直到guaranteeTS之前的数据都已消费(列表后附最小示意)
    • 确保查询的一致性
  2. Distribution Pin/Unpin

    • PinReadableSegments:获取当前可读的Segment列表,并Pin住版本
    • Unpin:释放版本,允许Segment分布变更
    • 防止查询过程中Segment分布变化
  3. Segment Pruning

    • 根据统计信息(Partition Stats)裁剪不可能包含结果的Segment
    • 减少不必要的检索
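下面是waitTSafe语义的最小示意(假设性实现,非Milvus源码):Pipeline每消费一个TimeTick就推进tsafe并唤醒等待者,查询侧阻塞直到tsafe >= guaranteeTS或ctx超时:

// 最小示意(假设性实现,非Milvus源码):waitTSafe的核心语义
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type tsafeSketch struct {
	mu    sync.Mutex
	ts    uint64
	watch chan struct{} // 每次tsafe推进时关闭并更换,唤醒所有等待者
}

func newTsafeSketch() *tsafeSketch {
	return &tsafeSketch{watch: make(chan struct{})}
}

// advance 由Pipeline消费TimeTick后调用,推进tsafe并唤醒等待者
func (t *tsafeSketch) advance(ts uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ts > t.ts {
		t.ts = ts
		close(t.watch)
		t.watch = make(chan struct{})
	}
}

// waitTSafe 阻塞直到tsafe >= guaranteeTS,或ctx取消
func (t *tsafeSketch) waitTSafe(ctx context.Context, guaranteeTS uint64) (uint64, error) {
	for {
		t.mu.Lock()
		cur, ch := t.ts, t.watch
		t.mu.Unlock()
		if cur >= guaranteeTS {
			return cur, nil
		}
		select {
		case <-ch: // tsafe推进,重新检查
		case <-ctx.Done():
			return 0, errors.New("wait tsafe timeout")
		}
	}
}

func main() {
	ts := newTsafeSketch()
	go ts.advance(100) // 模拟Pipeline消费TimeTick后推进tsafe
	cur, err := ts.waitTSafe(context.Background(), 100)
	fmt.Println(cur, err)
}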
(4)Worker.SearchSegments:在Segment上执行检索
// internal/querynodev2/services.go:739
func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
    // 1. 健康检查
    if err := node.lifetime.Add(merr.IsHealthy); err != nil {
        return &internalpb.SearchResults{Status: merr.Status(err)}, nil
    }
    defer node.lifetime.Done()
    
    // 2. 获取Collection
    collection := node.manager.Collection.Get(req.Req.GetCollectionID())
    
    // 3. 创建SearchTask
    var task scheduler.Task
    task = tasks.NewSearchTask(ctx, collection, node.manager, req)
    
    // 4. 提交到Scheduler
    if err := node.scheduler.Add(task); err != nil {
        return nil, err
    }
    
    // 5. 等待执行完成
    if err := task.Wait(); err != nil {
        return nil, err
    }
    
    // 6. 返回结果
    return task.Result(), nil
}
(5)SearchTask.Execute:执行检索
// internal/querynodev2/tasks/search_task.go:133
func (t *SearchTask) Execute() error {
    // 1. 创建SearchRequest(C++ Segcore)
    searchReq, err := segcore.NewSearchRequest(t.collection.GetCCollection(), req, t.placeholderGroup)
    defer searchReq.Delete()
    
    // 2. 根据Scope查询不同类型的Segment
    var results []*segments.SearchResult
    if req.GetScope() == querypb.DataScope_Historical {
        // 查询Sealed Segment
        results, searchedSegments, err = segments.SearchHistorical(
            t.ctx,
            t.segmentManager,
            searchReq,
            req.GetReq().GetCollectionID(),
            req.GetReq().GetPartitionIDs(),
            req.GetSegmentIDs(),
        )
    } else if req.GetScope() == querypb.DataScope_Streaming {
        // 查询Growing Segment
        results, searchedSegments, err = segments.SearchStreaming(...)
    }
    defer t.segmentManager.Segment.Unpin(searchedSegments)
    defer segments.DeleteSearchResults(results)
    
    // 3. Reduce结果
    ret, err := segments.ReduceSearchResultsAndFillData(
        ctx,
        searchReq.Plan(),
        results,
        int64(len(results)),
        t.originNqs,
        t.originTopks,
    )
    
    t.result = ret
    return nil
}
(6)Segment.Search:向量索引检索
// internal/querynodev2/segments/segment.go
func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) {
    // 1. 调用C++ Segcore执行检索
    searchResult, err := s.ptrLock.RLockIf(
        state.IsNotReleased,
        func() (*SearchResult, error) {
            // 调用C++ API
            return s.search(ctx, searchReq)
        },
    )
    
    return searchResult, err
}

func (s *LocalSegment) search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) {
    // CGO调用C++ Segcore
    cRes := C.Search(
        s.ptr,
        searchReq.plan.cSearchPlan,
        searchReq.cPlaceholderGroup,
        C.uint64_t(searchReq.mvccTimestamp),
    )
    
    // 转换结果
    result := &SearchResult{
        cSearchResult: cRes,
    }
    
    return result, nil
}

C++层向量检索(Segcore):

  • 调用Knowhere库执行向量索引检索
  • 支持HNSW、IVF、DiskANN等多种索引
  • 应用标量过滤(expr)
  • 返回TopK结果

3.2 LoadSegment 流程

3.2.1 LoadSegment 时序图

sequenceDiagram
    autonumber
    participant QC as QueryCoord
    participant QN as QueryNode.Server
    participant Delegator as ShardDelegator
    participant Loader as SegmentLoader
    participant SM as SegmentManager
    participant OS as Object Storage
    participant Seg as LocalSegment
    
    Note over QC,Seg: 阶段1:接收加载请求
    QC->>QN: LoadSegments(segmentLoadInfos)
    QN->>QN: 获取Collection元信息
    
    alt needTransfer = true(需要Delegator处理)
        QN->>Delegator: LoadSegments(req)
        Delegator->>Delegator: 确定目标Worker
        alt 目标是本地
            Delegator->>QN: LoadSegments(needTransfer=false)
        else 目标是远程
            Delegator->>RemoteQN: LoadSegments via Worker
            RemoteQN-->>Delegator: Success
        end
    end
    
    Note over QC,Seg: 阶段2:准备加载
    QN->>Loader: Load(segmentLoadInfos)
    Loader->>Loader: prepare():过滤已加载的Segment
    Loader->>Loader: requestResource():检查内存限制
    
    Note over QC,Seg: 阶段3:并发加载Segment
    par 并发加载多个Segment
        loop 对每个Segment
            Loader->>OS: DownloadBinlog(insertLog)
            OS-->>Loader: Binlog数据
            
            Loader->>OS: DownloadBinlog(indexFiles)
            OS-->>Loader: 索引文件
            
            Loader->>OS: DownloadBinlog(statsLog)
            OS-->>Loader: 统计信息
            
            Loader->>OS: DownloadBinlog(deltaLog)
            OS-->>Loader: Delete数据
        end
        
        Loader->>Seg: 创建LocalSegment
        Loader->>Seg: LoadMultiFieldData():加载字段数据
        Loader->>Seg: LoadIndex():加载向量索引
        Loader->>Seg: LoadStatslog():加载统计信息
        Loader->>Seg: LoadDeltaLog():加载Delete数据
        
        Seg-->>Loader: 加载完成
    end
    
    Note over QC,Seg: 阶段4:注册Segment
    Loader->>SM: Put(segment)
    Loader->>Delegator: LoadSegments完成,更新Distribution
    Delegator->>Delegator: SyncDistribution():同步Segment分布
    
    Note over QC,Seg: 阶段5:应用Delete
    Delegator->>Delegator: forwardDelete(segment)
    Delegator->>Delegator: applyBFInParallel():使用BloomFilter过滤Delete
    Delegator->>Seg: Delete(pks, tss)
    
    Loader-->>QN: 加载成功
    QN-->>QC: Success
    
    Note over QC,Seg: 阶段6:心跳上报
    QN->>QC: HeartBeat:上报已加载Segment

LoadSegment 时序图详细说明

阶段1:接收加载请求(步骤1-8)

  • QueryCoord发送LoadSegments请求到QueryNode
  • 如果needTransfer=true,说明需要通过Delegator转发
    • Delegator根据Distribution决定目标Worker
    • 如果是本地,调用本地LoadSegments
    • 如果是远程,通过RemoteWorker调用其他QueryNode
  • 这种机制支持Segment在QueryNode之间动态分发

阶段2:准备加载(步骤9-11)

  • prepare():过滤已加载或正在加载的Segment,避免重复加载
  • requestResource():检查内存和磁盘空间是否足够
    • 估算Segment大小(数据+索引)
    • 检查是否超过内存限制
    • 如果空间不足,触发LRU淘汰

阶段3:并发加载Segment(步骤12-23)

  • 多个Segment并发加载,提高效率
  • 对每个Segment:
    1. 下载Binlog数据

      • insertLog:原始数据(向量和标量字段)
      • indexFiles:向量索引文件
      • statsLog:统计信息(min/max值等)
      • deltaLog:Delete数据(主键列表)
    2. 加载数据到Segment

      • LoadMultiFieldData():反序列化字段数据
      • LoadIndex():加载向量索引到内存或Mmap
      • LoadStatslog():加载统计信息
      • LoadDeltaLog():加载Delete数据

阶段4:注册Segment(步骤24-26)

  • 将加载好的Segment注册到SegmentManager
  • 更新Delegator的Distribution
    • 记录Segment在哪个Worker上
    • 更新Distribution版本号

阶段5:应用Delete(步骤27-29)

  • 将Delete Buffer中的数据应用到新加载的Segment
  • 使用BloomFilter快速过滤不相关的Delete记录
  • 只有匹配的Delete记录才会应用到Segment

阶段6:心跳上报(步骤31-32)

  • QueryNode通过心跳将已加载的Segment上报给QueryCoord
  • QueryCoord更新全局的Segment分布视图

3.2.2 LoadSegment 关键代码分析

(1)QueryNode.LoadSegments 入口
// internal/querynodev2/services.go:468
func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
    // 1. 健康检查
    if err := node.lifetime.Add(merr.IsHealthy); err != nil {
        return merr.Status(err), nil
    }
    defer node.lifetime.Done()
    
    // 2. 估算Binlog大小(如果没有提供)
    for _, s := range req.GetInfos() {
        fallbackBinlogMemorySize(s.GetBinlogPaths())
        fallbackBinlogMemorySize(s.GetDeltalogs())
    }
    
    // 3. 判断是否需要通过Delegator转发
    if req.GetNeedTransfer() {
        delegator, ok := node.delegators.Get(segment.GetInsertChannel())
        if !ok {
            return merr.Status(merr.WrapErrChannelNotFound(segment.GetInsertChannel())), nil
        }
        
        // 通过Delegator处理加载
        req.NeedTransfer = false
        err := delegator.LoadSegments(ctx, req)
        if err != nil {
            return merr.Status(err), nil
        }
        return merr.Success(), nil
    }
    
    // 4. 直接加载
    err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), indexMeta, loadMeta)
    defer node.manager.Collection.Unref(req.GetCollectionID(), 1)
    
    // 5. 调用Loader加载
    loaded, err := node.loader.Load(ctx,
        req.GetCollectionID(),
        segments.SegmentTypeSealed,
        req.GetVersion(),
        req.GetInfos()...,
    )
    if err != nil {
        return merr.Status(err), nil
    }
    
    // 6. 增加Collection引用计数
    node.manager.Collection.Ref(req.GetCollectionID(), uint32(len(loaded)))
    
    return merr.Success(), nil
}

关键点

  • NeedTransfer:标识是否需要Delegator转发
    • true:Delegator根据Distribution决定目标Worker
    • false:直接在本地加载
  • Collection引用计数:防止Collection在加载过程中被释放
(2)Delegator.LoadSegments:转发逻辑
// internal/querynodev2/delegator/delegator_data.go:396
func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
    if len(req.GetInfos()) == 0 {
        return nil
    }
    
    targetNodeID := req.GetDstNodeID()
    
    // 1. Pin Delete Buffer(防止清理)
    for _, info := range req.GetInfos() {
        sd.deleteBuffer.Pin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
    }
    defer func() {
        for _, info := range req.GetInfos() {
            sd.deleteBuffer.Unpin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
        }
    }()
    
    // 2. 获取目标Worker
    worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
    if err != nil {
        return err
    }
    
    // 3. 调用Worker加载Segment
    req.Base.TargetID = targetNodeID
    err = worker.LoadSegments(ctx, req)
    
    // 4. 更新Distribution
    sd.distribution.AddDistributions(/* segment info */)
    
    // 5. 应用Delete Buffer
    sd.forwardDelete(ctx, req.GetInfos(), targetNodeID)
    
    return nil
}

关键机制

  • Pin/Unpin Delete Buffer:防止Delete数据在加载过程中被清理
  • Distribution更新:记录Segment加载到了哪个Worker
  • forwardDelete:将历史Delete数据应用到新加载的Segment
(3)SegmentLoader.Load:核心加载逻辑
// internal/querynodev2/segments/segment_loader.go:249
func (loader *segmentLoader) Load(ctx context.Context,
    collectionID int64,
    segmentType SegmentType,
    version int64,
    segments ...*querypb.SegmentLoadInfo,
) ([]Segment, error) {
    if len(segments) == 0 {
        return nil, nil
    }
    
    collection := loader.manager.Collection.Get(collectionID)
    if collection == nil {
        return nil, merr.WrapErrCollectionNotFound(collectionID)
    }
    
    // 1. 过滤已加载的Segment
    infos := loader.prepare(ctx, segmentType, segments...)
    defer loader.unregister(infos...)
    
    // 2. 检查资源(内存/磁盘)
    if !isLazyLoad(collection, segmentType) {
        requestResourceResult, err := loader.requestResource(ctx, infos...)
        if err != nil {
            return nil, err
        }
        defer loader.freeRequest(requestResourceResult.Resource, requestResourceResult.LogicalResource)
    }
    
    // 3. 并发加载多个Segment
    newSegments := typeutil.NewConcurrentMap[int64, Segment]()
    loaded := typeutil.NewConcurrentMap[int64, Segment]()
    
    loadedSegments, err := loader.loadSegments(ctx, collection, segmentType, version, infos, newSegments, loaded)
    if err != nil {
        return nil, err
    }
    
    // 4. 注册到SegmentManager
    for _, segment := range loadedSegments {
        loader.manager.Segment.Put(ctx, segmentType, segment)
    }
    
    return loadedSegments, nil
}
(4)SegmentLoader.LoadSegment:加载单个Segment
// internal/querynodev2/segments/segment_loader.go:1002
func (loader *segmentLoader) LoadSegment(ctx context.Context,
    seg Segment,
    loadInfo *querypb.SegmentLoadInfo,
) (err error) {
    segment, ok := seg.(*LocalSegment)
    if !ok {
        return merr.WrapErrParameterInvalid("LocalSegment", fmt.Sprintf("%T", seg))
    }
    
    collection := loader.manager.Collection.Get(segment.Collection())
    pkField := GetPkField(collection.Schema())
    
    if segment.Type() == SegmentTypeSealed {
        // 加载Sealed Segment
        if err := loader.loadSealedSegment(ctx, loadInfo, segment); err != nil {
            return err
        }
    } else {
        // 加载Growing Segment
        if err := segment.LoadMultiFieldData(ctx); err != nil {
            return err
        }
    }
    
    // 加载统计信息
    if segment.segmentType == SegmentTypeGrowing {
        if err := loader.loadStatslog(ctx, collection, segment, loadInfo.GetStatsLogs()); err != nil {
            return err
        }
    }
    
    // 加载Delete数据
    if err := loader.loadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
        return err
    }
    
    return nil
}

加载步骤

  1. 加载字段数据:从Binlog恢复向量和标量字段
  2. 加载索引:加载向量索引(HNSW/IVF等)
  3. 加载统计信息:min/max值,用于Segment裁剪
  4. 加载Delete数据:主键列表和时间戳
(5)加载索引到内存
// internal/querynodev2/segments/segment_loader.go
func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *querypb.SegmentLoadInfo, segment *LocalSegment) error {
    // 1. 加载字段数据
    if err := segment.LoadMultiFieldData(ctx); err != nil {
        return err
    }
    
    // 2. 加载索引
    if err := loader.LoadIndex(ctx, segment, loadInfo, version); err != nil {
        return err
    }
    
    return nil
}

func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, version int64) error {
    for _, indexInfo := range loadInfo.GetIndexInfos() {
        fieldID := indexInfo.GetFieldID()
        
        // 下载索引文件
        indexData, err := loader.downloadIndexFiles(ctx, indexInfo)
        if err != nil {
            return err
        }
        
        // 加载到Segment
        if err := segment.LoadIndex(ctx, fieldID, indexData, indexInfo); err != nil {
            return err
        }
    }
    
    return nil
}

索引加载模式

  1. LoadAndHold:加载到内存,一直持有
  2. LoadAndRelease:加载后立即释放(适用于LRU缓存)
  3. Mmap:使用内存映射,减少内存占用
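下面用一段假设性代码示意"根据配置选择索引加载模式"的判断逻辑(配置项名称取自第7节的示例配置,并非Milvus的真实实现):

// 简化示意(假设性逻辑,非Milvus源码):根据配置决定向量索引的加载模式
package main

import "fmt"

type loadMode string

const (
	loadAndHold    loadMode = "LoadAndHold"    // 加载到内存并常驻
	loadAndRelease loadMode = "LoadAndRelease" // 加载后按LRU策略可被释放
	mmapMode       loadMode = "Mmap"           // 内存映射,按页换入换出,减少常驻内存
)

type indexLoadConfig struct {
	EnableMmap      bool   // 全局Mmap开关
	MmapVectorIndex bool   // 向量索引是否使用Mmap
	DefaultMode     string // LoadAndHold / LoadAndRelease
}

// chooseLoadMode 返回某个索引应采用的加载模式
func chooseLoadMode(cfg indexLoadConfig, isVectorIndex bool) loadMode {
	if cfg.EnableMmap && isVectorIndex && cfg.MmapVectorIndex {
		return mmapMode
	}
	if cfg.DefaultMode == string(loadAndRelease) {
		return loadAndRelease
	}
	return loadAndHold
}

func main() {
	cfg := indexLoadConfig{EnableMmap: true, MmapVectorIndex: true, DefaultMode: "LoadAndRelease"}
	fmt.Println(chooseLoadMode(cfg, true))  // Mmap
	fmt.Println(chooseLoadMode(cfg, false)) // LoadAndRelease
}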

3.3 Query 流程

Query流程与Search类似,但有以下区别:

  1. 返回数据:Query返回完整的字段数据,Search只返回ID和距离
  2. 多Channel并发:Query可以同时查询多个Channel
  3. 支持迭代器:QueryStream支持流式返回结果

3.3.1 Query 时序图

sequenceDiagram
    autonumber
    participant Proxy as Proxy
    participant QN as QueryNode.Server
    participant Delegator1 as ShardDelegator-1
    participant Delegator2 as ShardDelegator-2
    participant Worker as LocalWorker
    participant SegMgr as SegmentManager
    
    Note over Proxy,SegMgr: 阶段1:请求分发
    Proxy->>QN: Query(channels[ch1, ch2])
    
    par 并发查询多个Channel
        QN->>Delegator1: queryChannel(ch1)
        Delegator1->>Delegator1: waitTSafe()
        Delegator1->>Delegator1: PinReadableSegments()
        Delegator1->>Delegator1: organizeSubTask()
        Delegator1->>Worker: QuerySegments(segmentIDs_1)
        Worker->>SegMgr: Query on segments
        SegMgr-->>Worker: RetrieveResults_1
        Worker-->>Delegator1: RetrieveResults_1
        Delegator1-->>QN: RetrieveResults_1
    and
        QN->>Delegator2: queryChannel(ch2)
        Note over Delegator2,Worker: 同上流程
        Delegator2-->>QN: RetrieveResults_2
    end
    
    Note over Proxy,SegMgr: 阶段2:合并结果
    QN->>QN: CreateInternalReducer()
    QN->>QN: Reduce(results[])
    QN->>QN: 去重、排序、限制数量
    QN-->>Proxy: Final RetrieveResults

Query与Search的差异

| 特性 | Search | Query |
|------|--------|-------|
| 目的 | 向量检索 | 标量查询/根据ID查询 |
| 返回内容 | ID + 距离 + 少量字段 | 完整字段数据 |
| Channel处理 | 单Channel | 多Channel并发 |
| 索引使用 | 向量索引(HNSW/IVF) | 标量索引(Bitmap) |
| 流式支持 | 不支持 | 支持QueryStream |
| 结果合并 | TopK合并 | 去重+排序+限制数量 |
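下面用一段简化代码示意Query结果合并的基本步骤(假设性实现,非Milvus的Reduce源码):按主键去重并保留时间戳较大的版本、排序、按limit截断:

// 简化示意(假设性实现,非Milvus的Reduce源码):Query结果的去重+排序+截断
package main

import (
	"fmt"
	"sort"
)

type retrievedRow struct {
	PK     int64
	Ts     uint64
	Fields map[string]any // 完整字段数据(Query与Search的主要差异之一)
}

func reduceRetrieveResults(results [][]retrievedRow, limit int) []retrievedRow {
	// 1. 按PK去重,保留时间戳较大的版本
	latest := make(map[int64]retrievedRow)
	for _, rs := range results {
		for _, r := range rs {
			if old, ok := latest[r.PK]; !ok || r.Ts > old.Ts {
				latest[r.PK] = r
			}
		}
	}
	// 2. 排序(此处按PK升序)
	merged := make([]retrievedRow, 0, len(latest))
	for _, r := range latest {
		merged = append(merged, r)
	}
	sort.Slice(merged, func(i, j int) bool { return merged[i].PK < merged[j].PK })
	// 3. 截断到limit
	if limit > 0 && len(merged) > limit {
		merged = merged[:limit]
	}
	return merged
}

func main() {
	ch1 := []retrievedRow{{PK: 1, Ts: 10}, {PK: 2, Ts: 11}}
	ch2 := []retrievedRow{{PK: 2, Ts: 15}, {PK: 3, Ts: 12}}
	fmt.Println(len(reduceRetrieveResults([][]retrievedRow{ch1, ch2}, 10))) // 3
}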

3.4 WatchDmChannel 流程(Pipeline初始化)

WatchDmChannel是QueryNode订阅DML Channel的流程,创建Pipeline消费实时数据。

3.4.1 WatchDmChannel 时序图

sequenceDiagram
    autonumber
    participant QC as QueryCoord
    participant QN as QueryNode.Server
    participant CM as ClusterManager
    participant Delegator as ShardDelegator
    participant PM as PipelineManager
    participant Pipeline as Pipeline
    participant MQ as MessageQueue
    
    Note over QC,MQ: 阶段1:接收订阅请求
    QC->>QN: WatchDmChannels(channel, seekPosition)
    QN->>QN: 获取Collection元信息
    
    Note over QC,MQ: 阶段2:创建ShardDelegator
    QN->>Delegator: NewShardDelegator(channel)
    Delegator->>Delegator: 初始化Distribution
    Delegator->>Delegator: 初始化DeleteBuffer
    Delegator->>Delegator: 初始化PKOracle
    QN->>QN: delegators.Insert(channel, delegator)
    
    Note over QC,MQ: 阶段3:创建Pipeline
    QN->>PM: Add(collectionID, channel)
    PM->>Pipeline: NewPipeline(channel, delegator)
    Pipeline->>Pipeline: 创建FilterNode
    Pipeline->>Pipeline: 创建EmbeddingNode(可选)
    Pipeline->>Pipeline: 创建InsertNode
    Pipeline->>Pipeline: 创建DeleteNode
    Pipeline->>Pipeline: 连接Node流程
    
    Note over QC,MQ: 阶段4:加载Growing Segment
    loop 对每个Growing Segment
        QN->>Delegator: LoadGrowing(segmentInfo)
        Delegator->>Delegator: 创建Growing Segment
        Delegator->>Delegator: AddExcludedSegments()
    end
    
    Note over QC,MQ: 阶段5:订阅消息流
    QN->>Pipeline: ConsumeMsgStream(seekPosition)
    Pipeline->>MQ: Subscribe(channel, seekPosition)
    MQ-->>Pipeline: Subscription确认
    
    Note over QC,MQ: 阶段6:启动Pipeline
    QN->>Pipeline: Start()
    Pipeline->>Pipeline: 启动各个Node
    QN->>Delegator: Start()
    Delegator->>Delegator: 设置为Serviceable状态
    
    QN-->>QC: Success
    
    Note over QC,MQ: 阶段7:持续消费DML消息
    loop 持续消费
        MQ->>Pipeline: DML Messages
        Pipeline->>FilterNode: 过滤消息
        FilterNode->>InsertNode: InsertMsg
        FilterNode->>DeleteNode: DeleteMsg
        InsertNode->>Delegator: ProcessInsert()
        DeleteNode->>Delegator: ProcessDelete()
    end

WatchDmChannel 时序图详细说明

阶段1:接收订阅请求(步骤1-2)

  • QueryCoord发送WatchDmChannels请求
  • 包含Channel名称和SeekPosition(从哪个位置开始消费)
  • SeekPosition由Collection的Checkpoint决定

阶段2:创建ShardDelegator(步骤3-7)

  • 为该vChannel创建ShardDelegator实例
  • Distribution:管理Segment分布(哪些Segment在哪些Worker上)
  • DeleteBuffer:缓存Delete消息,应用到Growing Segment
  • PKOracle:BloomFilter,快速判断主键是否在Segment中
  • 将Delegator注册到QueryNode的delegators map中

阶段3:创建Pipeline(步骤8-13)

  • Pipeline是消费DML消息的流水线
  • FilterNode:过滤和验证DML消息
    • 检查消息类型(Insert/Delete/SchemaChange)
    • 验证Segment是否在ExcludedSegments中
    • 转发给下游Node
  • EmbeddingNode(可选):向量嵌入计算(如果Collection启用了Embedding功能)
  • InsertNode:处理Insert消息
    • 将数据写入Growing Segment
    • 更新PKOracle
  • DeleteNode:处理Delete消息
    • 写入DeleteBuffer
    • 转发Delete到StreamingNode(如果启用)

阶段4:加载Growing Segment(步骤14-17)

  • 加载已存在的Growing Segment(未Flush的数据)
  • 将这些Segment加入ExcludedSegments
    • ExcludedSegments:已经Sealed或Dropped,但尚未确认的Segment
    • 防止重复消费数据

阶段5:订阅消息流(步骤18-20)

  • Pipeline订阅MessageQueue的指定Channel
  • 从SeekPosition开始消费
  • 建立消费者连接

阶段6:启动Pipeline(步骤21-24)

  • 启动Pipeline,开始消费消息
  • 启动Delegator,设置为可服务状态
  • 此时可以接收Search/Query请求

阶段7:持续消费DML消息(步骤26-32)

  • Pipeline持续从MessageQueue消费消息
  • 经过FilterNode → EmbeddingNode → InsertNode → DeleteNode
  • 最终调用Delegator的ProcessInsert/ProcessDelete

3.4.2 Pipeline 内部流程图

flowchart LR
    MQ[MessageQueue] -->|MsgPack| Filter[FilterNode]
    
    subgraph Filter Processing
        Filter -->|验证消息| Check{消息类型}
        Check -->|Insert| ValidateInsert[验证SegmentID<br/>是否Excluded]
        Check -->|Delete| ValidateDelete[验证Delete]
        Check -->|SchemaChange| ValidateSchema[验证Schema]
    end
    
    ValidateInsert -->|Pass| Embed[EmbeddingNode<br/>可选]
    ValidateDelete -->|Pass| Embed
    ValidateSchema -->|Pass| Embed
    
    Embed -->|InsertData| Insert[InsertNode]
    Embed -->|DeleteData| Delete[DeleteNode]
    
    subgraph Insert Processing
        Insert -->|groupBySegment| Group[按SegmentID分组]
        Group -->|ProcessInsert| Delegator1[Delegator.ProcessInsert]
        Delegator1 -->|AppendRows| GrowingSeg[Growing Segment]
        Delegator1 -->|UpdatePK| PKOracle[PKOracle<br/>BloomFilter]
    end
    
    subgraph Delete Processing
        Delete -->|批量Delete| Delegator2[Delegator.ProcessDelete]
        Delegator2 -->|Put| DeleteBuffer[DeleteBuffer]
        Delegator2 -->|Forward| StreamingNode[StreamingNode<br/>可选]
    end
    
    style Filter fill:#e1f5ff
    style Insert fill:#e6ffe6
    style Delete fill:#ffe6e6

Pipeline Node详解

  1. FilterNode

    • 功能:过滤和验证DML消息
    • 验证逻辑:
      • 检查SegmentID是否在ExcludedSegments中
      • ExcludedSegments包含已Sealed但尚未确认的Segment
      • 如果在Excluded中,跳过该消息(避免重复)
    • 输出:有效的Insert/Delete/SchemaChange消息
  2. EmbeddingNode(可选):

    • 功能:对文本字段进行向量嵌入
    • 使用场景:Collection启用了Embedding功能
    • 调用Embedding模型将文本转换为向量
    • 输出:带有向量的InsertData
  3. InsertNode

    • 功能:处理Insert消息
    • 处理步骤:
      1. 按SegmentID分组
      2. 调用Delegator.ProcessInsert()
      3. 将数据追加到Growing Segment
      4. 更新PKOracle(BloomFilter)
    • 输出:传递给DeleteNode
  4. DeleteNode

    • 功能:处理Delete消息
    • 处理步骤:
      1. 调用Delegator.ProcessDelete()
      2. 将Delete数据放入DeleteBuffer
      3. 转发到StreamingNode(如果启用)
    • DeleteBuffer作用:
      • 缓存Delete数据
      • 在查询时应用到Growing Segment
      • 在LoadSegment时应用到新加载的Sealed Segment

3.4.3 WatchDmChannel 关键代码分析

(1)QueryNode.WatchDmChannels 入口
// internal/querynodev2/services.go:187
func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
    // 1. 健康检查
    if err := node.lifetime.Add(merr.IsHealthy); err != nil {
        return merr.Status(err), nil
    }
    defer node.lifetime.Done()
    
    // 2. 获取Channel信息
    channel := req.GetInfos()[0]  // 每次只Watch一个Channel
    
    // 3. 注册Collection
    err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), indexMeta, loadMeta)
    defer func() {
        if err != nil {
            node.manager.Collection.Unref(req.GetCollectionID(), 1)
        }
    }()
    
    // 4. 创建ShardDelegator
    delegator, err := delegator.NewShardDelegator(
        ctx,
        req.GetCollectionID(),
        req.GetReplicaID(),
        channel.GetChannelName(),
        req.GetVersion(),
        node.clusterManager,
        node.manager,
        node.loader,
        channel.GetSeekPosition().GetTimestamp(),
        node.queryHook,
        node.chunkManager,
        queryView,
    )
    if err != nil {
        return merr.Status(err), nil
    }
    node.delegators.Insert(channel.GetChannelName(), delegator)
    
    // 5. 创建Pipeline
    pipeline, err := node.pipelineManager.Add(req.GetCollectionID(), channel.GetChannelName())
    if err != nil {
        return merr.Status(err), nil
    }
    
    // 6. 加载Growing Segment
    delegator.AddExcludedSegments(growingInfo)   // 标记已有的Growing
    delegator.AddExcludedSegments(flushedInfo)   // 标记已Flush的
    delegator.AddExcludedSegments(droppedInfo)   // 标记已Drop的
    
    // 7. 加载L0 Segment(Delete Segment)
    err = loadL0Segments(ctx, delegator, req)
    
    // 8. 订阅消息流
    err = pipeline.ConsumeMsgStream(ctx, position)
    if err != nil {
        return merr.Status(err), nil
    }
    
    // 9. 启动
    pipeline.Start()
    delegator.Start()
    
    return merr.Success(), nil
}

关键点

  • 每次只Watch一个Channel
  • ExcludedSegments机制:防止重复消费
  • L0 Segment:专门存储Delete数据的Segment
  • Pipeline和Delegator的启动顺序:先Pipeline后Delegator
(2)Pipeline.NewPipeline:创建Pipeline
// internal/querynodev2/pipeline/pipeline.go:48
func NewPipeLine(
    collection *Collection,
    channel string,
    manager *DataManager,
    dispatcher msgdispatcher.Client,
    delegator delegator.ShardDelegator,
) (Pipeline, error) {
    collectionID := collection.ID()
    
    // 创建Pipeline
    p := &pipeline{
        collectionID:   collectionID,
        StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel, replicateConfig),
    }
    
    // 创建Node
    filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength)
    embeddingNode, err := newEmbeddingNode(collectionID, channel, manager, pipelineQueueLength)
    insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength)
    deleteNode := newDeleteNode(collectionID, channel, manager, delegator, pipelineQueueLength)
    
    // 连接Node
    if embeddingNode != nil {
        p.Add(filterNode, embeddingNode, insertNode, deleteNode)
    } else {
        p.Add(filterNode, insertNode, deleteNode)
    }
    
    return p, nil
}
(3)FilterNode.Operate:过滤消息
// internal/querynodev2/pipeline/filter_node.go:50
func (fNode *filterNode) Operate(in Msg) Msg {
    streamMsgPack, ok := in.(*msgstream.MsgPack)
    if !ok {
        return nil
    }
    
    collection := fNode.manager.Collection.Get(fNode.collectionID)
    
    out := &insertNodeMsg{
        insertMsgs: []*InsertMsg{},
        deleteMsgs: []*DeleteMsg{},
        timeRange: TimeRange{
            timestampMin: streamMsgPack.BeginTs,
            timestampMax: streamMsgPack.EndTs,
        },
    }
    
    // 过滤每条消息
    for _, msg := range streamMsgPack.Msgs {
        err := fNode.filtrate(collection, msg)
        if err != nil {
            log.Debug("filter invalid message", zap.Error(err))
        } else {
            out.append(msg)  // 添加到输出
        }
    }
    
    // 尝试清理ExcludedSegments
    fNode.delegator.TryCleanExcludedSegments(streamMsgPack.EndTs)
    
    return out
}

func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error {
    switch msg.Type() {
    case commonpb.MsgType_Insert:
        insertMsg := msg.(*msgstream.InsertMsg)
        
        // 检查Segment是否Excluded
        ok := fNode.delegator.VerifyExcludedSegments(insertMsg.SegmentID, insertMsg.EndTimestamp)
        if !ok {
            return merr.WrapErrServiceInternal("skip msg due to segment excluded")
        }
        return nil
        
    case commonpb.MsgType_Delete:
        // Delete消息总是接受(需要应用到所有Segment)
        return nil
        
    default:
        return merr.WrapErrParameterInvalid("msgType is Insert or Delete", "not")
    }
}

FilterNode逻辑

  1. 将MsgPack拆分为单个消息
  2. 对每个Insert消息,检查SegmentID是否在ExcludedSegments中
  3. ExcludedSegments包含:
    • 已Sealed但尚未确认的Segment
    • 已Flushed的Segment
    • 已Dropped的Segment
  4. 如果在Excluded中,跳过该消息(避免重复)
  5. Delete消息总是接受(需要应用到所有Segment)
(4)InsertNode.Operate:处理Insert消息
// internal/querynodev2/pipeline/insert_node.go:90
func (iNode *insertNode) Operate(in Msg) Msg {
    nodeMsg := in.(*insertNodeMsg)
    
    if len(nodeMsg.insertMsgs) > 0 {
        // 按SegmentID分组
        nodeMsg.insertDatas = make(map[UniqueID]*delegator.InsertData)
        for _, msg := range nodeMsg.insertMsgs {
            iNode.addInsertData(nodeMsg.insertDatas, msg, collection)
        }
        
        // 调用Delegator处理
        iNode.delegator.ProcessInsert(nodeMsg.insertDatas)
    }
    
    // 传递给DeleteNode
    return &deleteNodeMsg{
        deleteMsgs: nodeMsg.deleteMsgs,
        timeRange:  nodeMsg.timeRange,
    }
}
(5)DeleteNode.Operate:处理Delete消息
// internal/querynodev2/pipeline/delete_node.go:46
func (dNode *deleteNode) Operate(in Msg) Msg {
    nodeMsg := in.(*deleteNodeMsg)
    
    if len(nodeMsg.deleteMsgs) > 0 {
        // 构造DeleteData
        deleteData := make([]*delegator.DeleteData, 0, len(nodeMsg.deleteMsgs))
        for _, msg := range nodeMsg.deleteMsgs {
            deleteData = append(deleteData, &delegator.DeleteData{
                PartitionID:  msg.PartitionID,
                PrimaryKeys:  msg.PrimaryKeys,
                Timestamps:   msg.Timestamps,
                RowCount:     msg.NumRows,
            })
        }
        
        // 调用Delegator处理
        dNode.delegator.ProcessDelete(deleteData, nodeMsg.timeRange.timestampMax)
    }
    
    return nil  // DeleteNode是终点
}

3.5 Delete 处理流程

Delete处理是QueryNode的重要功能,需要将Delete操作应用到所有相关的Segment(包括Growing和Sealed)。

3.5.1 Delete 处理时序图

sequenceDiagram
    autonumber
    participant MQ as MessageQueue
    participant Pipeline as Pipeline.DeleteNode
    participant Delegator as ShardDelegator
    participant DB as DeleteBuffer
    participant GS as Growing Segment
    participant Loader as SegmentLoader
    participant SS as Sealed Segment
    
    Note over MQ,SS: 阶段1:DeleteNode接收Delete消息
    MQ->>Pipeline: DeleteMsg(pks[], tss[])
    Pipeline->>Delegator: ProcessDelete(deleteData, ts)
    
    Note over MQ,SS: 阶段2:写入DeleteBuffer
    Delegator->>DB: Put(deleteData, ts)
    DB->>DB: 存储到List缓存
    DB->>DB: 更新Metrics(缓存大小)
    
    Note over MQ,SS: 阶段3:转发到StreamingNode(可选)
    Delegator->>Delegator: forwardStreamingDeletion()
    
    Note over MQ,SS: 阶段4:LoadSegment时应用Delete
    Loader->>Loader: LoadSegment完成
    Loader->>Delegator: 通知Segment加载完成
    Delegator->>DB: ListAfter(segmentStartTS)
    DB-->>Delegator: deleteRecords[]
    
    loop 对每个Delete记录
        Delegator->>SS: BatchPkExist(pks) - BloomFilter过滤
        SS-->>Delegator: hits[](匹配的主键)
        Delegator->>SS: Delete(matchedPks, tss)
    end
    
    Note over MQ,SS: 阶段5:查询时应用Delete(Growing)
    activate Delegator
    Delegator->>DB: ListAfter(queryTS)
    DB-->>Delegator: deleteRecords[]
    Delegator->>GS: ApplyDelete(deleteRecords)
    GS->>GS: 标记删除的行
    deactivate Delegator
    
    Note over MQ,SS: 阶段6:定期清理DeleteBuffer
    Delegator->>DB: TryDiscard(tSafe)
    DB->>DB: 检查是否Pin
    DB->>DB: 清理tSafe之前的数据

Delete 处理详细说明

阶段1:DeleteNode接收Delete消息(步骤1-2)

  • Pipeline的DeleteNode从MessageQueue消费Delete消息
  • 调用Delegator.ProcessDelete()

阶段2:写入DeleteBuffer(步骤3-5)

  • 将Delete数据写入DeleteBuffer
  • DeleteBuffer是一个List结构,按时间戳排序
  • 更新Metrics,记录缓存大小和行数

阶段3:转发到StreamingNode(步骤6-7)

  • 如果启用了StreamingNode,转发Delete到StreamingNode
  • StreamingNode用于跨QueryNode共享Delete数据

阶段4:LoadSegment时应用Delete(步骤8-15)

  • 当新加载Sealed Segment时,需要应用历史的Delete数据
  • ListAfter(segmentStartTS):获取Segment起始时间戳之后的所有Delete记录
  • 使用BloomFilter快速过滤:
    • 对每个Delete记录的主键列表,调用Segment.BatchPkExist()
    • BloomFilter返回可能匹配的主键
    • 只有匹配的主键才会真正应用Delete
  • 这个过程称为forwardDelete

阶段5:查询时应用Delete(步骤16-20)

  • 查询Growing Segment时,需要应用DeleteBuffer中的Delete数据
  • ListAfter(queryTS):获取查询时间戳之前的所有Delete记录
  • 在查询结果中过滤掉已删除的行

阶段6:定期清理DeleteBuffer(步骤21-23)

  • DeleteBuffer占用内存,需要定期清理
  • TryDiscard(tSafe):清理tSafe之前的数据
  • 清理前检查是否有Segment Pin了该时间戳(防止正在使用)

3.5.2 DeleteBuffer 数据结构

// internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go

type Item struct {
    Ts   uint64           // 时间戳
    Data []BufferItem     // Delete数据
}

type BufferItem struct {
    PartitionID int64
    DeleteData  storage.DeleteData  // {Pks, Tss, RowCount}
}

type listDeleteBuffer struct {
    mut          sync.RWMutex
    list         []*cacheBlock      // 分块存储
    sizePerBlock int64
    rowNum       int64              // 总行数
    size         int64              // 总内存大小
    safeTs       uint64             // 安全时间戳
    pinRecords   map[uint64]int    // Pin计数
}

// Put:添加Delete数据
func (b *listDeleteBuffer) Put(entry *Item) {
    b.mut.Lock()
    defer b.mut.Unlock()
    
    tail := b.list[len(b.list)-1]
    err := tail.Put(entry)
    if errors.Is(err, errBufferFull) {
        // Block满了,创建新Block
        b.list = append(b.list, newCacheBlock(entry.Ts, b.sizePerBlock, entry))
    }
    
    b.rowNum += entry.EntryNum()
    b.size += entry.Size()
    b.updateMetrics()
}

// ListAfter:获取指定时间戳之后的所有Delete记录
func (b *listDeleteBuffer) ListAfter(ts uint64) []*Item {
    b.mut.RLock()
    defer b.mut.RUnlock()
    
    var result []*Item
    for _, block := range b.list {
        result = append(result, block.ListAfter(ts)...)
    }
    return result
}

// TryDiscard:尝试清理旧数据
func (b *listDeleteBuffer) TryDiscard(ts uint64) {
    b.mut.Lock()
    defer b.mut.Unlock()
    
    // 检查是否有Segment Pin了该时间戳
    if b.isPinned(ts) {
        return
    }
    
    b.tryCleanDelete(ts)
}

// Pin/Unpin:防止清理
func (b *listDeleteBuffer) Pin(ts uint64, segmentID int64) {
    b.mut.Lock()
    defer b.mut.Unlock()
    b.pinRecords[ts]++
}

func (b *listDeleteBuffer) Unpin(ts uint64, segmentID int64) {
    b.mut.Lock()
    defer b.mut.Unlock()
    b.pinRecords[ts]--
    if b.pinRecords[ts] <= 0 {
        delete(b.pinRecords, ts)
    }
}

DeleteBuffer设计要点

  1. 分块存储

    • 将Delete数据分成多个Block存储
    • 每个Block有大小限制(默认16MB)
    • 方便清理旧数据
  2. 时间戳排序

    • Delete数据按时间戳升序存储
    • ListAfter(ts)可以高效查询
  3. Pin机制

    • LoadSegment时Pin住对应的时间戳
    • 防止Delete数据在加载过程中被清理
    • 加载完成后Unpin
  4. 内存控制

    • 记录总行数和总内存大小
    • 定期清理tSafe之前的数据
    • 上报Metrics监控
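下面给出一个使用模式示意(假设性代码,方法语义对应上文listDeleteBuffer):LoadSegment期间先Pin住Segment起始时间戳,ListAfter取出增量Delete并应用,完成后Unpin,避免数据在使用过程中被TryDiscard清理:

// 使用模式示意(假设性代码):Pin → ListAfter → 应用Delete → Unpin
package main

import "fmt"

type fakeDeleteBuffer struct {
	records map[uint64][]int64 // ts -> 被删除的主键(简化表示)
	pinned  map[uint64]int     // ts -> Pin计数
}

func (b *fakeDeleteBuffer) Pin(ts uint64, segmentID int64)   { b.pinned[ts]++ }
func (b *fakeDeleteBuffer) Unpin(ts uint64, segmentID int64) { b.pinned[ts]-- }

// ListAfter 返回时间戳不早于ts的Delete主键
func (b *fakeDeleteBuffer) ListAfter(ts uint64) []int64 {
	var pks []int64
	for t, v := range b.records {
		if t >= ts {
			pks = append(pks, v...)
		}
	}
	return pks
}

func main() {
	buf := &fakeDeleteBuffer{
		records: map[uint64][]int64{100: {1, 2}, 200: {3}},
		pinned:  map[uint64]int{},
	}
	segmentID, startTs := int64(42), uint64(150)

	buf.Pin(startTs, segmentID)         // 1. 防止加载期间Delete数据被清理
	defer buf.Unpin(startTs, segmentID) // 3. 加载完成后释放

	for _, pk := range buf.ListAfter(startTs) { // 2. 取startTs之后的Delete记录
		fmt.Println("apply delete pk:", pk) // 实际实现中会先经BloomFilter过滤再应用到Segment
	}
}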

3.5.3 Delete 关键代码分析

(1)Delegator.ProcessDelete:处理Delete消息
// internal/querynodev2/delegator/delegator_data.go:184
func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
    // 加锁防止与LoadSegment并发
    sd.deleteMut.Lock()
    defer sd.deleteMut.Unlock()
    
    // 1. 构造DeleteBuffer Item
    cacheItems := make([]deletebuffer.BufferItem, 0, len(deleteData))
    for _, entry := range deleteData {
        cacheItems = append(cacheItems, deletebuffer.BufferItem{
            PartitionID: entry.PartitionID,
            DeleteData: storage.DeleteData{
                Pks:      entry.PrimaryKeys,
                Tss:      entry.Timestamps,
                RowCount: entry.RowCount,
            },
        })
    }
    
    // 2. 写入DeleteBuffer
    sd.deleteBuffer.Put(&deletebuffer.Item{
        Ts:   ts,
        Data: cacheItems,
    })
    
    // 3. 转发到StreamingNode
    sd.forwardStreamingDeletion(context.Background(), deleteData)
}
(2)forwardDelete:应用Delete到Segment
// internal/querynodev2/delegator/delegator_data.go:638
func (sd *shardDelegator) forwardDelete(ctx context.Context, infos []*querypb.SegmentLoadInfo, targetNodeID int64) error {
    worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
    if err != nil {
        return err
    }
    
    for _, info := range infos {
        candidate := idCandidates[info.GetSegmentID()]
        
        // 确定Delete Scope
        deleteScope := querypb.DataScope_All
        switch candidate.Type() {
        case commonpb.SegmentState_Sealed:
            deleteScope = querypb.DataScope_Historical
        case commonpb.SegmentState_Growing:
            deleteScope = querypb.DataScope_Streaming
        }
        
        // 创建BufferedForwarder
        bufferedForwarder := NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(),
            deleteViaWorker(ctx, worker, targetNodeID, info, deleteScope))
        
        // 列出所有Delete记录
        deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp())
        
        // 使用BloomFilter过滤
        for _, entry := range deleteRecords {
            for _, record := range entry.Data {
                // 检查Partition
                if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
                    continue
                }
                
                pks := record.DeleteData.Pks
                batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
                
                // 分批应用BloomFilter
                for idx := 0; idx < len(pks); idx += batchSize {
                    endIdx := idx + batchSize
                    if endIdx > len(pks) {
                        endIdx = len(pks)
                    }
                    
                    // BloomFilter快速判断
                    lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
                    hits := candidate.BatchPkExist(lc)
                    
                    // 只转发匹配的主键
                    for i, hit := range hits {
                        if hit {
                            err := bufferedForwarder.Buffer(pks[idx+i], record.DeleteData.Tss[idx+i])
                            if err != nil {
                                return err
                            }
                        }
                    }
                }
            }
        }
        
        // Flush缓冲
        if err := bufferedForwarder.Flush(); err != nil {
            return err
        }
    }
    
    return nil
}

forwardDelete优化

  1. BloomFilter过滤:避免不必要的Delete调用
  2. 批量处理:减少gRPC调用次数
  3. Buffered转发:积累一批Delete后再发送
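下面用一段简化代码示意"Buffered转发"的基本形态(假设性实现,非Milvus的BufferedForwarder源码):积累一批(pk, ts)后再通过回调批量下发,减少gRPC调用次数:

// 简化示意(假设性实现,非Milvus源码):批量缓冲后再转发Delete
package main

import "fmt"

type bufferedForwarder struct {
	batchSize int
	pks       []int64
	tss       []uint64
	flushFn   func(pks []int64, tss []uint64) error // 实际实现中对应一次批量的Worker Delete调用
}

// Buffer 缓存一条Delete记录,攒满一批后自动Flush
func (f *bufferedForwarder) Buffer(pk int64, ts uint64) error {
	f.pks = append(f.pks, pk)
	f.tss = append(f.tss, ts)
	if len(f.pks) >= f.batchSize {
		return f.Flush()
	}
	return nil
}

// Flush 将缓冲中的记录一次性下发并清空缓冲
func (f *bufferedForwarder) Flush() error {
	if len(f.pks) == 0 {
		return nil
	}
	err := f.flushFn(f.pks, f.tss)
	f.pks, f.tss = nil, nil
	return err
}

func main() {
	fwd := &bufferedForwarder{
		batchSize: 3,
		flushFn: func(pks []int64, tss []uint64) error {
			fmt.Println("forward delete batch:", pks)
			return nil
		},
	}
	for i := int64(1); i <= 7; i++ {
		_ = fwd.Buffer(i, uint64(i))
	}
	_ = fwd.Flush() // 处理不满一批的尾部数据
}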
(3)QueryNode.Delete:应用Delete到Segment
// internal/querynodev2/services.go
func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) {
    // 1. 获取Segment
    segment := node.manager.Segment.Get(req.GetSegmentId())
    if segment == nil {
        return merr.Status(merr.WrapErrSegmentNotFound(req.GetSegmentId())), nil
    }
    
    // 2. 应用Delete
    err := segment.Delete(ctx, req.GetPrimaryKeys(), req.GetTimestamps())
    if err != nil {
        return merr.Status(err), nil
    }
    
    return merr.Success(), nil
}

4. 模块内部交互与数据流

4.1 ShardDelegator 内部结构

flowchart TB
    subgraph ShardDelegator["ShardDelegator(核心协调器)"]
        subgraph Data["数据管理"]
            Dist[Distribution<br/>Segment分布管理]
            PK[PKOracle<br/>BloomFilter]
            DB[DeleteBuffer<br/>Delete缓存]
        end
        
        subgraph Query["查询协调"]
            QC[QueryCoordinator<br/>协调查询任务]
            TS[TSafe Manager<br/>时间戳安全]
            Stats[PartitionStats<br/>统计信息]
        end
        
        subgraph Worker["Worker管理"]
            WM[WorkerManager<br/>Worker池]
            LW[LocalWorker]
            RW[RemoteWorker]
        end
        
        Dist -->|Pin/Unpin| QC
        QC -->|GetSegments| Dist
        QC -->|WaitTSafe| TS
        QC -->|Prune| Stats
        QC -->|DispatchTask| WM
        WM -->|Route| LW
        WM -->|Route| RW
        DB -->|ListAfter| QC
        PK -->|Filter| QC
    end
    
    subgraph External["外部交互"]
        Pipeline[Pipeline<br/>DML消费]
        Loader[SegmentLoader<br/>Segment加载]
        SM[SegmentManager<br/>Segment存储]
    end
    
    Pipeline -->|ProcessInsert| Data
    Pipeline -->|ProcessDelete| DB
    Loader -->|LoadComplete| Dist
    Dist -->|QuerySegments| SM
    
    style ShardDelegator fill:#e1f5ff,stroke:#333,stroke-width:2px
    style Data fill:#fff4e6,stroke:#333,stroke-width:1px
    style Query fill:#e6ffe6,stroke:#333,stroke-width:1px
    style Worker fill:#ffe6e6,stroke:#333,stroke-width:1px

ShardDelegator核心职责

  1. Distribution(Segment分布)

    • 记录每个Segment在哪个Worker上
    • 支持Pin/Unpin版本控制
    • 支持动态更新(SyncDistribution)
    • 用于查询时路由Segment到Worker
  2. PKOracle(主键Oracle)

    • 为每个Segment维护BloomFilter
    • 快速判断主键是否在Segment中
    • 用于Delete操作的过滤
  3. DeleteBuffer(Delete缓存)

    • 缓存Delete消息
    • 支持按时间戳范围查询
    • 支持Pin/Unpin防止清理
    • 定期清理旧数据
  4. TSafe Manager(时间戳安全)

    • 维护Channel的TSafe
    • 查询时等待TSafe到达guaranteeTS
    • 确保查询的一致性
  5. PartitionStats(统计信息)

    • 记录每个Partition的统计信息(min/max值)
    • 用于Segment Pruning
    • 减少不必要的Segment查询
  6. WorkerManager(Worker管理)

    • 管理LocalWorker和RemoteWorker
    • 根据Segment分布路由查询请求
    • 处理Worker不可用的情况

4.2 数据流图

flowchart LR
    subgraph Input["数据输入"]
        MQ[MessageQueue]
        OS[Object Storage]
    end
    
    subgraph Processing["数据处理"]
        Pipeline[Pipeline]
        Loader[SegmentLoader]
    end
    
    subgraph Storage["数据存储"]
        Growing[Growing Segment<br/>内存,可写]
        Sealed[Sealed Segment<br/>内存/Mmap,只读]
        DeleteBuf[DeleteBuffer<br/>内存缓存]
    end
    
    subgraph Query["查询路径"]
        Delegator[ShardDelegator]
        Worker[Worker]
        Search[Search/Query]
    end
    
    MQ -->|DML消息| Pipeline
    Pipeline -->|Insert| Growing
    Pipeline -->|Delete| DeleteBuf
    
    OS -->|Binlog/Index| Loader
    Loader -->|Load| Sealed
    
    Delegator -->|读取| Growing
    Delegator -->|读取| Sealed
    Delegator -->|应用| DeleteBuf
    
    Delegator -->|调度| Worker
    Worker -->|执行| Search
    Search -->|访问| Sealed
    Search -->|访问| Growing
    
    style Input fill:#ffe6e6
    style Processing fill:#fff4e6
    style Storage fill:#e6ffe6
    style Query fill:#e1f5ff

5. 关键设计与优化

5.1 Segment Cache(LRU)

目的:内存有限,无法加载所有Segment

实现

  • LRU缓存策略
  • 内存使用量跟踪
  • 自动淘汰最久未使用的Segment

优化

  • LazyLoad:按需加载Segment
  • Mmap模式:减少内存占用
  • DiskCache:将不常用的Segment放到磁盘
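下面是一个按内存用量驱逐的Segment LRU缓存最小示意(假设性实现,非Milvus源码):访问即移到队首,超过memoryLimit时从队尾逐个淘汰:

// 最小示意(假设性实现,非Milvus源码):按内存用量驱逐的Segment LRU缓存
package main

import (
	"container/list"
	"fmt"
)

type segmentEntry struct {
	id   int64
	size int64 // 该Segment占用的内存(字节)
}

type segmentLRU struct {
	memoryLimit int64
	used        int64
	order       *list.List              // 队首=最近使用
	items       map[int64]*list.Element // segmentID -> 链表节点
}

func newSegmentLRU(limit int64) *segmentLRU {
	return &segmentLRU{memoryLimit: limit, order: list.New(), items: map[int64]*list.Element{}}
}

// Get 访问Segment,命中则移到队首
func (c *segmentLRU) Get(id int64) bool {
	if e, ok := c.items[id]; ok {
		c.order.MoveToFront(e)
		return true
	}
	return false
}

// Put 加入Segment,必要时从队尾淘汰最久未使用的Segment
func (c *segmentLRU) Put(id, size int64) (evicted []int64) {
	e := c.order.PushFront(&segmentEntry{id: id, size: size})
	c.items[id] = e
	c.used += size
	for c.used > c.memoryLimit && c.order.Len() > 1 {
		back := c.order.Back()
		ent := back.Value.(*segmentEntry)
		c.order.Remove(back)
		delete(c.items, ent.id)
		c.used -= ent.size
		evicted = append(evicted, ent.id)
	}
	return evicted
}

func main() {
	cache := newSegmentLRU(100)
	cache.Put(1, 60)
	cache.Put(2, 30)
	cache.Get(1)                  // 访问Segment 1,使其变为最近使用
	fmt.Println(cache.Put(3, 40)) // 超限,淘汰最久未使用的Segment 2
}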

5.2 BloomFilter 优化

用途

  1. PKOracle:判断主键是否在Segment中
  2. Delete过滤:快速过滤不相关的Delete记录

优势

  • 空间效率高:1%误判率只需要约10bits/key
  • 查询速度快:O(k)复杂度,k为hash函数数量
  • 大幅减少不必要的Delete操作
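下面用开源库 github.com/bits-and-blooms/bloom/v3 做一个用法示意(Milvus内部对BloomFilter另有封装,此处仅演示思路):以约1%误判率为Segment的主键建BloomFilter,Delete转发前先Test过滤:

// 示意(假设使用开源库 bits-and-blooms/bloom/v3,非Milvus内部封装):
// 主键BloomFilter的构建与Delete前置过滤
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func pkToBytes(pk int64) []byte {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, uint64(pk))
	return buf
}

func main() {
	// 预计100万个主键,1%误判率(约10 bits/key)
	filter := bloom.NewWithEstimates(1_000_000, 0.01)

	// Segment加载/写入时:把主键加入BloomFilter
	for pk := int64(0); pk < 1000; pk++ {
		filter.Add(pkToBytes(pk))
	}

	// Delete转发时:只有Test命中的主键才需要真正下发Delete
	fmt.Println(filter.Test(pkToBytes(10)))      // true:可能存在,需要转发
	fmt.Println(filter.Test(pkToBytes(999_999))) // 大概率false:确定不存在,直接跳过
}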

5.3 并发控制

Pin/Unpin机制

  • Distribution Pin:防止查询过程中Segment分布变化
  • DeleteBuffer Pin:防止LoadSegment过程中Delete数据被清理
  • 版本号机制:检测并发修改

锁优化

  • 读写锁:读多写少场景
  • 分段锁:减少锁竞争
  • 无锁数据结构:ConcurrentMap
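下面是Distribution版本Pin/Unpin的最小示意(假设性实现,非Milvus源码):查询Pin住当前版本的Segment快照,分布变更产生新版本,旧版本在所有查询Unpin后才可回收:

// 最小示意(假设性实现,非Milvus源码):基于版本号与引用计数的Pin/Unpin
package main

import (
	"fmt"
	"sync"
)

type distVersion struct {
	id       int64
	sealed   []int64 // 该版本下可读的Sealed Segment
	refCount int     // 正在使用该版本的查询数
}

type distribution struct {
	mu       sync.Mutex
	current  *distVersion
	retained map[int64]*distVersion // 仍被Pin住的历史版本
}

// Pin 返回当前版本的Segment快照,并增加引用计数
func (d *distribution) Pin() (sealed []int64, version int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.current.refCount++
	return d.current.sealed, d.current.id
}

// Unpin 查询结束后释放版本;引用归零的历史版本即可回收
func (d *distribution) Unpin(version int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if v, ok := d.retained[version]; ok {
		v.refCount--
		if v.refCount <= 0 {
			delete(d.retained, version)
		}
	} else if d.current.id == version {
		d.current.refCount--
	}
}

// Update 分布变更:生成新版本,旧版本转入retained等待Unpin
func (d *distribution) Update(sealed []int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	old := d.current
	if old.refCount > 0 {
		d.retained[old.id] = old
	}
	d.current = &distVersion{id: old.id + 1, sealed: sealed}
}

func main() {
	d := &distribution{current: &distVersion{id: 1, sealed: []int64{100, 101}}, retained: map[int64]*distVersion{}}
	segs, ver := d.Pin()        // 查询开始:Pin住版本1
	d.Update([]int64{100, 102}) // 期间发生Segment移动,产生版本2
	fmt.Println(segs, ver)      // 查询仍使用版本1的快照 [100 101] 1
	d.Unpin(ver)                // 查询结束:版本1可回收
}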

5.4 Segment Pruning

目的:减少不必要的Segment查询

方法

  1. 统计信息Pruning:

    • 使用PartitionStats的min/max值
    • 过滤不可能包含结果的Segment
  2. BloomFilter Pruning:

    • 使用PKOracle快速判断
    • 过滤不包含指定主键的Segment

效果

  • 减少Segment扫描量50%-90%
  • 降低查询延迟
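下面用一段简化代码示意基于min/max统计信息的Segment裁剪(假设性实现,非Milvus的PruneSegments源码):只保留取值范围与过滤条件区间有交集的Segment:

// 简化示意(假设性实现,非Milvus源码):基于min/max统计信息的Segment裁剪
package main

import "fmt"

type segmentStats struct {
	SegmentID int64
	Min, Max  int64 // 某个标量字段在该Segment内的取值范围
}

// pruneByRange 只保留 [Min, Max] 与查询区间 [lo, hi] 有交集的Segment
func pruneByRange(stats []segmentStats, lo, hi int64) []int64 {
	var kept []int64
	for _, s := range stats {
		if s.Max < lo || s.Min > hi { // 区间不相交,整个Segment可以跳过
			continue
		}
		kept = append(kept, s.SegmentID)
	}
	return kept
}

func main() {
	stats := []segmentStats{
		{SegmentID: 1, Min: 0, Max: 17},
		{SegmentID: 2, Min: 18, Max: 60},
		{SegmentID: 3, Min: 61, Max: 99},
	}
	// 过滤条件 "age>18 and age<40" 对应区间 (18, 40)
	fmt.Println(pruneByRange(stats, 19, 39)) // 只需扫描Segment 2
}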

6. 性能与容量

6.1 性能指标

| 指标 | 数值 | 说明 |
|------|------|------|
| Search QPS | 1000-10000 | 取决于索引类型和向量维度 |
| Search延迟 | P99: 50-200ms | HNSW索引,768维向量 |
| Query QPS | 5000-20000 | 标量查询,取决于过滤条件 |
| Load Segment | 1GB/10秒 | 取决于网络带宽和磁盘速度 |
| 内存占用 | 数据量*1.5-2x | 包含索引和元数据 |
| DML吞吐 | 10000-50000 rows/s | 取决于字段数量和向量维度 |

6.2 索引性能对比

测试环境:100万向量,768维,TopK=10

| 索引 | 构建时间 | 查询QPS | 查询延迟(P99) | 召回率 | 内存占用 |
|------|----------|---------|----------------|--------|----------|
| FLAT | 0s | 10 | 100ms | 100% | 3GB |
| IVF_FLAT | 30s | 1000 | 10ms | 99% | 3GB |
| HNSW | 5分钟 | 5000 | 5ms | 99.5% | 4.5GB |
| DiskANN | 10分钟 | 2000 | 20ms | 98% | 300MB+磁盘 |

6.3 容量规划

单个QueryNode容量

  • 内存:64GB-256GB
  • 支持Collection数:10-100个
  • 支持Segment数:1000-10000个
  • 支持向量数:1亿-10亿(取决于向量维度)

扩容策略

  • 水平扩容:增加QueryNode数量
  • Segment分片:将大Collection分到多个QueryNode
  • Replica:多副本提高QPS

7. 配置参数

queryNode:
  # 内存配置
  cache:
    enabled: true
    memoryLimit: 64GB          # 最大内存限制
    
  # Growing Segment配置
  growingSegment:
    loadStrategy: "BruteForce"  # FLAT/BruteForce
    sealPolicy:
      rowLimit: 1048576         # 100万行自动Seal
      
  # DeleteBuffer配置
  deleteBuffer:
    sizePerBlock: 16777216      # 16MB per block
    maxSize: 1073741824         # 1GB最大缓存
    
  # 并发配置
  search:
    topKMergeRatio: 10          # TopK合并批次
    segmentParallel: 4          # Segment并发数
    workerPoolSize: 8           # Worker池大小
    
  # 索引配置
  index:
    enableMmap: false           # 是否启用Mmap
    mmapVectorIndex: true       # 向量索引使用Mmap
    mmapScalarIndex: false      # 标量索引不使用Mmap
    loadMode: "LoadAndRelease"  # LoadAndHold/LoadAndRelease
    
  # Scheduler配置
  scheduler:
    policy: "fifo"              # fifo/priority
    maxReadConcurrency: 100     # 最大并发查询数
    
  # LazyLoad配置
  lazyLoad:
    enabled: false              # 是否启用LazyLoad
    waitTime: 30s               # 等待时间
    
  # Segment Pruning配置
  segmentPrune:
    enabled: true               # 是否启用Segment Pruning
    filterRatio: 0.5            # 默认过滤比例

8. 故障处理与容错

8.1 Worker不可用

场景:RemoteWorker对应的QueryNode宕机

处理

  1. Delegator检测到Worker不可用(gRPC错误码Unavailable)
  2. 标记Segment为Offline:markSegmentOffline(segmentIDs...)
  3. 返回错误给Proxy
  4. Proxy重试其他副本或等待QueryCoord重新分配

8.2 Segment加载失败

场景:Object Storage不可用或文件损坏

处理

  1. SegmentLoader返回错误
  2. QueryNode上报心跳时标记Segment加载失败
  3. QueryCoord重新调度到其他QueryNode
  4. 或者重试加载

8.3 内存不足

场景:内存超过限制,无法加载新Segment

处理

  1. SegmentLoader检查内存限制
  2. 触发LRU淘汰机制
  3. 释放最久未使用的Segment
  4. 如果仍不足,返回错误
  5. QueryCoord调度到其他QueryNode

8.4 DeleteBuffer溢出

场景:Delete速率过高,DeleteBuffer占用内存过大

处理

  1. 监控DeleteBuffer大小
  2. 加快清理速度(降低TSafe延迟)
  3. 或者限流Delete操作
  4. 告警通知运维

9. 监控指标

9.1 关键Metrics

// 查询相关
QueryNodeSQCount               // Search/Query总数
QueryNodeSQLatency             // Search/Query延迟
QueryNodeSegmentLatency        // 单Segment查询延迟
QueryNodeReduceLatency         // Reduce延迟

// Segment相关
QueryNodeNumSegments           // Segment数量(按类型)
QueryNodeSegmentSize           // Segment大小
QueryNodeSegmentRowNum         // Segment行数

// DeleteBuffer相关
QueryNodeDeleteBufferRowNum    // DeleteBuffer行数
QueryNodeDeleteBufferSize      // DeleteBuffer内存大小

// Pipeline相关
QueryNodeConsumerMsgCount      // 消费消息数
QueryNodeConsumeTimeTickLag    // 消费延迟(TSafe落后)
QueryNodeWaitProcessingMsgCount // 等待处理的消息数

// 资源相关
QueryNodeMemoryUsage           // 内存使用量
QueryNodeDiskUsage             // 磁盘使用量
QueryNodeCPUUsage              // CPU使用率

9.2 告警规则

  1. 查询延迟过高:P99延迟 > 500ms
  2. 内存使用过高:内存使用率 > 80%
  3. TSafe落后:消费延迟 > 10秒
  4. DeleteBuffer过大:DeleteBuffer大小 > 2GB
  5. Segment加载失败:加载失败率 > 5%