Milvus-04-QueryCoord-概览
1. 模块概述
1.1 职责定义
QueryCoord(查询协调器)是 Milvus 查询子系统的核心协调组件,负责管理所有 QueryNode 节点、协调数据分布、任务调度和负载均衡。
核心职责:
- QueryNode 生命周期管理
  - 节点注册与会话管理
  - 心跳监控(默认 1s 间隔)
  - 节点资源统计(内存、CPU、Segment/Channel 数量)
  - 故障检测与自动恢复(10s 超时)
- Collection/Partition 加载管理
  - LoadCollection/LoadPartition:创建加载计划并分配数据
  - ReleaseCollection/ReleasePartition:卸载数据并清理资源
  - Segment 分配到 QueryNode(基于负载均衡策略)
  - 副本管理(Replica):支持多副本提升查询吞吐和可用性
  - ResourceGroup:资源隔离与 quota 管理
- 目标状态管理(Target Management)
  - Next Target:从 DataCoord 拉取的最新 Segment/Channel 列表
  - Current Target:当前应该被加载的 Segment/Channel 列表
  - Target 更新与版本管理
  - Partition 级别的 Target 管理
- 实际分布跟踪(Distribution Management)
  - 从 QueryNode 心跳收集 Segment/Channel 实际加载情况
  - Leader View 管理:每个 Channel 的 Shard Leader 信息
  - Growing Segment 跟踪(Streaming 数据)
  - Sealed Segment 跟踪(Historical 数据)
- 协调与自动修复(Reconciliation)
  - Target vs Distribution Diff:对比目标状态与实际状态
  - 自动生成 Load/Release Task 驱动状态收敛
  - SegmentChecker:Sealed Segment 协调
  - ChannelChecker:DmChannel 协调
  - LeaderChecker:Leader View 协调
  - BalanceChecker:负载均衡协调
- 负载均衡(Load Balancing)
  - Segment 自动迁移(基于多种策略)
  - Channel 负载均衡
  - 节点间负载均衡(考虑 CPU、内存、Segment 数量)
  - 多种 Balancer:RoundRobin、RowCount、ScoreBased、ChannelLevelScore
- Handoff 协调
  - Growing → Sealed Segment 切换(Flush 完成后)
  - 从 DataCoord 接收 Segment Seal 通知
  - 更新 Target 并触发 QueryNode 加载新 Segment
  - 卸载旧的 Growing Segment
- 查询路由信息提供
  - ShardLeader 管理(每个 Channel 的查询入口)
  - ShowCollections:Collection 加载进度和可用性
  - GetShardLeaders:返回 Shard Leader 给 Proxy
  - GetReplicas:返回副本信息
1.2 输入/输出与上下游依赖
输入:
- Proxy:LoadCollection/LoadPartition/ReleaseCollection/ReleasePartition 请求
- DataCoord:Segment Seal 通知(Handoff)、Segment/Channel 元数据
- QueryNode:心跳数据(Segment/Channel 分布、Leader View、资源状态)
- etcd/TiKV:持久化的 Collection/Replica/ResourceGroup 元数据
输出:
- QueryNode:Load/Release Segment/Channel 任务
- Proxy:Collection 加载进度、Shard Leader 信息、副本信息
- etcd/TiKV:Collection/Replica/ResourceGroup 元数据变更
- Metrics:负载、性能、状态指标
上下游依赖:
- 依赖 RootCoord:Collection Schema、Partition 列表
- 依赖 DataCoord:RecoveryInfo(Segment/Channel 列表)、IndexInfo
- 依赖 etcd/TiKV:元数据存储
- 被 Proxy 依赖:查询路由信息
1.3 整体架构图
flowchart TB
subgraph External["外部组件"]
Proxy[Proxy]
DataCoord[DataCoord]
RootCoord[RootCoord]
QN1[QueryNode 1]
QN2[QueryNode 2]
QN3[QueryNode N]
ETCD[(etcd/TiKV)]
end
subgraph QueryCoord["QueryCoord Server"]
subgraph API["API Layer"]
gRPC[gRPC Service<br/>services.go]
end
subgraph MetaLayer["元数据层 (Meta)"]
CollMgr[CollectionManager<br/>Collection/Partition元数据]
ReplicaMgr[ReplicaManager<br/>副本管理]
ResourceMgr[ResourceManager<br/>资源组管理]
TargetMgr[TargetManager<br/>目标状态: Next/Current]
DistMgr[DistributionManager<br/>实际分布:Segment/Channel]
NodeMgr[NodeManager<br/>QueryNode会话管理]
end
subgraph Scheduler["调度层"]
JobScheduler[JobScheduler<br/>Load/Release Job]
TaskScheduler[TaskScheduler<br/>Segment/Channel Task]
end
subgraph Heartbeat["心跳层"]
DistController[DistController<br/>分布控制器]
DistHandler[DistHandler<br/>心跳处理器 per QN]
end
subgraph Checker["检查器层 (Reconciliation)"]
CheckerController[CheckerController<br/>协调控制器]
SegmentChecker[SegmentChecker<br/>Segment差异检测]
ChannelChecker[ChannelChecker<br/>Channel差异检测]
LeaderChecker[LeaderChecker<br/>LeaderView差异检测]
BalanceChecker[BalanceChecker<br/>负载均衡检测]
end
subgraph Observer["观察者层"]
CollectionObserver[CollectionObserver<br/>加载进度监控]
TargetObserver[TargetObserver<br/>Target更新]
ReplicaObserver[ReplicaObserver<br/>副本状态监控]
ResourceObserver[ResourceObserver<br/>资源组监控]
end
subgraph Cluster["QueryNode 集群管理"]
ClusterClient[QueryCluster<br/>gRPC Client Pool]
end
Broker[CoordinatorBroker<br/>RootCoord/DataCoord<br/>通信代理]
end
%% API 层交互
Proxy -->|LoadCollection<br/>ShowCollections<br/>GetShardLeaders| gRPC
gRPC -->|Submit Job| JobScheduler
%% Job/Task 调度流程
JobScheduler -->|Execute| CollMgr
JobScheduler -->|Execute| ReplicaMgr
JobScheduler -->|UpdateNextTarget| TargetObserver
TargetObserver -->|Pull RecoveryInfo| Broker
Broker -->|GetRecoveryInfo| DataCoord
Broker -->|GetPartitions<br/>DescribeCollection| RootCoord
TargetObserver -->|Update| TargetMgr
%% Checker 协调流程
CheckerController -->|Trigger Check| SegmentChecker
CheckerController -->|Trigger Check| ChannelChecker
CheckerController -->|Trigger Check| LeaderChecker
CheckerController -->|Trigger Check| BalanceChecker
SegmentChecker -->|Read Target| TargetMgr
SegmentChecker -->|Read Dist| DistMgr
SegmentChecker -->|Generate Tasks| TaskScheduler
ChannelChecker -->|Read Target| TargetMgr
ChannelChecker -->|Read Dist| DistMgr
ChannelChecker -->|Generate Tasks| TaskScheduler
LeaderChecker -->|Read Target| TargetMgr
LeaderChecker -->|Read Dist| DistMgr
LeaderChecker -->|Generate Tasks| TaskScheduler
BalanceChecker -->|Balance Replica| ReplicaMgr
BalanceChecker -->|Generate Tasks| TaskScheduler
%% Task 执行流程
TaskScheduler -->|Dispatch| DistController
DistController -->|Assign to Node| ClusterClient
ClusterClient -->|LoadSegments<br/>ReleaseSegments<br/>WatchDmChannels| QN1
ClusterClient -->|LoadSegments<br/>ReleaseSegments<br/>WatchDmChannels| QN2
ClusterClient -->|LoadSegments<br/>ReleaseSegments<br/>WatchDmChannels| QN3
%% 心跳处理流程
DistHandler -->|GetDataDistribution<br/>1s周期| QN1
DistHandler -->|GetDataDistribution<br/>1s周期| QN2
DistHandler -->|GetDataDistribution<br/>1s周期| QN3
DistHandler -->|Update Distribution| DistMgr
DistHandler -->|Update HeartBeat| NodeMgr
%% Observer 监控流程
CollectionObserver -->|Check LoadStatus<br/>200ms周期| DistMgr
CollectionObserver -->|Check LoadStatus<br/>200ms周期| TargetMgr
CollectionObserver -->|UpdateLoadPercent| CollMgr
CollectionObserver -->|Trigger Check| CheckerController
%% 元数据持久化
CollMgr <-->|Persist| ETCD
ReplicaMgr <-->|Persist| ETCD
ResourceMgr <-->|Persist| ETCD
%% Handoff 流程
DataCoord -.->|SegmentChangeInfo<br/>Handoff通知| TargetObserver
style QueryCoord fill:#e1f5ff,stroke:#333,stroke-width:2px
style External fill:#fff4e6,stroke:#333,stroke-width:1px
style MetaLayer fill:#e6ffe6,stroke:#333,stroke-width:1px
style Scheduler fill:#ffe6f0,stroke:#333,stroke-width:1px
style Checker fill:#f0e6ff,stroke:#333,stroke-width:1px
style Observer fill:#fff0e6,stroke:#333,stroke-width:1px
架构图说明:
- API Layer(API 层)
  - 接收来自 Proxy 的 Load/Release/Show 等请求
  - 创建 Job 提交给 JobScheduler
- Meta Layer(元数据层)
  - CollectionManager:管理 Collection/Partition 的加载状态和元数据
  - ReplicaManager:管理副本分配和节点归属
  - ResourceManager:管理 ResourceGroup 和节点资源分配
  - TargetManager:维护 Next Target 和 Current Target
    - Next Target:最新从 DataCoord 拉取的目标状态
    - Current Target:当前正在协调的目标状态
  - DistributionManager:从心跳收集的实际 Segment/Channel 分布
  - NodeManager:管理 QueryNode 会话和资源信息
- Scheduler Layer(调度层)
  - JobScheduler:处理用户级 Job(LoadCollection、ReleaseCollection)
  - TaskScheduler:处理底层 Task(LoadSegment、ReleaseSegment、WatchChannel)
- Heartbeat Layer(心跳层)
  - DistController:管理所有 QueryNode 的 DistHandler
  - DistHandler(每个 QueryNode 一个):
    - 定期(1s)拉取 QueryNode 的数据分布
    - 更新 DistributionManager
    - 更新 NodeManager 的心跳时间和资源统计
- Checker Layer(检查器层/协调层)
  - CheckerController:定期(默认 5s)触发所有 Checker
  - SegmentChecker:检测 Sealed Segment 差异,生成 Load/Release Task
  - ChannelChecker:检测 DmChannel 差异,生成 Watch/Unwatch Task
  - LeaderChecker:检测 Leader View 差异,更新路由信息
  - BalanceChecker:检测负载不均衡,生成 Balance Task
- Observer Layer(观察者层)
  - CollectionObserver:监控 Collection 加载进度(200ms 周期)
  - TargetObserver:监控 Target 更新和 Handoff 事件(500ms 周期)
  - ReplicaObserver:监控副本状态(1s 周期)
  - ResourceObserver:监控资源组状态(1s 周期)
- 协调流程(Reconciliation Loop)

  Target(应该加载什么)
      ⬇ Diff
  Distribution(实际加载了什么)
      ⬇
  Generate Tasks(LoadSegment/ReleaseSegment)
      ⬇ Execute
  QueryNode(执行加载/卸载)
      ⬇ Report
  Distribution(更新实际状态)
1.4 核心概念详解
1.4.1 Target(目标状态)
定义:Target 表示 Collection/Partition 应该加载哪些 Segment 和 Channel,是系统的期望状态。
两级 Target:
- Next Target:
  - 从 DataCoord 拉取的最新 Segment/Channel 列表
  - Handoff 时首先更新 Next Target
  - Collection Load 时首先写入 Next Target
- Current Target:
  - 正在协调的目标状态
  - Checker 使用 Current Target 进行差异检测
  - Next Target 协调完成后提升为 Current Target
Target 数据结构:
type CollectionTarget struct {
segments map[int64]*datapb.SegmentInfo // SegmentID -> SegmentInfo
dmChannels map[string]*DmChannel // ChannelName -> DmChannel
partitionIDs []int64 // Partition列表
version int64 // Target版本号
}
Target 更新触发条件:
- LoadCollection/LoadPartition:创建新 Target
- Handoff:DataCoord 通知 Segment Seal,更新 Next Target
- Refresh:手动刷新 Target
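下面用一段简化的 Go 草图示意两级 Target 的存储与提升(promote)过程;其中 targetManagerSketch 等类型与方法名仅为说明而设的假设,并非源码实现:
package meta

import "sync"

// targetManagerSketch:简化示意,同时持有 next / current 两级 Target
type targetManagerSketch struct {
    mu      sync.RWMutex
    next    map[int64]*CollectionTarget // collectionID -> Next Target
    current map[int64]*CollectionTarget // collectionID -> Current Target
}

// UpdateNextTarget:Load / Handoff / Refresh 都只写 Next Target
func (m *targetManagerSketch) UpdateNextTarget(collectionID int64, t *CollectionTarget) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.next[collectionID] = t
}

// PromoteCurrentTarget:Next Target 协调(加载)完成后,提升为 Current Target
func (m *targetManagerSketch) PromoteCurrentTarget(collectionID int64) {
    m.mu.Lock()
    defer m.mu.Unlock()
    if t, ok := m.next[collectionID]; ok {
        m.current[collectionID] = t
        delete(m.next, collectionID)
    }
}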
1.4.2 Distribution(实际分布)
定义:Distribution 表示 QueryNode 实际加载的 Segment 和 Channel,是系统的当前状态。
数据来源:
- 通过 DistHandler 定期(1s)从每个 QueryNode 拉取 GetDataDistribution
- 返回内容包含:Sealed Segments、Growing Segments、DmChannels、Leader Views
Distribution 数据结构:
// SegmentDistManager: 管理Segment分布
type SegmentDistManager struct {
segments map[int64]nodeSegments // NodeID -> Segments
}
// ChannelDistManager: 管理Channel分布
type ChannelDistManager struct {
channels map[int64]nodeChannels // NodeID -> Channels
}
// Segment包含版本号,用于检测更新
type Segment struct {
SegmentInfo *datapb.SegmentInfo
Node int64 // 所在QueryNode
Version int64 // 版本号(用于Leader选举)
LastDeltaTimestamp int64 // 最后Delta加载时间
}
1.4.3 Reconciliation(协调/状态收敛)
定义:Reconciliation 是对比 Target 和 Distribution 的差异,生成并执行 Task 使 Distribution 收敛到 Target 的过程。
协调循环:
flowchart LR
Target[Target<br/>目标状态<br/>应该加载什么] -->|Diff| Checker[Checker<br/>差异检测]
Dist[Distribution<br/>实际状态<br/>已经加载什么] -->|Diff| Checker
Checker -->|生成Task| TaskScheduler[TaskScheduler]
TaskScheduler -->|执行Task| QN[QueryNode]
QN -->|LoadSegments<br/>ReleaseSegments| Storage[(Object Storage)]
QN -->|心跳上报<br/>1s周期| DistHandler[DistHandler]
DistHandler -->|更新Distribution| Dist
style Target fill:#e6ffe6
style Dist fill:#ffe6e6
style Checker fill:#e6f3ff
style TaskScheduler fill:#fff0e6
Checker 类型:
- SegmentChecker:
  - Lacks(缺失):Target 有但 Distribution 没有 → 生成 LoadSegment Task
  - Redundancies(冗余):Distribution 有但 Target 没有 → 生成 ReleaseSegment Task
  - Repeated(重复):同一 Segment 在多个节点上 → 生成 ReleaseSegment Task(保留 Leader 所在节点)
- ChannelChecker:
  - Lacks:Target 有但 Distribution 没有 → 生成 WatchDmChannel Task
  - Redundancies:Distribution 有但 Target 没有 → 生成 UnsubDmChannel Task
- LeaderChecker:
  - 检测 Leader View 与实际 Segment 分布的差异
  - 生成 LeaderSegmentTask 更新 Delegator 的路由表
- BalanceChecker:
  - 检测节点间负载不均衡(Segment 数量、行数、内存)
  - 生成 Balance Task 迁移 Segment/Channel
协调间隔:
- CheckerController:默认 5s 触发一次
- CollectionObserver:默认 200ms 检查一次加载进度
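协调的核心是一次集合差集运算:Target 有而 Distribution 没有的为 Lacks,反之为 Redundancies。下面是一个最小化的 Go 草图(入参类型为示意性简化,真实实现还要区分 Current/Next 两级 Target 以及 Replica 维度):
package checkers

// diffSegments:对比 Target 与 Distribution,推导出待加载与待释放的 Segment
func diffSegments(target, dist map[int64]struct{}) (lacks, redundancies []int64) {
    for segID := range target {
        if _, ok := dist[segID]; !ok {
            lacks = append(lacks, segID) // Target 有、Dist 没有 → LoadSegment Task
        }
    }
    for segID := range dist {
        if _, ok := target[segID]; !ok {
            redundancies = append(redundancies, segID) // Dist 有、Target 没有 → ReleaseSegment Task
        }
    }
    return lacks, redundancies
}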
1.4.4 Job vs Task
Job(用户级操作):
- LoadCollectionJob
- LoadPartitionJob
- ReleaseCollectionJob
- ReleasePartitionJob
Task(执行单元):
- LoadSegmentTask:加载一批 Segment 到指定 QueryNode
- ReleaseSegmentTask:从指定 QueryNode 释放一批 Segment
- WatchChannelTask:订阅一个 DmChannel
- UnsubChannelTask:取消订阅一个 DmChannel
- LeaderSegmentTask:更新 Leader View
Job → Task 分解示例:
LoadCollectionJob (collection=100, replica=2)
├── WatchDmChannelTask (QN1, channel=dml0)
├── WatchDmChannelTask (QN2, channel=dml1)
├── LoadSegmentTask (QN1, segment=[1,2,3]) # Replica 1
├── LoadSegmentTask (QN2, segment=[4,5,6]) # Replica 1
├── LoadSegmentTask (QN3, segment=[1,2,3]) # Replica 2 (副本)
└── LoadSegmentTask (QN4, segment=[4,5,6]) # Replica 2 (副本)
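Job 的生命周期固定为 PreExecute → Execute → PostExecute 三段。下面的 Go 草图给出 JobScheduler 处理单个 Job 的骨架(接口为示意性简化;实际实现中同一 Collection 的 Job 会进入同一队列串行执行):
package job

// Job:用户级操作的统一生命周期(示意接口)
type Job interface {
    PreExecute() error // 参数校验
    Execute() error    // 执行:写元数据、创建副本、更新 Target 等
    PostExecute()      // 收尾:失败时负责回滚
}

// process:JobScheduler 处理单个 Job 的简化骨架
func process(j Job) error {
    defer j.PostExecute()
    if err := j.PreExecute(); err != nil {
        return err
    }
    return j.Execute()
}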
1.4.5 Replica(副本)
定义:Replica 是 Collection 的一个完整副本,包含所有 Shard 的数据。
副本结构:
type Replica struct {
CollectionID int64
ID int64 // ReplicaID
Nodes []int64 // 归属的QueryNode
ResourceGroup string // 所属资源组
ChannelNodeMap map[string][]int64 // Channel → 负责的Node
}
多副本作用:
- 提升查询吞吐:查询可以路由到多个副本并行执行
- 提升可用性:某个副本故障时可以切换到其他副本
- 负载隔离:不同副本可以分配到不同 ResourceGroup
Shard Leader:
- 每个 DmChannel 在每个 Replica 中有一个 Leader(Delegator)
- Leader 负责接收查询请求并协调 Segment 读取
- Leader 选举规则:Version 最大且 Serviceable 的 QueryNode
1.4.6 ResourceGroup(资源组)
定义:ResourceGroup 是 QueryNode 的逻辑分组,用于资源隔离和 quota 管理。
用途:
- 租户隔离:不同租户使用不同的 ResourceGroup
- 优先级隔离:高优先级和低优先级 Collection 分配到不同 ResourceGroup
- 资源配额:限制每个 ResourceGroup 的节点数和资源量
默认资源组:
__default_resource_group:默认所有 QueryNode 归属此资源组
2. gRPC API 列表与接口定义
2.1 Collection 加载管理 API
2.1.1 LoadCollection
接口定义:
rpc LoadCollection(LoadCollectionRequest) returns (common.Status)
message LoadCollectionRequest {
int64 collectionID = 1;
int32 replicaNumber = 2; // 副本数,默认1
repeated string resourceGroups = 3; // 资源组列表
bool refresh = 4; // 是否刷新模式
repeated int64 fieldIndexIDs = 5; // 要加载的索引
}
功能:加载 Collection 的所有 Partition 到 QueryNode。
关键代码路径:
服务入口: internal/querycoordv2/services.go: LoadCollection()
├─> 创建 LoadCollectionJob
├─> JobScheduler.Add(job)
├─> JobScheduler.process(job)
│ ├─> job.PreExecute(): 校验参数
│ ├─> job.Execute():
│ │ ├─> broker.GetPartitions(): 从RootCoord获取Partition列表
│ │ ├─> SpawnReplicas(): 创建副本
│ │ ├─> CollectionManager.PutCollection(): 保存Collection元数据
│ │ ├─> TargetObserver.UpdateNextTarget(): 更新NextTarget
│ │ └─> CollectionObserver.LoadCollection(): 注册加载任务
│ └─> job.PostExecute(): 清理(失败时回滚)
│
└─> CollectionObserver定期检查加载进度并更新LoadPercentage
时序图详解见 3.1 LoadCollection 完整流程
2.1.2 ReleaseCollection
接口定义:
rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status)
message ReleaseCollectionRequest {
int64 collectionID = 1;
}
功能:卸载 Collection 的所有数据并释放资源。
关键代码路径:
服务入口: internal/querycoordv2/services.go: ReleaseCollection()
├─> 创建 ReleaseCollectionJob
├─> JobScheduler.Add(job)
├─> job.Execute():
│ ├─> TargetObserver.ReleaseCollection(): 移除Target
│ ├─> CollectionManager.RemoveCollection(): 移除元数据
│ ├─> ReplicaManager.RemoveCollection(): 移除副本
│ └─> (Checker会自动生成ReleaseSegment Task)
└─> 等待所有Segment卸载完成
2.1.3 LoadPartitions
接口定义:
rpc LoadPartitions(LoadPartitionsRequest) returns (common.Status)
message LoadPartitionsRequest {
int64 collectionID = 1;
repeated int64 partitionIDs = 2;
int32 replicaNumber = 3;
repeated string resourceGroups = 4;
}
功能:加载 Collection 的指定 Partition。
2.1.4 ReleasePartitions
接口定义:
rpc ReleasePartitions(ReleasePartitionsRequest) returns (common.Status)
message ReleasePartitionsRequest {
int64 collectionID = 1;
repeated int64 partitionIDs = 2;
}
功能:卸载 Collection 的指定 Partition。
2.2 查询信息 API
2.2.1 ShowCollections
接口定义:
rpc ShowCollections(ShowCollectionsRequest) returns (ShowCollectionsResponse)
message ShowCollectionsResponse {
repeated int64 collectionIDs = 1;
repeated int64 inMemoryPercentages = 2; // 加载进度 0-100
repeated bool queryServiceAvailable = 3; // 是否可查询
repeated int64 refreshProgress = 4; // 刷新进度
}
功能:查询 Collection 的加载状态和进度。
加载进度计算:
// internal/querycoordv2/meta/collection_manager.go
func (m *CollectionManager) CalculateLoadPercentage(collectionID int64) int32 {
// 1. 计算所有Partition的加载进度
// 2. 加权平均(按Segment数量)
// 3. 返回 0-100 的百分比
}
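按上述注释的思路,加权平均可以写成如下的简化示例(结构体与函数为示意性假设,权重取 Target 中各 Partition 的 Segment 数量):
package meta

// partitionLoadInfo:示意结构,每个Partition的加载进度与其Segment数量
type partitionLoadInfo struct {
    LoadPercentage int32 // 0-100
    SegmentNum     int   // Target中该Partition的Segment数量(权重)
}

// calcCollectionLoadPercentage:按Segment数量对各Partition进度做加权平均
func calcCollectionLoadPercentage(partitions []partitionLoadInfo) int32 {
    var weighted, total int64
    for _, p := range partitions {
        weighted += int64(p.LoadPercentage) * int64(p.SegmentNum)
        total += int64(p.SegmentNum)
    }
    if total == 0 {
        return 100 // 没有Segment(只有Channel)时视为加载完成
    }
    return int32(weighted / total)
}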
2.2.2 GetShardLeaders
接口定义:
rpc GetShardLeaders(GetShardLeadersRequest) returns (GetShardLeadersResponse)
message GetShardLeadersResponse {
repeated ShardLeadersList shards = 1;
}
message ShardLeadersList {
string channel_name = 1;
repeated int64 node_ids = 2; // Leader节点ID列表
repeated string node_addrs = 3; // Leader节点地址列表
}
功能:返回 Collection 每个 Shard 的 Leader 信息,供 Proxy 路由查询请求。
Leader 选举规则:
// internal/querycoordv2/meta/channel_dist_manager.go: GetShardLeader()
// 1. 筛选该Channel在该Replica中的所有节点
// 2. 优先选择Serviceable(Channel状态正常)的节点
// 3. 相同Serviceable状态下,选择Version最大的节点
// 4. 返回Leader节点
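上述选举规则的一个简化 Go 草图如下(candidate 结构为示意性假设,实际逻辑以 channel_dist_manager.go 为准):
package meta

// candidate:示意结构,某Channel在某Replica内的一个候选Delegator
type candidate struct {
    NodeID      int64
    Version     int64
    Serviceable bool // Channel状态是否正常(可服务)
}

// electShardLeader:优先Serviceable;Serviceable状态相同时取Version最大者
func electShardLeader(candidates []candidate) (leader int64, ok bool) {
    var best *candidate
    for i := range candidates {
        c := &candidates[i]
        if best == nil ||
            (c.Serviceable && !best.Serviceable) ||
            (c.Serviceable == best.Serviceable && c.Version > best.Version) {
            best = c
        }
    }
    if best == nil {
        return 0, false
    }
    return best.NodeID, true
}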
2.2.3 GetReplicas
接口定义:
rpc GetReplicas(GetReplicasRequest) returns (GetReplicasResponse)
message GetReplicasResponse {
repeated ReplicaInfo replicas = 1;
}
message ReplicaInfo {
int64 replicaID = 1;
int64 collectionID = 2;
repeated int64 nodeIds = 3; // 归属节点
repeated ShardReplica shardReplicas = 4; // Shard详情
string resourceGroupName = 5;
}
功能:查询 Collection 的副本信息。
2.3 负载均衡 API
2.3.1 LoadBalance
接口定义:
rpc LoadBalance(LoadBalanceRequest) returns (common.Status)
message LoadBalanceRequest {
int64 collectionID = 1;
int64 srcNodeID = 2; // 源节点
repeated int64 dstNodeIDs = 3; // 目标节点列表
repeated int64 segmentIDs = 4; // 要迁移的Segment
bool sealed_only = 5; // 是否只迁移Sealed Segment
}
功能:手动触发 Segment 迁移。
2.3.2 TransferSegment / TransferChannel
手动迁移指定的 Segment 或 Channel。
2.4 ResourceGroup 管理 API
2.4.1 CreateResourceGroup
rpc CreateResourceGroup(CreateResourceGroupRequest) returns (common.Status)
message CreateResourceGroupRequest {
string resource_group = 1;
milvus.ResourceGroupConfig config = 2;
}
2.4.2 UpdateResourceGroups
更新 ResourceGroup 配置。
2.4.3 TransferNode / TransferReplica
在 ResourceGroup 之间转移节点或副本。
3. 核心流程详解
3.1 LoadCollection 完整流程
3.1.1 LoadCollection 总览时序图
sequenceDiagram
autonumber
participant P as Proxy
participant QC as QueryCoord API
participant JS as JobScheduler
participant Job as LoadCollectionJob
participant Broker as CoordinatorBroker
participant RC as RootCoord
participant DC_ext as DataCoord
participant CollMgr as CollectionManager
participant RepMgr as ReplicaManager
participant TO as TargetObserver
participant TM as TargetManager
participant CO as CollectionObserver
participant CC as CheckerController
participant SC as SegmentChecker
participant CHC as ChannelChecker
participant TS as TaskScheduler
participant DistCtrl as DistController
participant QN as QueryNode
participant ETCD as etcd/TiKV
%% ===== Phase 1: 接收请求 =====
rect rgb(255, 240, 230)
Note over P,QC: Phase 1: 接收LoadCollection请求
P->>+QC: LoadCollection(collectionID=100, replicaNumber=2, resourceGroups=["rg1"])
QC->>QC: 健康检查
QC->>QC: 处理默认参数<br/>(replica默认1, rg默认default)
QC->>Job: 创建LoadCollectionJob
QC->>JS: JobScheduler.Add(job)
QC-->>-P: Status(Success, "job submitted")
end
%% ===== Phase 2: Job执行 - PreExecute =====
rect rgb(230, 255, 230)
Note over JS,RC: Phase 2: Job PreExecute - 参数校验
JS->>+Job: job.PreExecute()
Job->>Broker: DescribeCollection(collectionID)
Broker->>RC: DescribeCollection(collectionID)
RC-->>Broker: Collection Schema + VirtualChannels
Broker-->>Job: Collection Info
Job->>Job: 校验Collection状态
Job->>Job: 校验replica和resourceGroup参数
Job-->>-JS: PreExecute OK
end
%% ===== Phase 3: Job执行 - Execute =====
rect rgb(230, 240, 255)
Note over JS,ETCD: Phase 3: Job Execute - 创建元数据
JS->>+Job: job.Execute()
%% 获取Partition列表
Job->>Broker: GetPartitions(collectionID)
Broker->>RC: GetPartitions(collectionID)
RC-->>Broker: PartitionIDs=[1,2,3]
Broker-->>Job: PartitionIDs
%% 创建Replica
Job->>RepMgr: SpawnReplicas(collectionID, replicaNum=2, resourceGroups)
RepMgr->>RepMgr: AllocateNodes from ResourceGroup
RepMgr->>ETCD: SaveReplicas
ETCD-->>RepMgr: OK
RepMgr-->>Job: Replicas=[{ID:1,Nodes:[QN1,QN2]},{ID:2,Nodes:[QN3,QN4]}]
%% 保存Collection元数据
Job->>CollMgr: PutCollection(collection, partitions, loadType=LoadCollection, status=Loading)
CollMgr->>ETCD: SaveCollection + Partitions
ETCD-->>CollMgr: OK
CollMgr-->>Job: OK
%% 更新NextTarget
Job->>TO: UpdateNextTarget(collectionID)
TO->>Broker: GetRecoveryInfoV2(collectionID, partitionIDs)
Broker->>DC_ext: GetRecoveryInfoV2(collectionID, partitionIDs)
DC_ext-->>Broker: VchannelInfos + SegmentInfos
Broker-->>TO: RecoveryInfo
TO->>TO: 构造CollectionTarget:<br/>- dmChannels=[dml0,dml1]<br/>- segments=[seg1,seg2,...seg100]
TO->>TM: next.updateCollectionTarget(collectionID, target)
TM-->>TO: NextTarget Updated
TO-->>Job: OK
%% 注册加载任务到CollectionObserver
Job->>CO: LoadCollection(collectionID)
CO->>CO: loadTasks.Insert(traceID, LoadTask)
CO->>CC: CheckerController.Check()
CO-->>Job: OK
Job-->>-JS: Execute OK
end
%% ===== Phase 4: 协调循环 - ChannelChecker =====
rect rgb(255, 240, 255)
Note over CC,QN: Phase 4: ChannelChecker - 加载DmChannel
CC->>+CHC: ChannelChecker.Check()
CHC->>TM: GetDmChannels(collectionID, CurrentTarget)
TM-->>CHC: TargetChannels=[]
CHC->>TM: GetDmChannels(collectionID, NextTarget)
TM-->>CHC: TargetChannels=[dml0,dml1]
CHC->>CHC: 对比Target vs Distribution<br/>发现缺失: [dml0,dml1]
loop For each Replica
CHC->>CHC: 为每个Channel分配节点<br/>Replica1: dml0->QN1, dml1->QN2<br/>Replica2: dml0->QN3, dml1->QN4
CHC->>TS: Add WatchDmChannelTask(QN1, dml0)
CHC->>TS: Add WatchDmChannelTask(QN2, dml1)
CHC->>TS: Add WatchDmChannelTask(QN3, dml0)
CHC->>TS: Add WatchDmChannelTask(QN4, dml1)
end
CHC-->>-CC: Tasks Generated
%% TaskScheduler执行Task
TS->>DistCtrl: Dispatch Tasks to Nodes
DistCtrl->>QN: WatchDmChannels(channels=[dml0], seekPositions)
QN->>QN: 订阅DmChannel<br/>创建Delegator
QN-->>DistCtrl: OK
DistCtrl-->>TS: Task Success
end
%% ===== Phase 5: 协调循环 - SegmentChecker =====
rect rgb(230, 255, 255)
Note over CC,QN: Phase 5: SegmentChecker - 加载Segment
CC->>+SC: SegmentChecker.Check()
SC->>TM: GetSealedSegments(collectionID, CurrentTarget)
TM-->>SC: TargetSegments={}
SC->>TM: GetSealedSegments(collectionID, NextTarget)
TM-->>SC: TargetSegments={seg1,seg2,...,seg100}
SC->>SC: 对比Target vs Distribution<br/>发现缺失: [seg1,seg2,...,seg100]
loop For each Channel
SC->>SC: 按Channel分组Segment<br/>dml0: [seg1-seg50]<br/>dml1: [seg51-seg100]
SC->>SC: Balancer分配节点<br/>考虑节点负载、内存
SC->>TS: Add LoadSegmentTask(QN1, segments=[seg1-seg25])
SC->>TS: Add LoadSegmentTask(QN2, segments=[seg26-seg50])
SC->>TS: Add LoadSegmentTask(QN3, segments=[seg51-seg75])
SC->>TS: Add LoadSegmentTask(QN4, segments=[seg76-seg100])
end
SC-->>-CC: Tasks Generated
%% TaskScheduler执行Task
TS->>DistCtrl: Dispatch Tasks to Nodes
loop For each QueryNode
DistCtrl->>QN: LoadSegments(segments, loadScope=Historical)
QN->>QN: 从ObjectStorage加载Segment<br/>加载Index<br/>构建内存结构
QN-->>DistCtrl: OK
end
DistCtrl-->>TS: All Tasks Success
end
%% ===== Phase 6: 心跳上报 =====
rect rgb(255, 255, 230)
Note over DistCtrl,TM: Phase 6: 心跳上报 - 更新Distribution
loop 每1秒
DistCtrl->>QN: GetDataDistribution()
QN-->>DistCtrl: {segments, channels, leaderViews}
DistCtrl->>DistCtrl: DistributionManager.Update(nodeID, segments, channels)
end
end
%% ===== Phase 7: CollectionObserver监控 =====
rect rgb(240, 255, 240)
Note over CO,CollMgr: Phase 7: CollectionObserver - 监控加载进度
loop 每200ms
CO->>CO: observeLoadStatus(collectionID)
CO->>TM: GetTargetSegments + GetTargetChannels
TM-->>CO: TargetSegments + TargetChannels
CO->>CO: DistMgr.GetSegments + GetChannels
CO->>CO: 计算加载进度:<br/>(已加载Segment数 / 总Segment数) * 100
alt 加载进度=100%
CO->>TO: TargetObserver.Check(collectionID)
TO->>TO: 将NextTarget提升为CurrentTarget
TO->>TM: current.updateCollectionTarget(collectionID, target)
TM-->>TO: CurrentTarget Updated
TO-->>CO: OK
CO->>CollMgr: UpdateCollectionStatus(collectionID, status=Loaded, loadPercent=100)
CollMgr->>ETCD: SaveCollection
ETCD-->>CollMgr: OK
CollMgr-->>CO: OK
CO->>CO: loadTasks.Remove(traceID)
Note over CO: 加载完成,停止监控
else 加载进度<100%
CO->>CollMgr: UpdateLoadPercent(collectionID, loadPercent)
CollMgr-->>CO: OK
end
end
end
%% ===== Phase 8: 返回结果 =====
rect rgb(255, 245, 230)
Note over P,QC: Phase 8: Proxy查询加载状态
loop Proxy轮询
P->>QC: ShowCollections(collectionID)
QC->>CollMgr: GetLoadPercentage(collectionID)
CollMgr-->>QC: LoadPercentage=100%, QueryServiceAvailable=true
QC-->>P: ShowCollectionsResponse(inMemoryPercentage=100)
alt LoadPercentage=100%
Note over P: Load完成
else LoadPercentage<100%
Note over P: 继续轮询
end
end
end
3.1.2 LoadCollection 流程说明
Phase 1: 接收请求
- Proxy 调用 LoadCollection gRPC
- QueryCoord 执行健康检查,确保服务可用
- 处理默认参数:replicaNumber 默认 1,resourceGroups 默认 ["__default_resource_group"]
- 创建 LoadCollectionJob 并提交到 JobScheduler
- 立即返回成功(异步执行)
Phase 2: Job PreExecute - 参数校验
关键代码:
// internal/querycoordv2/job/job_load.go
func (job *LoadCollectionJob) PreExecute() error {
// 1. 从RootCoord获取Collection信息
collInfo, err := job.broker.DescribeCollection(job.ctx, job.req.GetCollectionID())
// 2. 校验Collection是否已经加载
collection := job.meta.GetCollection(job.ctx, job.req.GetCollectionID())
if collection != nil {
// 如果replica或resourceGroup变化,转换为UpdateLoadConfigJob
// 否则视为重复加载
}
// 3. 校验replica和resourceGroup参数
if job.req.GetReplicaNumber() <= 0 {
return errors.New("replica number must be positive")
}
return nil
}
Phase 3: Job Execute - 创建元数据
关键代码:
// internal/querycoordv2/job/job_load.go
func (job *LoadCollectionJob) Execute() error {
// 1. 获取Partition列表
partitionIDs, err := job.broker.GetPartitions(job.ctx, job.req.GetCollectionID())
// 2. 创建Replica(首次加载)
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
replicas, err = utils.SpawnReplicasWithRG(
job.ctx, job.meta,
req.GetCollectionID(),
req.GetResourceGroups(),
req.GetReplicaNumber(),
job.collInfo.GetVirtualChannelNames(),
priority)
// SpawnReplicasWithRG内部:
// - 从ResourceGroup选择可用的QueryNode
// - 为每个Replica分配足够的Node
// - 为每个DmChannel分配RW Node
// - 持久化到etcd
}
// 3. 保存Collection元数据
collection := &meta.Collection{
CollectionID: req.GetCollectionID(),
ReplicaNumber: req.GetReplicaNumber(),
Status: querypb.LoadStatus_Loading,
LoadPercentage: 0,
CreatedAt: time.Now(),
}
err = job.meta.CollectionManager.PutCollection(job.ctx, collection, partitions...)
// 4. 更新NextTarget(从DataCoord拉取最新数据)
_, err = job.targetObserver.UpdateNextTarget(req.GetCollectionID())
// UpdateNextTarget内部:
// - 调用DataCoord.GetRecoveryInfoV2获取VchannelInfo和SegmentInfo
// - 构造CollectionTarget
// - 写入TargetManager.next
// 5. 注册到CollectionObserver
job.collectionObserver.LoadCollection(ctx, req.GetCollectionID())
return nil
}
Phase 4: ChannelChecker - 加载 DmChannel
关键代码:
// internal/querycoordv2/checkers/channel_checker.go
func (c *ChannelChecker) Check(ctx context.Context) []task.Task {
collectionIDs := c.meta.GetAll(ctx)
for _, collectionID := range collectionIDs {
replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
for _, replica := range replicas {
// 获取Target中的Channel
targetChannels := c.targetMgr.GetDmChannelsByCollection(ctx, collectionID, meta.NextTargetFirst)
// 获取Distribution中的Channel
distChannels := c.dist.ChannelDistManager.GetByCollectionAndFilter(collectionID, meta.WithReplica(replica))
// Diff: 找出缺失的Channel
lacks := findLacks(targetChannels, distChannels)
for _, channel := range lacks {
// 为该Channel选择一个Node(从Replica的RWNodes中选择)
nodeID := selectNodeForChannel(replica, channel)
// 生成WatchDmChannelTask
task := createWatchChannelTask(nodeID, channel)
tasks = append(tasks, task)
}
// Diff: 找出多余的Channel
redundancies := findRedundancies(targetChannels, distChannels)
for _, channel := range redundancies {
// 生成UnsubDmChannelTask
task := createUnsubChannelTask(channel.Node, channel)
tasks = append(tasks, task)
}
}
}
return tasks
}
Phase 5: SegmentChecker - 加载 Segment
关键代码:
// internal/querycoordv2/checkers/segment_checker.go
func (c *SegmentChecker) getSealedSegmentDiff(
ctx context.Context,
collectionID int64,
replicaID int64,
) (toLoad []*datapb.SegmentInfo, loadPriorities []commonpb.LoadPriority, toRelease []*meta.Segment) {
// 1. 获取NextTarget中的Segment
nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.NextTarget)
// 2. 获取CurrentTarget中的Segment
currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.CurrentTarget)
// 3. 获取Distribution中的Segment
dist := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithReplica(replica))
distMap := makeDistMap(dist)
// 4. Lacks: NextTarget有但Distribution没有
for segID, segment := range nextTargetMap {
if _, existInDist := distMap[segID]; !existInDist {
// 判断优先级: 如果在CurrentTarget中,优先级为HIGH;否则使用Replica的默认优先级
if _, existOnCurrent := currentTargetMap[segID]; existOnCurrent {
loadPriorities = append(loadPriorities, commonpb.LoadPriority_HIGH)
} else {
loadPriorities = append(loadPriorities, replica.LoadPriority())
}
toLoad = append(toLoad, segment)
}
}
// 5. Redundancies: Distribution有但NextTarget和CurrentTarget都没有
for segID, segment := range distMap {
_, existOnNext := nextTargetMap[segID]
_, existOnCurrent := currentTargetMap[segID]
if !existOnNext && !existOnCurrent {
toRelease = append(toRelease, segment)
}
}
return toLoad, loadPriorities, toRelease
}
func (c *SegmentChecker) createSegmentLoadTasks(
ctx context.Context,
segments []*datapb.SegmentInfo,
loadPriorities []commonpb.LoadPriority,
replica *meta.Replica,
) []task.Task {
// 1. 按Channel分组Segment
shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
return s.GetInsertChannel()
})
// 2. 为每个Shard的Segment分配Node
plans := []balance.SegmentAssignPlan{}
for shard, segments := range shardSegments {
// 获取该Channel的RWNodes
rwNodes := replica.GetChannelRWNodes(shard)
if len(rwNodes) == 0 {
rwNodes = replica.GetRWNodes()
}
// 使用Balancer分配Segment到Node
shardPlans := c.getBalancerFunc().AssignSegment(ctx, replica.GetCollectionID(), segments, rwNodes, true)
for i := range shardPlans {
shardPlans[i].Replica = replica
shardPlans[i].LoadPriority = loadPriorities[i]
}
plans = append(plans, shardPlans...)
}
// 3. 从Plan生成Task
tasks := balance.CreateSegmentTasksFromPlans(ctx, c.ID(), timeout, plans)
return tasks
}
Phase 6: 心跳上报 - 更新 Distribution
关键代码:
// internal/querycoordv2/dist/dist_handler.go
func (dh *distHandler) pullDist(ctx context.Context) {
// 1. 调用QueryNode的GetDataDistribution
resp, err := dh.client.GetDataDistribution(ctx, dh.nodeID, &querypb.GetDataDistributionRequest{})
// 2. 更新Segment分布
updates := []*meta.Segment{}
for _, s := range resp.GetSegments() {
segmentInfo := dh.target.GetSealedSegment(ctx, s.GetCollection(), s.GetID(), meta.CurrentTargetFirst)
updates = append(updates, &meta.Segment{
SegmentInfo: segmentInfo,
Node: resp.GetNodeID(),
Version: s.GetVersion(),
LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
})
}
dh.dist.SegmentDistManager.Update(resp.GetNodeID(), updates...)
// 3. 更新Channel分布
channels := []*meta.DmChannel{}
for _, lview := range resp.GetLeaderViews() {
// 从Target获取Channel信息
channelInfo := dh.target.GetDmChannel(ctx, lview.GetCollection(), lview.GetChannel(), meta.CurrentTarget)
// 构造DmChannel(包含LeaderView)
dmChannel := &meta.DmChannel{
VchannelInfo: channelInfo.VchannelInfo,
Node: resp.GetNodeID(),
Version: lview.GetVersion(),
}
channels = append(channels, dmChannel)
}
dh.dist.ChannelDistManager.Update(resp.GetNodeID(), channels...)
// 4. 更新NodeManager的心跳时间
node := dh.nodeManager.Get(resp.GetNodeID())
node.SetLastHeartbeat(time.Now())
node.UpdateStats(
session.WithSegmentCnt(len(resp.GetSegments())),
session.WithChannelCnt(len(resp.GetChannels())),
)
}
Phase 7: CollectionObserver - 监控加载进度
关键代码:
// internal/querycoordv2/observers/collection_observer.go
func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
collection := ob.meta.GetCollection(ctx, task.CollectionID)
partitions := ob.meta.GetPartitionsByCollection(ctx, task.CollectionID)
// 1. 计算Channel加载进度
channelTargetNum, subChannelCount := ob.observeChannelStatus(ctx, task.CollectionID)
// 2. 计算每个Partition的加载进度
for _, partition := range partitions {
replicaNum := ob.meta.GetReplicaNumber(ctx, partition.GetCollectionID())
// 获取Target中的Segment数量
targetSegments := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.CollectionID, partition.PartitionID, meta.NextTargetFirst)
targetSegmentNum := len(targetSegments)
// 获取Distribution中已加载的Segment数量
distSegments := ob.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(partition.CollectionID), meta.WithPartition(partition.PartitionID))
loadedSegmentNum := len(distSegments) / int(replicaNum) // 多副本时按副本数折算
// 计算加载进度
var loadPercentage int32
if targetSegmentNum == 0 {
loadPercentage = 100 // 只有Channel,没有Segment
} else {
loadPercentage = int32(loadedSegmentNum * 100 / targetSegmentNum)
}
// 更新Partition加载进度
if loadPercentage == 100 {
// 提升NextTarget到CurrentTarget
ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID)
}
ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage)
}
// 3. 更新Collection加载进度
collectionPercentage, _ := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, task.CollectionID)
// 4. 如果全部加载完成,移除LoadTask
if collectionPercentage == 100 {
ob.loadTasks.Remove(traceID)
}
return true
})
}
关键设计要点:
- 异步执行:LoadCollection 立即返回,后台异步协调加载
- 两级 Target:NextTarget 用于接收更新,CurrentTarget 用于协调,避免协调过程中的数据不一致
- 优先级加载:CurrentTarget 中的 Segment 优先级为 HIGH,NextTarget 新增的 Segment 优先级为 NORMAL
- 自动协调:Checker 持续对比 Target 和 Distribution,自动生成 Task 驱动收敛
- 进度计算:基于 Segment 数量计算加载进度,Replica 数量影响进度计算
- 最终一致性:通过心跳不断更新 Distribution,最终与 Target 保持一致
3.2 Handoff 流程
触发场景:DataNode Flush 完成,Segment 从 Growing 变为 Sealed
3.2.1 Handoff 时序图
sequenceDiagram
autonumber
participant DN as DataNode
participant DC as DataCoord
participant TO as TargetObserver
participant TM as TargetManager
participant CC as CheckerController
participant SC as SegmentChecker
participant LC as LeaderChecker
participant TS as TaskScheduler
participant QN as QueryNode
participant Dist as DistManager
%% ===== Phase 1: Flush完成,通知Handoff =====
rect rgb(255, 240, 230)
Note over DN,DC: Phase 1: DataNode Flush Segment
DN->>DN: FlushSegment(segmentID)
DN->>DN: 上传Segment到ObjectStorage
DN->>DC: SaveBinlogPaths(segmentID, binlogPaths)
DC->>DC: 更新Segment状态: Growing → Sealed
DC->>DC: 生成SegmentChangeInfo
end
%% ===== Phase 2: QueryCoord接收Handoff通知 =====
rect rgb(230, 255, 230)
Note over DC,TM: Phase 2: 更新NextTarget
DC->>TO: NotifySegmentChangeInfo(collectionID, segmentChangeInfo)
Note over TO: segmentChangeInfo包含:<br/>- sealedSegments: [segID1, segID2]<br/>- growingSegments: [segID3, segID4]
TO->>TO: 处理Handoff事件
TO->>DC: GetRecoveryInfoV2(collectionID, partitionIDs)
DC-->>TO: 最新的SegmentInfos + VchannelInfos
TO->>TO: 构造新的CollectionTarget<br/>包含新Sealed的Segment
TO->>TM: next.updateCollectionTarget(collectionID, newTarget)
TM->>TM: 更新NextTarget<br/>+ Sealed Segments<br/>+ Growing Segments (if new)
TM-->>TO: NextTarget Updated (version++)
end
%% ===== Phase 3: SegmentChecker协调 =====
rect rgb(230, 240, 255)
Note over CC,TS: Phase 3: SegmentChecker加载新Sealed Segment
CC->>SC: SegmentChecker.Check()
SC->>TM: GetSealedSegments(collectionID, NextTarget)
TM-->>SC: NextTargetSegments (包含新Sealed Segment)
SC->>TM: GetSealedSegments(collectionID, CurrentTarget)
TM-->>SC: CurrentTargetSegments (不包含新Segment)
SC->>Dist: GetSegments(collectionID)
Dist-->>SC: DistributionSegments (不包含新Segment)
SC->>SC: Diff: 发现缺失新Sealed Segment
SC->>SC: 为新Segment分配Node<br/>(与该Segment的Channel在同一节点)
loop For each new Sealed Segment
SC->>TS: Add LoadSegmentTask(nodeID, segmentID, priority=HIGH)
Note over TS: priority=HIGH因为该Segment<br/>在CurrentTarget中存在对应的Growing
end
SC-->>CC: LoadSegmentTasks Created
TS->>QN: LoadSegments(segments=[newSealedSeg], loadScope=Historical)
QN->>QN: 从ObjectStorage加载Sealed Segment
QN->>QN: 加载Index
QN-->>TS: LoadSegment Success
end
%% ===== Phase 4: 心跳上报,更新Distribution =====
rect rgb(255, 240, 255)
Note over QN,Dist: Phase 4: 心跳上报新Segment
QN->>TO: GetDataDistribution()
Note over QN: Distribution包含:<br/>- Sealed: [newSealedSeg]<br/>- Growing: [oldGrowingSeg] (对应的旧Segment)
TO->>Dist: DistManager.Update(nodeID, segments)
Dist->>Dist: 更新Distribution<br/>+ newSealedSeg
end
%% ===== Phase 5: CollectionObserver提升CurrentTarget =====
rect rgb(230, 255, 255)
Note over TO,TM: Phase 5: NextTarget → CurrentTarget
loop CollectionObserver每200ms
TO->>TO: observeLoadStatus(collectionID)
TO->>TM: GetTargetSegments(NextTarget)
TO->>Dist: GetDistSegments()
TO->>TO: 检查加载进度:<br/>新Sealed Segment已加载完成
alt 新Segment全部加载完成
TO->>TO: Check(collectionID)
TO->>TO: 将NextTarget提升为CurrentTarget
TO->>TM: current.updateCollectionTarget(collectionID, nextTarget)
TM->>TM: CurrentTarget更新<br/>+ Sealed Segments<br/>- 对应的Growing Segments
TM-->>TO: CurrentTarget Updated
end
end
end
%% ===== Phase 6: SegmentChecker释放旧Growing Segment =====
rect rgb(255, 255, 230)
Note over CC,QN: Phase 6: 释放旧Growing Segment
CC->>SC: SegmentChecker.Check()
SC->>TM: GetGrowingSegments(collectionID, CurrentTarget)
TM-->>SC: CurrentTarget不再包含旧Growing Segment
SC->>Dist: GetGrowingSegments(collectionID)
Dist-->>SC: Distribution仍包含旧Growing Segment
SC->>SC: Diff: 发现多余的Growing Segment
loop For each old Growing Segment
SC->>TS: Add ReleaseSegmentTask(nodeID, growingSegID, scope=Streaming)
end
SC-->>CC: ReleaseSegmentTasks Created
TS->>QN: ReleaseSegments(segments=[oldGrowingSeg], scope=Streaming)
QN->>QN: 从内存释放Growing Segment
QN-->>TS: ReleaseSegment Success
end
%% ===== Phase 7: LeaderChecker更新LeaderView =====
rect rgb(240, 255, 240)
Note over CC,QN: Phase 7: 更新LeaderView路由表
CC->>LC: LeaderChecker.Check()
LC->>Dist: GetLeaderView(channel)
Dist-->>LC: LeaderView仍指向oldGrowingSeg
LC->>TM: GetSealedSegments(CurrentTarget)
TM-->>LC: CurrentTarget包含newSealedSeg
LC->>Dist: GetDistSegments(newSealedSeg)
Dist-->>LC: newSealedSeg已加载在nodeX
LC->>LC: Diff: LeaderView需要更新<br/>oldGrowingSeg → newSealedSeg
LC->>TS: Add LeaderSegmentTask(leaderNode, +newSealedSeg, -oldGrowingSeg)
TS->>QN: SyncDistribution(actions=[+newSealedSeg, -oldGrowingSeg])
QN->>QN: Delegator更新路由表<br/>查询请求将路由到newSealedSeg
QN-->>TS: SyncDistribution Success
end
Note over DN,QN: Handoff完成!<br/>查询将使用新Sealed Segment<br/>旧Growing Segment已释放
3.2.2 Handoff 流程说明
Phase 1: DataNode Flush Segment
- DataNode 将 Growing Segment Flush 到 ObjectStorage
- 调用 DataCoord 的 SaveBinlogPaths 保存 Binlog 路径
- DataCoord 更新 Segment 状态从 Growing 变为 Sealed
- DataCoord 生成 SegmentChangeInfo 事件
Phase 2: 更新 NextTarget
- DataCoord 通过 Watch 机制通知 QueryCoord 的 TargetObserver
- TargetObserver 接收到 Handoff 事件后,重新从 DataCoord 拉取最新的 RecoveryInfo
- 构造新的 CollectionTarget,包含新 Sealed 的 Segment
- 更新 NextTarget,版本号递增
Phase 3-4: 加载新 Sealed Segment
- SegmentChecker 检测到 NextTarget 有新 Segment 但 Distribution 没有
- 为新 Segment 生成 LoadSegmentTask(优先级 HIGH,因为对应的 Growing Segment 在 CurrentTarget 中)
- QueryNode 从 ObjectStorage 加载 Sealed Segment 和 Index
- 心跳上报新 Segment,更新 Distribution
Phase 5: 提升 NextTarget 到 CurrentTarget
- CollectionObserver 检测到新 Segment 全部加载完成
- 调用 TargetObserver.Check() 提升 NextTarget 到 CurrentTarget
- CurrentTarget 更新后,不再包含旧的 Growing Segment
Phase 6: 释放旧 Growing Segment
- SegmentChecker 检测到 CurrentTarget 不再包含旧 Growing Segment
- 但 Distribution 仍包含旧 Growing Segment(多余)
- 生成 ReleaseSegmentTask(scope=Streaming)
- QueryNode 从内存释放 Growing Segment
Phase 7: 更新 LeaderView 路由表
- LeaderChecker 检测到 LeaderView 仍指向旧 Growing Segment
- 而 CurrentTarget 和 Distribution 已包含新 Sealed Segment
- 生成 LeaderSegmentTask 更新 Delegator 的路由表
- Delegator 将查询请求路由到新 Sealed Segment
关键设计要点:
- 两级 Target 保证平滑切换:NextTarget 先更新,加载完成后再提升为 CurrentTarget
- 优先级 HIGH:Handoff 的 Segment 优先加载,保证查询一致性
- 无缝切换:新 Sealed Segment 加载完成后才释放旧 Growing Segment
- LeaderView 更新:确保查询请求路由到正确的 Segment
- 幂等性:Handoff 可以重复执行,不会产生副作用
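Phase 5 的提升条件可以概括为:NextTarget 中的全部 Sealed Segment 都已出现在 Distribution 中。下面是这一判断的最小 Go 草图(省略了 Channel 与 Replica 维度,仅作示意):
package observers

// readyToPromote:NextTarget中的Sealed Segment是否已全部加载(出现在Distribution中)
func readyToPromote(nextTargetSegments []int64, distSegments map[int64]struct{}) bool {
    for _, segID := range nextTargetSegments {
        if _, loaded := distSegments[segID]; !loaded {
            return false // 仍有Segment未加载,继续保持CurrentTarget不变
        }
    }
    return true // 可以将NextTarget提升为CurrentTarget,随后释放对应的Growing Segment
}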
3.3 负载均衡流程
触发条件:
- 节点间 Segment 数量差异 > 配置阈值(默认 100)
- 节点内存使用率差异 > 配置阈值
- 手动触发 Balance API(LoadBalance/TransferSegment/TransferChannel)
- 新节点加入或节点下线
3.3.1 负载均衡时序图
sequenceDiagram
autonumber
participant BC as BalanceChecker
participant Balancer as Balancer (ScoreBased)
participant RepMgr as ReplicaManager
participant NodeMgr as NodeManager
participant DistMgr as DistManager
participant TS as TaskScheduler
participant SrcQN as SourceQueryNode
participant DstQN as TargetQueryNode
participant Dist as DistHandler
%% ===== Phase 1: 负载检测 =====
rect rgb(255, 240, 230)
Note over BC,NodeMgr: Phase 1: 检测负载不均衡
loop 每60秒
BC->>BC: BalanceChecker.Check()
BC->>RepMgr: GetAllReplicas()
RepMgr-->>BC: Replicas
loop For each Replica
BC->>NodeMgr: GetNodes(replica)
NodeMgr-->>BC: Nodes + Stats<br/>(SegmentCnt, Memory, CPU)
BC->>DistMgr: GetSegments(replica)
DistMgr-->>BC: SegmentDistribution
BC->>BC: 计算节点负载分数<br/>score = f(segmentCnt, memory, cpu)
BC->>BC: 检查不平衡:<br/>maxScore - minScore > threshold
alt 不平衡
BC->>Balancer: BalanceReplica(replica)
Note over BC: 继续Phase 2
else 平衡
Note over BC: 跳过该Replica
end
end
end
end
%% ===== Phase 2: 生成Balance Plan =====
rect rgb(230, 255, 230)
Note over Balancer,DistMgr: Phase 2: 生成Balance Plan
Balancer->>NodeMgr: GetNodes(replica)
NodeMgr-->>Balancer: Nodes + Stats
Balancer->>DistMgr: GetSegments(replica)
DistMgr-->>Balancer: SegmentDistribution
Balancer->>Balancer: 1. 计算每个节点的负载分数<br/>score = segmentCnt * W1 + rowCnt * W2 + memory * W3
Balancer->>Balancer: 2. 排序节点:<br/>sourceNodes (高负载) vs targetNodes (低负载)
loop While 不平衡 && 可迁移
Balancer->>Balancer: 3. 从sourceNode选择Segment<br/>- 选择最大的Segment<br/>- 考虑Channel亲和性
Balancer->>Balancer: 4. 选择targetNode<br/>- 负载最低的节点<br/>- 同一Channel的Segment优先同一节点
Balancer->>Balancer: 5. 生成SegmentAssignPlan:<br/>{Segment, From: srcNode, To: dstNode, Replica}
Balancer->>Balancer: 6. 模拟迁移,更新负载分数
Balancer->>Balancer: 7. 检查是否平衡:<br/>maxScore - minScore <= threshold
end
Balancer-->>BC: BalancePlans=[{seg1: QN1→QN3}, {seg2: QN1→QN4}]
end
%% ===== Phase 3: 创建Balance Task =====
rect rgb(230, 240, 255)
Note over BC,TS: Phase 3: 创建Balance Task
loop For each BalancePlan
BC->>TS: CreateSegmentTask(plan)<br/>Actions: [Load(dstNode, segID), Release(srcNode, segID)]
Note over TS: Priority: Low (避免影响正常Load)
TS-->>BC: Task Created
end
end
%% ===== Phase 4: 执行Balance Task =====
rect rgb(255, 240, 255)
Note over TS,DstQN: Phase 4: 目标节点加载Segment
TS->>DstQN: LoadSegments(segments=[seg1], loadScope=Historical)
DstQN->>DstQN: 从ObjectStorage加载Segment
DstQN->>DstQN: 加载Index
DstQN-->>TS: LoadSegment Success
Note over TS: 等待Segment在目标节点加载完成<br/>并通过心跳上报
loop 等待心跳
Dist->>DstQN: GetDataDistribution()
DstQN-->>Dist: Distribution (包含seg1)
Dist->>DistMgr: Update(DstQN, segments)
DistMgr->>DistMgr: 更新Distribution:<br/>DstQN加载了seg1
end
TS->>TS: 检查seg1在DstQN上加载成功
end
%% ===== Phase 5: 释放源节点Segment =====
rect rgb(230, 255, 255)
Note over TS,SrcQN: Phase 5: 源节点释放Segment
alt 目标节点加载成功
TS->>SrcQN: ReleaseSegments(segments=[seg1], loadScope=Historical)
SrcQN->>SrcQN: 从内存释放Segment
SrcQN-->>TS: ReleaseSegment Success
loop 等待心跳
Dist->>SrcQN: GetDataDistribution()
SrcQN-->>Dist: Distribution (不再包含seg1)
Dist->>DistMgr: Update(SrcQN, segments)
DistMgr->>DistMgr: 更新Distribution:<br/>SrcQN释放了seg1
end
Note over TS: Balance成功
else 目标节点加载失败
Note over TS: 回滚: 不释放源节点<br/>保持原状态
end
end
%% ===== Phase 6: 更新LeaderView =====
rect rgb(255, 255, 230)
Note over TS,DstQN: Phase 6: LeaderChecker更新路由
Note over TS: LeaderChecker会自动检测Segment位置变化<br/>并更新Delegator的路由表
end
Note over BC,DstQN: Balance完成!<br/>seg1从QN1迁移到QN3<br/>负载更加均衡
3.3.2 负载均衡策略
1. ScoreBasedBalancer(基于分数的均衡器)
计算节点负载分数:
// internal/querycoordv2/balance/score_based_balancer.go
func (b *ScoreBasedBalancer) calculateScore(nodeID int64) int64 {
node := b.nodeManager.Get(nodeID)
segments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
// 1. Segment数量分数
segmentScore := int64(len(segments)) * segmentScoreWeight
// 2. 行数分数
var totalRows int64
for _, seg := range segments {
totalRows += seg.GetNumOfRows()
}
rowScore := totalRows * rowScoreWeight
// 3. 内存分数
memoryScore := node.MemoryUsage * memoryScoreWeight
return segmentScore + rowScore + memoryScore
}
选择要迁移的 Segment:
func (b *ScoreBasedBalancer) selectSegmentToMove(srcNode int64) *meta.Segment {
segments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(srcNode))
// 优先选择:
// 1. 最大的Segment(减少迁移次数)
// 2. 同一Channel的Segment优先同一节点(提升查询性能)
sort.Slice(segments, func(i, j int) bool {
return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
})
return segments[0]
}
2. RowCountBasedBalancer(基于行数的均衡器)
基于行数而非 Segment 数量进行均衡,更适合 Segment 大小差异较大的场景。
3. ChannelLevelScoreBalancer(Channel 级别的均衡器)
按 Channel 维度进行均衡,确保同一 Channel 的 Segment 尽量在同一节点,提升查询性能。
4. MultiTargetBalancer(多目标均衡器)
同时考虑多个目标(Segment 数量、行数、内存、CPU),更加智能。
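各策略的共同骨架是:先为每个节点计算负载分数,再比较最高分与最低分的差距是否超过阈值(对应配置项 balanceScoreThreshold)。下面的 Go 草图示意这一触发判断(此处按相对差实现,仅作示意):
package balance

import "math"

// needBalance:最高分与最低分的相对差超过阈值(如 0.1 即 10%)时触发Balance
// scores: NodeID -> 负载分数(如 segmentCnt*W1 + rowCnt*W2 + memory*W3)
func needBalance(scores map[int64]int64, threshold float64) bool {
    if len(scores) < 2 {
        return false
    }
    var maxS, minS int64 = math.MinInt64, math.MaxInt64
    for _, s := range scores {
        if s > maxS {
            maxS = s
        }
        if s < minS {
            minS = s
        }
    }
    if maxS <= 0 {
        return false // 无有效负载
    }
    return float64(maxS-minS)/float64(maxS) > threshold
}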
3.3.3 负载均衡关键代码
// internal/querycoordv2/checkers/balance_checker.go
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
// 1. 获取所有Replica
collections := b.meta.GetAll(ctx)
var replicaIDs []int64
for _, collection := range collections {
replicas := b.meta.ReplicaManager.GetByCollection(ctx, collection.GetCollectionID())
for _, replica := range replicas {
replicaIDs = append(replicaIDs, replica.GetID())
}
}
// 2. 生成Balance Plan
segmentPlans, channelPlans := b.generateBalanceTasksFromReplicas(ctx, replicaIDs, config)
// 3. 创建Task
tasks := []task.Task{}
for _, plan := range segmentPlans {
actions := []task.Action{
task.NewSegmentAction(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID()),
task.NewSegmentAction(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID()),
}
t, err := task.NewSegmentTask(ctx, timeout, balance.BalanceReason, plan.Replica.GetCollectionID(), plan.Replica, actions...)
if err == nil {
t.SetPriority(task.TaskPriorityLow) // 低优先级,避免影响Load
t.SetReason("segment unbalanced")
tasks = append(tasks, t)
}
}
return tasks
}
func (b *BalanceChecker) generateBalanceTasksFromReplicas(ctx context.Context, replicas []int64, config balanceConfig) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
segmentPlans := []balance.SegmentAssignPlan{}
channelPlans := []balance.ChannelAssignPlan{}
for _, rid := range replicas {
replica := b.meta.ReplicaManager.Get(ctx, rid)
if replica == nil {
continue
}
// 调用Balancer生成Plan
sPlans, cPlans := b.getBalancerFunc().BalanceReplica(ctx, replica)
segmentPlans = append(segmentPlans, sPlans...)
channelPlans = append(channelPlans, cPlans...)
}
return segmentPlans, channelPlans
}
关键设计要点:
- 优先加载后释放:先在目标节点加载成功,再释放源节点,保证可用性
- 低优先级:Balance Task 优先级低,避免影响正常的 Load 操作
- 可回滚:如果目标节点加载失败,保持原状态,不释放源节点
- 多种策略:支持多种 Balancer 策略,可根据场景选择
- Channel 亲和性:同一 Channel 的 Segment 优先分配到同一节点,提升查询性能
- 自动触发:默认每 60s 检查一次,自动执行均衡
4. 心跳处理与分布状态更新
4.1 心跳处理机制
DistController 为每个 QueryNode 创建一个 DistHandler,定期拉取数据分布。
// internal/querycoordv2/dist/dist_controller.go
type DistController struct {
handlers map[int64]*distHandler // NodeID -> DistHandler
cluster session.Cluster
nodeMgr *session.NodeManager
dist *meta.DistributionManager
targetMgr meta.TargetManagerInterface
}
func (dc *DistController) StartDistHandler(nodeID int64) {
handler := &distHandler{
nodeID: nodeID,
client: dc.cluster,
nodeManager: dc.nodeMgr,
dist: dc.dist,
target: dc.targetMgr,
}
dc.handlers[nodeID] = handler
go handler.start(dc.ctx) // 启动心跳goroutine
}
心跳流程:
- 每 1s 调用 QueryNode 的 GetDataDistribution gRPC
- 解析返回的 Segment 和 Channel 信息
- 更新 DistributionManager 的 SegmentDistManager 和 ChannelDistManager
- 更新 NodeManager 的心跳时间和资源统计
优化机制:
- LastModifyTs 检查:QueryNode 返回数据变更时间戳,未变更时跳过更新
- 心跳超时检测:超过 10s 未收到心跳,标记节点为 Offline
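心跳超时检测可以概括为下面的 Go 草图(nodeSketch 为示意结构,超时阈值对应配置项 heartbeatAvailableTime,默认 10s):
package session

import "time"

// nodeSketch:示意结构,记录节点最近一次心跳时间
type nodeSketch struct {
    ID            int64
    LastHeartbeat time.Time
}

// checkOffline:扫描所有节点,返回心跳超时的节点ID
// 这些节点随后被标记为 Offline,其上的 Segment/Channel 由 Checker 迁移到其他节点
func checkOffline(nodes []*nodeSketch, timeout time.Duration) (offline []int64) {
    now := time.Now()
    for _, n := range nodes {
        if now.Sub(n.LastHeartbeat) > timeout {
            offline = append(offline, n.ID)
        }
    }
    return offline
}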
4.2 DistributionManager 工作机制
SegmentDistManager:
type SegmentDistManager struct {
segments map[int64]nodeSegments // NodeID -> Segments
}
func (m *SegmentDistManager) Update(nodeID int64, segments ...*Segment) {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
// 为每个Segment设置Node属性
for _, segment := range segments {
segment.Node = nodeID
}
// 更新该节点的Segment列表
m.segments[nodeID] = composeNodeSegments(segments)
}
ChannelDistManager:
type ChannelDistManager struct {
channels map[int64]nodeChannels // NodeID -> Channels
collectionIndex map[int64][]*DmChannel // CollectionID -> Channels (索引)
}
func (m *ChannelDistManager) Update(nodeID int64, channels ...*DmChannel) []*DmChannel {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
// 检测新Serviceable的Channel(用于触发Checker)
newServiceableChannels := []*DmChannel{}
for _, channel := range channels {
channel.Node = nodeID
old, ok := m.channels[nodeID].nameChannel[channel.GetChannelName()]
if channel.IsServiceable() && (!ok || !old.IsServiceable()) {
newServiceableChannels = append(newServiceableChannels, channel)
}
}
m.channels[nodeID] = composeNodeChannels(channels...)
m.updateCollectionIndex() // 更新二级索引
return newServiceableChannels
}
5. 性能与容量
5.1 性能指标
| 指标 | 数值 | 说明 |
|---|---|---|
| LoadCollection 延迟 | 数据量 / 加载速度 | 10GB 约 10-30 秒(取决于 QueryNode 数量和带宽) |
| Handoff 延迟 | P99: 3-5 秒 | Segment 从 Growing 切换到 Sealed |
| Balance 延迟 | 数据量 / 带宽 | 1GB 约 5-10 秒(单 Segment 迁移) |
| HeartBeat 处理吞吐 | > 1000 节点 | 每个节点 1s 间隔 |
| Checker 协调周期 | 5s | SegmentChecker / ChannelChecker / LeaderChecker |
| CollectionObserver 周期 | 200ms | 加载进度监控 |
| BalanceChecker 周期 | 60s | 负载均衡检查 |
5.2 容量规划
| 维度 | 容量 | 说明 |
|---|---|---|
| QueryNode 数量 | 1000+ | 单 QueryCoord 可管理 |
| Loaded Collection 数量 | 100+ | 并发加载 |
| Segment 数量 | 100 万+ | 内存占用约 5GB(每个 Segment 元数据约 5KB) |
| 并发 Task 数量 | 256 (可配置) | TaskScheduler 并发上限 |
| etcd 元数据大小 | < 100MB | Collection / Replica / ResourceGroup 元数据 |
5.3 性能优化建议
- 增加 QueryNode 数量:提升并行加载能力
- 使用 ResourceGroup:隔离高优先级和低优先级 Collection
- 调整 Replica 数量:根据查询 QPS 和可用性需求调整
- 优化 Segment 大小:512MB-1GB 为宜,避免过小或过大
- 使用 ChannelLevelScoreBalancer:提升查询性能(Channel 亲和性)
- 调整 Checker 周期:生产环境可适当增大(减少协调开销)
6. 配置参数
queryCoord:
# ==== 心跳配置 ====
heartbeatAvailableTime: 10000 # QueryNode 心跳超时时间(ms),默认 10s
distPullInterval: 1000 # 拉取 Distribution 间隔(ms),默认 1s
checkExecutedFlagInterval: 100 # 检查任务执行标志间隔(ms)
# ==== 加载配置 ====
loadTimeoutSeconds: 1800 # LoadCollection 超时时间(s),默认 30分钟
collectionObserverInterval: 200 # CollectionObserver 检查间隔(ms)
# ==== Checker 配置 ====
checkInterval: 5000 # Checker 触发间隔(ms),默认 5s
segmentCheckInterval: 1000 # SegmentChecker 间隔(ms)
channelCheckInterval: 1000 # ChannelChecker 间隔(ms)
balanceCheckInterval: 60000 # BalanceChecker 间隔(ms),默认 60s
leaderCheckInterval: 1000 # LeaderChecker 间隔(ms)
# ==== 负载均衡配置 ====
balancer: "ScoreBasedBalancer" # Balancer 策略: ScoreBased / RowCountBased / ChannelLevelScore / MultiTarget
balanceSegmentCntThreshold: 100 # 触发 Balance 的 Segment 数量差异阈值
balanceScoreThreshold: 0.1 # 触发 Balance 的分数差异阈值(百分比)
autoBalance: true # 是否自动负载均衡
# ==== 任务调度配置 ====
taskExecutionCap: 256 # TaskScheduler 最大并发 Task 数
segmentTaskTimeout: 120000 # Segment Task 超时时间(ms),默认 2分钟
channelTaskTimeout: 60000 # Channel Task 超时时间(ms),默认 1分钟
# ==== Handoff 配置 ====
handoffSegmentNum: 4 # 每次 Handoff 处理的 Segment 数量
# ==== Broker 配置 ====
brokerTimeout: 5000 # 与 RootCoord / DataCoord 通信超时(ms)
# ==== Target 配置 ====
targetObserverInterval: 500 # TargetObserver 检查间隔(ms)
targetUpdateInterval: 60000 # Target 定期更新间隔(ms)
# ==== Session 配置 ====
sessionTTL: 60 # QueryCoord Session TTL(s),默认 60s
sessionRetryTimes: 30 # Session 注册重试次数
7. 异常处理与容错
7.1 QueryNode 下线处理
检测:
- 心跳超时(10s)
- 节点 Session 过期
处理流程:
- NodeManager 标记节点为 Offline
- SegmentChecker 检测到该节点的 Segment 在 CurrentTarget 中但 Distribution 中缺失
- 为缺失的 Segment 生成 LoadSegmentTask,分配到其他节点
- ChannelChecker 同样处理缺失的 Channel
7.2 LoadCollection 失败处理
超时机制:
- 默认 30 分钟超时
- CollectionObserver 检测超时后自动回滚
回滚流程:
func (job *LoadCollectionJob) PostExecute() {
if job.Error() != nil {
job.undo.RollBack()
// RollBack包括:
// 1. 移除 Collection 元数据
// 2. 移除 Replica
// 3. 移除 Target
// 4. Checker 会自动生成 ReleaseSegment Task
}
}
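回滚本质上是一组按逆序执行的撤销步骤。下面的 Go 草图示意这种组织方式(undoList 为示意类型,并非源码中 undo 的真实结构):
package job

// undoStep:一次可撤销操作,如移除Collection元数据、移除Replica、移除Target
type undoStep func()

type undoList struct{ steps []undoStep }

// Record:Execute过程中每完成一步,登记对应的撤销操作
func (u *undoList) Record(step undoStep) { u.steps = append(u.steps, step) }

// RollBack:逆序执行已登记的撤销步骤(后做的先撤)
func (u *undoList) RollBack() {
    for i := len(u.steps) - 1; i >= 0; i-- {
        u.steps[i]()
    }
}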
7.3 Handoff 失败处理
失败场景:
- DataCoord 通知丢失
- QueryNode 加载 Sealed Segment 失败
恢复机制:
- TargetObserver 定期刷新:每 60s 重新拉取 Target,确保最终一致
- Checker 自动重试:SegmentChecker 持续检测 Target vs Distribution 差异
- 幂等性:重复加载同一 Segment 不会产生副作用
7.4 Balance 失败处理
失败场景:
- 目标节点加载失败
- 源节点释放失败
回滚机制:
- 优先加载后释放:只有目标节点加载成功后才释放源节点
- 保持可用性:失败时保持原状态,不影响查询
8. 最佳实践与运维建议
8.1 Collection 加载优化
- 合理设置副本数:
  - 高 QPS 场景:replica >= 2
  - 高可用场景:replica >= 3
- 使用 ResourceGroup 隔离:

  # 高优先级 Collection
  LoadCollection(collectionID, replicaNumber=2, resourceGroups=["rg_high"])
  # 低优先级 Collection
  LoadCollection(collectionID, replicaNumber=1, resourceGroups=["rg_low"])

- 批量加载:
  - 避免频繁 Load/Release
  - 数据量小时,使用 LoadPartition 代替 LoadCollection
8.2 负载均衡优化
- 选择合适的 Balancer:
  - Segment 大小均匀:ScoreBasedBalancer
  - Segment 大小差异大:RowCountBasedBalancer
  - 查询性能优先:ChannelLevelScoreBalancer
- 调整 Balance 阈值:

  # 生产环境建议
  balanceSegmentCntThreshold: 200    # 增大阈值,减少不必要的迁移
  balanceCheckInterval: 120000       # 2分钟检查一次

- 手动触发 Balance:

  # 手动迁移 Segment
  client.load_balance(
      src_node_id=1,
      dst_node_ids=[2, 3],
      segment_ids=[101, 102, 103],
      sealed_only=True
  )
8.3 监控指标
关键指标:
- querycoord_collection_load_percentage:Collection 加载进度
- querycoord_segment_num_in_target:Target 中的 Segment 数量
- querycoord_segment_num_in_dist:Distribution 中的 Segment 数量
- querycoord_task_num:Task 队列长度
- querycoord_checker_latency:Checker 执行延迟
- querycoord_balance_segment_num:Balance 迁移的 Segment 数量
告警建议:
- LoadCollection 超过 30 分钟未完成
- Task 队列积压 > 1000
- Checker 执行延迟 > 10s
- 节点心跳丢失 > 3 次
8.4 故障排查
LoadCollection 卡住:
- 检查 CollectionObserver 日志:grep "observeLoadStatus" querycoord.log
- 检查 TargetManager:是否正确拉取 RecoveryInfo
- 检查 SegmentChecker:是否生成 LoadSegmentTask
- 检查 QueryNode:是否成功加载 Segment
Handoff 延迟高:
- 检查 TargetObserver 日志:是否收到 SegmentChangeInfo
- 检查网络延迟:QueryCoord 与 DataCoord 之间
- 检查 QueryNode 加载性能:ObjectStorage 带宽
Balance 不生效:
- 检查 BalanceChecker 是否触发:grep "BalanceChecker" querycoord.log
- 检查负载分数:是否超过阈值
- 检查 Task 状态:是否被取消或失败