Kubernetes-03-Scheduler-概览
模块职责
kube-scheduler 是 Kubernetes 控制平面的调度器,负责将待调度的 Pod 分配到最合适的 Node 上运行。它通过调度算法评估所有可用节点,选择最优节点,并将 Pod 绑定到该节点。
核心职责
-
Pod 调度决策
- 监听待调度的 Pod(
Pod.Spec.NodeName为空) - 执行两阶段调度算法:Filtering(过滤)+ Scoring(打分)
- 选择得分最高的节点作为调度目标
- 监听待调度的 Pod(
-
插件化调度框架
- 提供 10+ 个扩展点(PreFilter、Filter、PostFilter、Score、Bind 等)
- 支持自定义插件(实现 Plugin 接口)
- 内置 20+ 插件(资源检查、亲和性、反亲和性、拓扑域等)
-
抢占调度(Preemption)
- 当集群资源不足时,驱逐低优先级 Pod 为高优先级 Pod 腾出空间
- 选择最小驱逐成本的节点和 Pod 组合
-
绑定 Pod 到节点
- 乐观假设(Assume):先在缓存中记录 Pod 的调度结果
- 异步绑定(Bind):调用 API Server 更新 Pod.Spec.NodeName
- 失败回滚:绑定失败时从缓存中移除假设
输入/输出
输入:
- 待调度的 Pod:
Pod.Spec.NodeName为空的 Pod - 节点状态:Node 的资源容量、可分配资源、污点(Taints)
- 调度约束:亲和性、反亲和性、节点选择器、拓扑域约束
输出:
- 绑定结果:将 Pod.Spec.NodeName 设置为选中的节点名称
- 调度事件:记录调度成功/失败的事件(如
Scheduled、FailedScheduling)
上下游依赖
上游(被调用方):
- API Server:获取 Pod、Node、PVC 等资源对象,绑定 Pod 到节点
下游(调用方):
- Kubelet:监听 Pod.Spec.NodeName 变化,启动 Pod 容器
生命周期
// 启动流程(简化版)
func main() {
// 1. 创建 Scheduler
sched := scheduler.New(client, informerFactory, profile)
// 2. 启动 Informer(监听 Pod、Node、PVC 等资源)
informerFactory.Start(stopCh)
// 3. 启动 Scheduler(主循环)
sched.Run(ctx)
}
启动阶段:
- Leader Election(主备选举,确保只有一个实例工作)
- 创建调度框架(Framework):初始化插件
- 创建调度队列(SchedulingQueue):存储待调度的 Pod
- 启动 Informer:监听 Pod、Node、PVC 等资源变化
- 启动调度循环(Scheduling Loop):从队列取 Pod 并调度
运行阶段:
- SchedulingQueue 接收待调度的 Pod
- 调度循环不断从队列取 Pod
- 执行调度算法:Filtering → Scoring → Binding
- 绑定成功后,Kubelet 启动 Pod
停止阶段:
- 收到 SIGTERM 信号
- 停止调度循环(不再接受新 Pod)
- 等待当前调度任务完成(优雅关闭)
- 退出进程
调度算法概览
两阶段调度算法
flowchart LR
POD[待调度 Pod] --> FILTER[Filtering Phase<br/>过滤阶段]
FILTER --> FEASIBLE[Feasible Nodes<br/>可行节点列表]
FEASIBLE --> SCORE[Scoring Phase<br/>打分阶段]
SCORE --> BEST[Best Node<br/>最佳节点]
BEST --> BIND[Binding Phase<br/>绑定阶段]
BIND --> DONE[调度完成]
style FILTER fill:#FF6B6B,color:#fff
style SCORE fill:#4ECDC4,color:#fff
style BIND fill:#45B7D1,color:#fff
1. Filtering Phase(过滤阶段)
目的:过滤出可以运行 Pod 的节点。
执行逻辑:
- 遍历所有节点,对每个节点执行所有 Filter 插件
- 如果任何一个插件返回失败,该节点被标记为不可行
- 返回所有可行节点列表
常用 Filter 插件:
| 插件名称 | 检查内容 |
|---|---|
| NodeResourcesFit | 节点资源(CPU、内存)是否满足 Pod 请求 |
| NodePorts | 节点端口是否被占用 |
| VolumeBinding | PVC 是否可以绑定到节点的 PV |
| NodeAffinity | Pod 的节点亲和性规则是否匹配 |
| PodTopologySpread | Pod 拓扑域约束是否满足 |
| TaintToleration | Pod 是否容忍节点的污点 |
| NodeUnschedulable | 节点是否标记为不可调度(spec.unschedulable=true) |
示例:
集群有 10 个节点
- 执行 NodeResourcesFit:过滤掉 3 个资源不足的节点(剩余 7 个)
- 执行 NodePorts:过滤掉 1 个端口冲突的节点(剩余 6 个)
- 执行 TaintToleration:过滤掉 2 个有污点的节点(剩余 4 个)
→ 可行节点列表:Node-1, Node-2, Node-3, Node-4
2. Scoring Phase(打分阶段)
目的:对可行节点打分排序,选择最佳节点。
执行逻辑:
- 对每个可行节点,执行所有 Score 插件
- 每个插件返回 0-100 的分数
- 分数乘以插件权重(Weight),累加得到总分
- 选择总分最高的节点
常用 Score 插件:
| 插件名称 | 打分逻辑 | 权重 |
|---|---|---|
| NodeResourcesBalancedAllocation | 资源使用率越均衡,分数越高 | 1 |
| NodeResourcesLeastAllocated | 可用资源越多,分数越高 | 1 |
| ImageLocality | 节点已有镜像,分数越高 | 1 |
| InterPodAffinity | Pod 亲和性匹配度越高,分数越高 | 2 |
| NodeAffinity | 节点亲和性匹配度越高,分数越高 | 1 |
| PodTopologySpread | 拓扑域分布越均匀,分数越高 | 2 |
示例:
可行节点:Node-1, Node-2, Node-3, Node-4
打分结果(假设所有插件权重为 1):
- Node-1:NodeResourcesBalancedAllocation=80 + ImageLocality=50 = 130
- Node-2:NodeResourcesBalancedAllocation=70 + ImageLocality=100 = 170
- Node-3:NodeResourcesBalancedAllocation=60 + ImageLocality=60 = 120
- Node-4:NodeResourcesBalancedAllocation=90 + ImageLocality=40 = 130
→ 选择 Node-2(总分最高)
3. Binding Phase(绑定阶段)
目的:将 Pod 绑定到选定的节点。
执行流程:
- Assume(乐观假设):在缓存中记录 Pod 的调度结果(设置
Pod.Spec.NodeName) - WaitOnPermit(等待许可):等待 Permit 插件的批准(可用于流量控制)
- PreBind(绑定前处理):执行绑定前的准备工作(如预留 PV)
- Bind(绑定):调用 API Server 更新 Pod.Spec.NodeName
- PostBind(绑定后处理):执行绑定后的清理工作
乐观假设(Assume)的作用:
- 绑定操作是异步的(避免阻塞调度循环)
- 先在缓存中记录调度结果,后续调度决策基于该假设
- 如果绑定失败,从缓存中移除假设,重新调度
调度框架(Scheduling Framework)
调度框架架构
flowchart TB
subgraph "Scheduling Framework 调度框架"
subgraph "Extension Points 扩展点"
EP1[PreEnqueue<br/>入队前检查]
EP2[QueueSort<br/>队列排序]
EP3[PreFilter<br/>预过滤]
EP4[Filter<br/>过滤]
EP5[PostFilter<br/>后过滤/抢占]
EP6[PreScore<br/>预打分]
EP7[Score<br/>打分]
EP8[NormalizeScore<br/>归一化分数]
EP9[Reserve<br/>预留资源]
EP10[Permit<br/>许可]
EP11[PreBind<br/>绑定前处理]
EP12[Bind<br/>绑定]
EP13[PostBind<br/>绑定后处理]
end
subgraph "Built-in Plugins 内置插件"
P1[NodeResourcesFit]
P2[NodePorts]
P3[VolumeBinding]
P4[NodeAffinity]
P5[PodTopologySpread]
P6[TaintToleration]
P7[InterPodAffinity]
PN[... 20+ Plugins]
end
EP4 --> P1
EP4 --> P2
EP4 --> P3
EP4 --> P4
EP4 --> P5
EP4 --> P6
EP7 --> P7
EP7 --> PN
end
subgraph "Scheduler"
QUEUE[SchedulingQueue<br/>调度队列]
LOOP[Scheduling Loop<br/>调度循环]
CACHE[Scheduler Cache<br/>调度缓存]
end
POD[待调度 Pod] --> QUEUE
QUEUE --> LOOP
LOOP --> EP1
EP1 --> EP2
EP2 --> EP3
EP3 --> EP4
EP4 --> EP5
EP5 --> EP6
EP6 --> EP7
EP7 --> EP8
EP8 --> EP9
EP9 --> EP10
EP10 --> EP11
EP11 --> EP12
EP12 --> EP13
EP13 --> CACHE
style EP4 fill:#FF6B6B,color:#fff
style EP7 fill:#4ECDC4,color:#fff
style EP12 fill:#45B7D1,color:#fff
扩展点详解
1. PreEnqueue(入队前检查)
触发时机:Pod 创建/更新时,在加入调度队列前 作用:检查 Pod 是否可以被调度(如检查 PVC 是否已绑定) 返回值:Success(加入队列)或 Unschedulable(拒绝调度)
2. QueueSort(队列排序)
触发时机:Pod 加入调度队列时 作用:定义队列中 Pod 的排序规则 默认实现:按优先级排序(PrioritySort 插件)
3. PreFilter(预过滤)
触发时机:Filter 阶段之前 作用:预计算共享状态(如 Pod 的亲和性匹配结果),避免重复计算 返回值:Success 或 Unschedulable
4. Filter(过滤)
触发时机:对每个节点执行 作用:判断节点是否可以运行 Pod 返回值:Success(可行)或 Unschedulable(不可行)
5. PostFilter(后过滤/抢占)
触发时机:所有节点都不可行时 作用:尝试抢占调度(驱逐低优先级 Pod) 返回值:Success(找到抢占方案)或 Unschedulable(无法抢占)
6. PreScore(预打分)
触发时机:Score 阶段之前 作用:预计算共享状态(如节点的资源使用率)
7. Score(打分)
触发时机:对每个可行节点执行 作用:给节点打分(0-100) 返回值:分数
8. NormalizeScore(归一化分数)
触发时机:Score 阶段之后 作用:将分数归一化到 0-100 范围(如将百分比转换为分数)
9. Reserve(预留资源)
触发时机:选定节点后,绑定前 作用:预留资源(如预留 PV、预分配 IP) 返回值:Success 或 Error
10. Permit(许可)
触发时机:Reserve 之后,Bind 之前 作用:控制 Pod 何时可以绑定(可用于批量调度、Gang Scheduling) 返回值:Success(立即绑定)、Wait(等待)或 Reject(拒绝)
11. PreBind(绑定前处理)
触发时机:Bind 之前 作用:执行绑定前的准备工作(如挂载远程存储) 返回值:Success 或 Error
12. Bind(绑定)
触发时机:PreBind 成功后 作用:调用 API Server 更新 Pod.Spec.NodeName 返回值:Success 或 Error
13. PostBind(绑定后处理)
触发时机:Bind 成功后 作用:执行绑定后的清理工作(如记录指标) 返回值:无返回值(仅通知)
核心数据结构
1. SchedulingQueue(调度队列)
调度队列存储待调度的 Pod,分为 3 个子队列:
flowchart TB
subgraph "SchedulingQueue"
AQ[ActiveQ<br/>活跃队列<br/>优先级队列]
BQ[BackoffQ<br/>退避队列<br/>延迟队列]
UQ[UnschedulableQ<br/>不可调度队列<br/>Map]
end
POD[新 Pod] --> AQ
AQ --> SCHED[调度器]
SCHED -->|调度失败| BQ
BQ -->|退避时间到| AQ
SCHED -->|不可调度| UQ
UQ -->|集群事件触发| AQ
style AQ fill:#4ECDC4,color:#fff
style BQ fill:#F7B801,color:#fff
style UQ fill:#FF6B6B,color:#fff
ActiveQ(活跃队列):
- 存储待调度的 Pod(按优先级排序)
- 调度器不断从队列头部取 Pod
BackoffQ(退避队列):
- 存储调度失败的 Pod(等待重试)
- 退避时间:初始 1s,最大 10s(指数退避)
UnschedulableQ(不可调度队列):
- 存储所有节点都不可行的 Pod
- 等待集群事件(如新节点加入、Pod 删除)触发重新调度
2. Scheduler Cache(调度缓存)
调度缓存存储节点和 Pod 的状态,用于快速查询:
NodeInfo:
- Node 的资源容量、可分配资源、已使用资源
- Node 上运行的 Pod 列表
- Node 的污点(Taints)、标签(Labels)
PodInfo:
- Pod 的资源请求(CPU、内存)
- Pod 的调度约束(亲和性、反亲和性、节点选择器)
Assume 机制:
- 调度器选定节点后,先在缓存中记录 Pod 的调度结果
- 后续调度决策基于该假设(避免多个 Pod 调度到同一节点导致资源冲突)
- 绑定成功后,确认假设;绑定失败后,移除假设
关键代码片段
Scheduler 主循环
// pkg/scheduler/schedule_one.go
// scheduleOne 调度一个 Pod
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// 1. 从队列获取待调度的 Pod
podInfo, _ := sched.SchedulingQueue.Pop(ctx)
pod := podInfo.Pod
// 2. 执行调度周期(Scheduling Cycle)
// - PreFilter → Filter → PostFilter → PreScore → Score → NormalizeScore
scheduleResult, assumedPod, status := sched.schedulingCycle(ctx, state, fwk, podInfo, start)
if !status.IsSuccess() {
// 调度失败,处理失败逻辑(加入 BackoffQ 或 UnschedulableQ)
sched.FailureHandler(ctx, fwk, podInfo, status, scheduleResult)
return
}
// 3. 异步执行绑定周期(Binding Cycle)
// - Reserve → Permit → PreBind → Bind → PostBind
go func() {
status := sched.bindingCycle(ctx, state, fwk, scheduleResult, assumedPod, start)
if !status.IsSuccess() {
// 绑定失败,回滚假设并重新调度
sched.handleBindingCycleError(ctx, state, fwk, assumedPod, start, scheduleResult, status)
}
}()
}
调度周期(Scheduling Cycle)
// schedulingCycle 执行调度周期
func (sched *Scheduler) schedulingCycle(ctx context.Context, state *fwk.CycleState, fwk framework.Framework, podInfo *fwk.QueuedPodInfo, start time.Time) (ScheduleResult, *v1.Pod, *fwk.Status) {
pod := podInfo.Pod
// 1. 执行 PreFilter 插件
preFilterResult, status := fwk.RunPreFilterPlugins(ctx, state, pod)
if !status.IsSuccess() {
return ScheduleResult{}, nil, status
}
// 2. 执行调度算法:Filtering + Scoring
scheduleResult, err := sched.schedulePod(ctx, fwk, state, pod)
if err != nil {
// 所有节点都不可行,执行 PostFilter(抢占)
postFilterResult, status := fwk.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if !status.IsSuccess() {
return ScheduleResult{}, nil, status
}
// 抢占成功,返回抢占的节点
scheduleResult.SuggestedHost = postFilterResult.NominatedNodeName
}
// 3. 乐观假设(Assume)
assumedPod := pod.DeepCopy()
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
return ScheduleResult{}, nil, fwk.AsStatus(err)
}
// 4. 执行 Reserve 插件(预留资源)
if status := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
// Reserve 失败,回滚假设
sched.Cache.ForgetPod(assumedPod)
return ScheduleResult{}, nil, status
}
return scheduleResult, assumedPod, nil
}
调度算法(schedulePod)
// schedulePod 执行调度算法(Filtering + Scoring)
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
// 1. 更新缓存快照(Node 和 Pod 的状态)
if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
return ScheduleResult{}, err
}
// 2. 执行 Filtering(过滤阶段)
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return ScheduleResult{}, err
}
// 3. 如果没有可行节点,返回错误
if len(feasibleNodes) == 0 {
return ScheduleResult{}, &framework.FitError{
Pod: pod,
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}
// 4. 如果只有一个可行节点,直接返回
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Node().Name,
EvaluatedNodes: 1,
FeasibleNodes: 1,
}, nil
}
// 5. 执行 Scoring(打分阶段)
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return ScheduleResult{}, err
}
// 6. 选择得分最高的节点
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes),
FeasibleNodes: len(feasibleNodes),
}, err
}
Filtering 阶段
// findNodesThatFitPod 执行 Filtering 阶段
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *fwk.CycleState, pod *v1.Pod) ([]fwk.NodeInfo, framework.Diagnosis, error) {
var feasibleNodes []fwk.NodeInfo
diagnosis := framework.Diagnosis{NodeToStatus: framework.NewDefaultNodeToStatus()}
// 并发执行 Filter 插件(每个节点一个 Goroutine)
errCh := parallelize.NewErrorChannel()
feasibleNodesLen := atomic.NewInt32(0)
feasibleNodes = make([]fwk.NodeInfo, sched.nodeInfoSnapshot.NumNodes())
sched.Parallelizer().Until(ctx, len(allNodes), func(index int) {
nodeInfo := allNodes[index]
// 执行所有 Filter 插件
status := fwk.RunFilterPlugins(ctx, state, pod, nodeInfo)
if status.IsSuccess() {
// 节点可行,加入结果列表
length := feasibleNodesLen.Inc()
feasibleNodes[length-1] = nodeInfo
} else {
// 节点不可行,记录原因
diagnosis.NodeToStatus.Set(nodeInfo.Node().Name, status)
}
})
// 截取有效的可行节点
feasibleNodes = feasibleNodes[:feasibleNodesLen.Load()]
return feasibleNodes, diagnosis, nil
}
Scoring 阶段
// prioritizeNodes 执行 Scoring 阶段
func prioritizeNodes(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) ([]fwk.NodePluginScores, error) {
// 1. 执行所有 Score 插件
scoresMap, status := fwk.RunScorePlugins(ctx, state, pod, nodes)
if !status.IsSuccess() {
return nil, status.AsError()
}
// 2. 累加各插件的加权分数
// - 每个插件的分数 * 权重
// - 累加得到总分
for i := range scoresMap {
for j := range scoresMap[i].Scores {
scoresMap[i].TotalScore += scoresMap[i].Scores[j].Score
}
}
// 3. 返回排序后的节点列表(按总分降序)
return scoresMap, nil
}
Bind 阶段
// bind 执行绑定操作
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *fwk.CycleState) *fwk.Status {
// 1. 执行 PreBind 插件
if status := fwk.RunPreBindPlugins(ctx, state, assumed, targetNode); !status.IsSuccess() {
return status
}
// 2. 执行 Bind 插件(调用 API Server)
status := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
if !status.IsSuccess() {
return status
}
// 3. 执行 PostBind 插件
fwk.RunPostBindPlugins(ctx, state, assumed, targetNode)
return nil
}
// DefaultBinder 默认绑定插件
func (b DefaultBinder) Bind(ctx context.Context, state *fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
Target: v1.ObjectReference{Kind: "Node", Name: nodeName},
}
// 调用 API Server 绑定 Pod
err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err != nil {
return fwk.AsStatus(err)
}
return nil
}
边界条件与约束
并发与性能
并发调度:
- 默认单线程调度(一次只调度一个 Pod)
- 可通过
--kube-scheduler-qps和--kube-scheduler-burst调整 API 调用速率
Filter 并发:
- Filter 插件并发执行(每个节点一个 Goroutine)
- 适用于大集群(1000+ 节点)
Score 并发:
- Score 插件并发执行(每个节点一个 Goroutine)
资源限制
节点数量:
- 推荐 5000 节点以下
- 超过 5000 节点时,需要调整调度器参数(如
--kube-api-qps、--kube-api-burst)
Pod 调度速率:
- 默认约 100 Pods/s(取决于节点数量和插件复杂度)
- 可通过减少插件数量或禁用某些插件提高速率
扩展点
自定义插件:
- 实现 Plugin 接口(Filter、Score、Bind 等)
- 注册插件到调度框架
- 配置插件的权重和参数
Scheduler Extenders:
- HTTP 回调方式扩展调度器
- 适用于不方便修改调度器代码的场景
最佳实践
1. 合理设置 Pod 资源请求
问题:
- Pod 未设置资源请求(requests)时,调度器无法准确评估节点资源
- 可能导致节点资源超卖,影响 Pod 稳定性
建议:
- 为所有 Pod 设置
resources.requests(CPU、内存) - 使用 LimitRange 强制要求 Pod 设置资源请求
2. 使用节点亲和性和反亲和性
场景:
- 将同一应用的 Pod 分散到不同节点(高可用)
- 将数据密集型 Pod 调度到 SSD 节点
示例:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
app: nginx
topologyKey: kubernetes.io/hostname
3. 使用拓扑域约束
场景:
- 将 Pod 均匀分布到多个可用区(Zone)
- 避免所有 Pod 集中在一个区域
示例:
topologySpreadConstraints:
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
app: nginx
4. 使用优先级和抢占
场景:
- 保证关键业务 Pod 优先调度
- 资源不足时,驱逐低优先级 Pod
示例:
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: high-priority
value: 1000000
globalDefault: false
description: "High priority for critical workloads"
Kubernetes-03-Scheduler-时序图
时序图概述
本文档提供 Scheduler 核心场景的时序图,包括:
- Pod 调度完整流程:从队列取出 Pod 到绑定成功
- Filtering 和 Scoring 流程:插件执行的详细步骤
- 抢占调度流程:资源不足时驱逐低优先级 Pod
- Assume 和 Binding 流程:乐观假设与异步绑定
场景 1:Pod 调度完整流程
时序图
sequenceDiagram
autonumber
participant U as User (kubectl)
participant API as API Server
participant INF as Informer
participant Q as SchedulingQueue
participant SCHED as Scheduler
participant FWK as Framework
participant CACHE as Scheduler Cache
Note over U,CACHE: 场景:用户创建 Pod(未指定 NodeName)
U->>API: POST /api/v1/pods<br/>{spec: {containers: ...}}
API-->>U: 201 Created (Pod RV=12345)
API->>INF: Watch Event: ADDED<br/>Pod/nginx (NodeName="")
INF->>Q: Add(pod)
Note over Q: Pod 加入 ActiveQ<br/>(按优先级排序)
Note over SCHED,CACHE: Scheduler 主循环
SCHED->>Q: Pop() -> Pod/nginx
Q-->>SCHED: QueuedPodInfo{Pod: nginx, Attempts: 1}
Note over SCHED: 开始调度周期(Scheduling Cycle)
SCHED->>FWK: RunPreFilterPlugins(ctx, state, pod)
FWK->>FWK: NodeResourcesFit.PreFilter()<br/>- 计算 Pod 资源请求<br/>- 存入 CycleState
FWK-->>SCHED: Success
SCHED->>SCHED: schedulePod(ctx, fwk, state, pod)
Note over SCHED,CACHE: 步骤 1:更新缓存快照
SCHED->>CACHE: UpdateSnapshot(snapshot)
CACHE->>CACHE: 复制所有 NodeInfo 到快照<br/>(用于并发调度)
CACHE-->>SCHED: Success
Note over SCHED,FWK: 步骤 2:Filtering Phase<br/>(过滤不可行的节点)
SCHED->>SCHED: findNodesThatFitPod(ctx, fwk, state, pod)
loop 并发过滤所有节点
SCHED->>FWK: RunFilterPlugins(ctx, state, pod, nodeInfo)
FWK->>FWK: NodeResourcesFit.Filter()<br/>- 检查 CPU/内存是否足够
alt 资源足够
FWK-->>SCHED: Success(节点可行)
else 资源不足
FWK-->>SCHED: Unschedulable(节点不可行)
end
FWK->>FWK: TaintToleration.Filter()<br/>- 检查 Pod 是否容忍污点
FWK->>FWK: VolumeBinding.Filter()<br/>- 检查 PVC 是否可绑定
FWK->>FWK: PodTopologySpread.Filter()<br/>- 检查拓扑域约束
end
SCHED->>SCHED: feasibleNodes = [Node-1, Node-2, Node-3]<br/>(3 个可行节点)
Note over SCHED,FWK: 步骤 3:Scoring Phase<br/>(对可行节点打分)
SCHED->>FWK: RunScorePlugins(ctx, state, pod, feasibleNodes)
loop 并发对每个节点打分
FWK->>FWK: NodeResourcesBalancedAllocation.Score()<br/>- 计算资源均衡度<br/>- 返回分数 0-100
FWK->>FWK: ImageLocality.Score()<br/>- 检查节点是否已有镜像<br/>- 返回分数 0-100
FWK->>FWK: InterPodAffinity.Score()<br/>- 计算 Pod 亲和性<br/>- 返回分数 0-100
end
FWK->>FWK: 累加各插件的加权分数<br/>- Node-1: 180<br/>- Node-2: 220<br/>- Node-3: 150
FWK-->>SCHED: NodePluginScores (排序后)
SCHED->>SCHED: selectHost(scores)<br/>选择得分最高的节点:Node-2
Note over SCHED,CACHE: 步骤 4:Assume(乐观假设)
SCHED->>CACHE: AssumePod(pod, "Node-2")
CACHE->>CACHE: 1. 记录假设状态<br/>2. 更新 Node-2 的资源<br/>3. 设置 Pod.Spec.NodeName="Node-2"
CACHE-->>SCHED: Success
Note over SCHED,FWK: 步骤 5:Reserve(预留资源)
SCHED->>FWK: RunReservePluginsReserve(ctx, state, pod, "Node-2")
FWK->>FWK: VolumeBinding.Reserve()<br/>- 预留 PV(更新 PV.ClaimRef)
FWK-->>SCHED: Success
Note over SCHED,FWK: 调度周期结束<br/>开始绑定周期(Binding Cycle)
Note over SCHED: 异步执行绑定(不阻塞调度循环)
SCHED->>SCHED: go bindingCycle(ctx, state, fwk, pod, "Node-2")
Note over SCHED,FWK: 绑定周期(后台 Goroutine)
SCHED->>FWK: WaitOnPermit(ctx, pod)
FWK->>FWK: Permit 插件批准<br/>(可用于批量调度、流量控制)
FWK-->>SCHED: Success
SCHED->>FWK: RunPreBindPlugins(ctx, state, pod, "Node-2")
FWK->>FWK: VolumeBinding.PreBind()<br/>- 挂载远程存储<br/>- 创建 PVC 的 Volume
FWK-->>SCHED: Success
SCHED->>FWK: RunBindPlugins(ctx, state, pod, "Node-2")
FWK->>API: POST /api/v1/pods/nginx/binding<br/>{target: {kind: "Node", name: "Node-2"}}
API->>API: 更新 Pod.Spec.NodeName="Node-2"
API-->>FWK: 200 OK (Pod RV=12346)
FWK-->>SCHED: Success
SCHED->>CACHE: FinishBinding(pod)
CACHE->>CACHE: 确认假设(移出 assumedPods)
CACHE-->>SCHED: Success
SCHED->>FWK: RunPostBindPlugins(ctx, state, pod, "Node-2")
FWK->>FWK: 记录指标、发送事件
Note over API,INF: Informer 监听到 Pod 更新
API->>INF: Watch Event: MODIFIED<br/>Pod/nginx (NodeName="Node-2")
Note over INF: Kubelet 监听到 Pod 分配到本节点<br/>开始启动容器
要点说明
1. 图意概述
- 触发条件:用户创建 Pod 且未指定
NodeName - 核心流程:Pop → PreFilter → Filtering → Scoring → Assume → Reserve → Bind
- 异步绑定:绑定操作在后台 Goroutine 执行,不阻塞调度循环
2. 关键步骤
- PreFilter:预计算共享状态(如 Pod 资源请求),避免重复计算
- Filtering:并发过滤所有节点(每个节点一个 Goroutine)
- Scoring:并发对可行节点打分(每个节点一个 Goroutine)
- Assume:乐观假设(先在缓存中记录调度结果,后续调度基于该假设)
- Reserve:预留资源(如预留 PV、预分配 IP)
- Bind:异步绑定(调用 API Server 更新 Pod.Spec.NodeName)
3. 边界条件
- 所有节点都不可行:执行 PostFilter(抢占调度)
- 绑定失败:ForgetPod(回滚假设),重新加入队列
- Permit 拒绝:回滚 Reserve,重新加入队列
4. 性能优化
- 并发过滤:Filter 插件并发执行(提高大集群性能)
- 并发打分:Score 插件并发执行
- 异步绑定:绑定操作不阻塞调度循环(提高吞吐量)
场景 2:抢占调度流程
时序图
sequenceDiagram
autonumber
participant SCHED as Scheduler
participant FWK as Framework
participant CACHE as Scheduler Cache
participant API as API Server
Note over SCHED,API: 场景:高优先级 Pod 无法调度<br/>(所有节点资源不足)
SCHED->>FWK: RunFilterPlugins(ctx, state, pod, allNodes)
FWK-->>SCHED: 所有节点都返回 Unschedulable
Note over SCHED,FWK: Filtering 失败<br/>执行 PostFilter(抢占)
SCHED->>FWK: RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
Note over FWK: DefaultPreemption 插件执行
FWK->>FWK: 检查 Pod 是否允许抢占<br/>- Pod.Spec.Priority > 0<br/>- Pod.Spec.PreemptionPolicy != "Never"
alt Pod 不允许抢占
FWK-->>SCHED: Unschedulable
SCHED->>SCHED: 加入 UnschedulableQ<br/>等待集群事件触发重新调度
else Pod 允许抢占
Note over FWK: 查找可以通过抢占变为可行的节点
loop 遍历所有节点
FWK->>FWK: 1. 列出节点上运行的 Pod
FWK->>FWK: 2. 按优先级排序<br/>(低优先级在前)
FWK->>FWK: 3. 逐个尝试驱逐 Pod<br/>直到节点可行
alt 驱逐后节点可行
FWK->>FWK: 记录抢占候选:<br/>- Node<br/>- Victims(被驱逐的 Pod)<br/>- PDB Violations(违反 PDB 的数量)
end
end
FWK->>FWK: 选择驱逐成本最小的候选<br/>- 优先驱逐优先级最低的 Pod<br/>- 优先驱逐 Pod 数量最少的节点<br/>- 优先违反 PDB 最少的节点
Note over FWK,API: 驱逐低优先级 Pod
loop 遍历 Victims
FWK->>API: POST /api/v1/pods/{name}/eviction<br/>{deleteOptions: {gracePeriodSeconds: 30}}
API->>API: 设置 Pod.DeletionTimestamp<br/>(优雅删除)
API-->>FWK: 200 OK
end
FWK->>FWK: 设置 Pod.Status.NominatedNodeName<br/>(记录抢占的节点,等待资源释放)
FWK-->>SCHED: PostFilterResult{NominatedNodeName: "Node-1"}
SCHED->>API: PATCH /api/v1/pods/high-priority-pod/status<br/>{status.nominatedNodeName: "Node-1"}
API-->>SCHED: 200 OK
SCHED->>SCHED: 加入 UnschedulableQ<br/>等待 Victims 删除事件
end
Note over API,SCHED: Victims 删除后<br/>触发重新调度
API->>SCHED: Watch Event: DELETED<br/>Pod/low-priority-pod
SCHED->>SCHED: MoveAllToActiveOrBackoffQueue()<br/>(UnschedulableQ -> ActiveQ)
SCHED->>SCHED: Pop() -> Pod/high-priority-pod
SCHED->>FWK: RunFilterPlugins(...)<br/>(此时 Node-1 资源已释放)
FWK-->>SCHED: Success(Node-1 可行)
SCHED->>SCHED: schedulePod() -> Node-1
SCHED->>SCHED: Assume + Bind
要点说明
1. 抢占条件
- Pod 优先级 > 0(
Pod.Spec.Priority) - 抢占策略不是 Never(
Pod.Spec.PreemptionPolicy)
2. 驱逐成本计算
- 优先级:优先驱逐优先级最低的 Pod
- 数量:优先驱逐 Pod 数量最少的节点
- PDB 违反:优先违反 PodDisruptionBudget 最少的节点
- 节点优先级:优先驱逐低优先级节点上的 Pod
3. PodDisruptionBudget(PDB)
- 限制同时驱逐的 Pod 数量(保证最小可用副本数)
- 抢占时可能违反 PDB(但会优先选择违反最少的方案)
场景 3:绑定失败回滚流程
时序图
sequenceDiagram
autonumber
participant SCHED as Scheduler
participant CACHE as Scheduler Cache
participant FWK as Framework
participant API as API Server
participant Q as SchedulingQueue
Note over SCHED,Q: 场景:Assume 成功,但 Bind 失败
SCHED->>CACHE: AssumePod(pod, "Node-1")
CACHE->>CACHE: 记录假设(Pod.Spec.NodeName="Node-1")
CACHE-->>SCHED: Success
SCHED->>FWK: RunReservePluginsReserve(ctx, state, pod, "Node-1")
FWK->>FWK: VolumeBinding.Reserve()<br/>- 预留 PV
FWK-->>SCHED: Success
Note over SCHED: 异步绑定
SCHED->>FWK: RunBindPlugins(ctx, state, pod, "Node-1")
FWK->>API: POST /api/v1/pods/nginx/binding<br/>{target: {kind: "Node", name: "Node-1"}}
alt Bind 成功
API-->>FWK: 200 OK
FWK-->>SCHED: Success
SCHED->>CACHE: FinishBinding(pod)
else Bind 失败(如 API Server 错误、ResourceVersion 冲突)
API-->>FWK: 409 Conflict / 500 Internal Server Error
FWK-->>SCHED: Error
Note over SCHED,Q: 回滚流程
SCHED->>FWK: RunReservePluginsUnreserve(ctx, state, pod, "Node-1")
FWK->>FWK: VolumeBinding.Unreserve()<br/>- 释放预留的 PV
SCHED->>CACHE: ForgetPod(pod)
CACHE->>CACHE: 1. 移出 assumedPods<br/>2. 回滚 Node-1 的资源<br/>3. 删除 Pod 状态
CACHE-->>SCHED: Success
SCHED->>Q: AddUnschedulableIfNotPresent(pod, BackoffQ)
Q->>Q: 加入 BackoffQ<br/>(退避 1s 后重试)
Note over SCHED: 记录失败事件
SCHED->>API: POST /api/v1/events<br/>{type: "Warning", reason: "FailedScheduling"}
end
要点说明
1. 回滚步骤
- Unreserve:释放预留的资源(如 PV、IP)
- ForgetPod:从缓存中移除假设(回滚节点资源)
- Requeue:重新加入队列(BackoffQ,退避后重试)
2. 常见绑定失败原因
- API Server 错误:网络故障、etcd 不可用
- ResourceVersion 冲突:Pod 被其他组件更新(如 Kubelet 更新 Status)
- 节点不可用:节点在绑定前被删除或标记为不可调度
场景 4:Scheduler 启动流程
时序图(精简版)
sequenceDiagram
autonumber
participant MAIN as main()
participant SCHED as Scheduler
participant LE as Leader Election
participant INF as InformerFactory
participant Q as SchedulingQueue
participant FWK as Framework
MAIN->>SCHED: NewScheduler(client, config)
SCHED->>FWK: NewFramework(profile, plugins)
FWK->>FWK: 注册所有插件<br/>(Filter、Score、Bind 等)
SCHED->>Q: NewSchedulingQueue(lessFn)
MAIN->>LE: LeaderElector.Run()
loop 每 2 秒重试
LE->>LE: 尝试获取 Lease
alt 获取成功
LE->>SCHED: OnStartedLeading() 回调
else 获取失败
LE->>LE: 等待 2s 后重试
end
end
SCHED->>INF: Start(stopCh)
INF->>INF: 启动所有 Informer<br/>- Pod Informer<br/>- Node Informer<br/>- PVC Informer
SCHED->>Q: Run()
loop Scheduler 主循环
SCHED->>Q: Pop() -> PodInfo
SCHED->>SCHED: scheduleOne(ctx, podInfo)
Note over SCHED: PreFilter → Filter → Score<br/>Assume → Reserve → Bind
end
性能指标与可观测性
关键指标
| 指标 | 类型 | 说明 |
|---|---|---|
scheduler_scheduling_duration_seconds |
Histogram | 调度延迟(从 Pop 到 Bind) |
scheduler_scheduling_attempt_duration_seconds |
Histogram | 单次调度尝试延迟 |
scheduler_pod_scheduling_attempts |
Histogram | Pod 调度尝试次数 |
scheduler_queue_incoming_pods_total |
Counter | 入队 Pod 总数 |
scheduler_pending_pods |
Gauge | 待调度 Pod 数量 |
scheduler_preemption_attempts_total |
Counter | 抢占尝试次数 |
scheduler_preemption_victims |
Histogram | 每次抢占驱逐的 Pod 数量 |
scheduler_framework_extension_point_duration_seconds |
Histogram | 各扩展点执行时间 |
scheduler_plugin_execution_duration_seconds |
Histogram | 各插件执行时间 |
性能调优建议
- 减少插件数量:禁用不必要的插件
- 调整插件权重:降低不重要插件的权重
- 增加调度器副本:多 Scheduler 实例(注意配置不同的 schedulerName)
- 优化节点数量:单个 Scheduler 推荐 5000 节点以下
Kubernetes-03-Scheduler-数据结构
数据结构概述
Scheduler 的核心数据结构围绕**调度框架(Scheduling Framework)**设计,主要包括:
- SchedulingQueue(调度队列):管理待调度的 Pod
- Scheduler Cache(调度缓存):存储节点和 Pod 的状态
- Framework(调度框架):管理插件和扩展点
- CycleState(调度周期状态):插件间共享数据
核心数据结构 UML 图
1. Scheduler 整体数据结构
classDiagram
class Scheduler {
+SchedulingQueue queue
+SchedulerCache cache
+Framework framework
+Profiles []Profile
+Algorithm genericScheduler
+scheduleOne(ctx Context)
}
class SchedulingQueue {
+ActiveQ PriorityQueue
+BackoffQ DelayingQueue
+UnschedulableQ Map
+Add(pod Pod)
+Pop() (PodInfo, error)
+MoveAllToActiveOrBackoffQueue(event string)
}
class SchedulerCache {
+nodes map~string~NodeInfo
+assumedPods map~string~bool
+AssumePod(pod Pod) error
+FinishBinding(pod Pod) error
+ForgetPod(pod Pod) error
+UpdateSnapshot(snapshot Snapshot) error
}
class Framework {
+preFilterPlugins []PreFilterPlugin
+filterPlugins []FilterPlugin
+postFilterPlugins []PostFilterPlugin
+scorePlugins []ScorePlugin
+bindPlugins []BindPlugin
+RunFilterPlugins(ctx, state, pod, nodeInfo) Status
+RunScorePlugins(ctx, state, pod, nodes) NodePluginScores
}
class CycleState {
+storage map~StateKey~StateData
+recordPluginMetrics bool
+SkipFilterPlugins Set~string~
+SkipScorePlugins Set~string~
+Read(key StateKey) (StateData, error)
+Write(key StateKey, val StateData)
}
Scheduler "1" --> "1" SchedulingQueue : uses
Scheduler "1" --> "1" SchedulerCache : uses
Scheduler "1" --> "1" Framework : uses
Scheduler "1" --> "1" CycleState : creates per cycle
数据结构详解
1. SchedulingQueue(调度队列)
1.1 接口定义
// pkg/scheduler/internal/queue/scheduling_queue.go
// SchedulingQueue 调度队列接口
type SchedulingQueue interface {
// Add 添加 Pod 到队列
Add(logger klog.Logger, pod *v1.Pod) error
// Pop 从队列取出下一个待调度的 Pod(阻塞)
Pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
// Update 更新队列中的 Pod
Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
// Delete 从队列删除 Pod
Delete(pod *v1.Pod) error
// MoveAllToActiveOrBackoffQueue 移动所有 Pod 到 ActiveQ 或 BackoffQ
// - 触发时机:集群事件(如 Node 新增、Pod 删除)
MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) error
// Run 启动队列(后台 Goroutine)
Run(logger klog.Logger)
// Close 关闭队列
Close()
}
1.2 数据结构
// PriorityQueue 优先级队列实现
type PriorityQueue struct {
// 三个子队列
activeQ *heap.Heap // 活跃队列(优先级堆)
backoffQ *heap.Heap // 退避队列(延迟队列)
unschedulableQ *UnschedulablePodsMap // 不可调度队列(Map)
// 索引
podInfoMap map[types.UID]*framework.QueuedPodInfo // UID -> PodInfo
// 锁
lock sync.RWMutex
// 条件变量(Pop 阻塞时等待)
cond sync.Cond
// 队列已关闭标志
closed bool
}
字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
activeQ |
*heap.Heap |
活跃队列(按优先级和时间戳排序) |
backoffQ |
*heap.Heap |
退避队列(存储调度失败的 Pod,等待重试) |
unschedulableQ |
*UnschedulablePodsMap |
不可调度队列(存储所有节点都不可行的 Pod) |
podInfoMap |
map[types.UID]*QueuedPodInfo |
UID 到 PodInfo 的映射(快速查找) |
1.3 QueuedPodInfo(队列中的 Pod 信息)
// QueuedPodInfo 队列中的 Pod 信息
type QueuedPodInfo struct {
*PodInfo
// Timestamp Pod 加入队列的时间
Timestamp time.Time
// Attempts 调度尝试次数
Attempts int
// InitialAttemptTimestamp 第一次调度尝试的时间
InitialAttemptTimestamp *time.Time
// Gated Pod 是否被门控(等待 Permit 插件批准)
Gated bool
// PendingPlugins 等待批准的插件集合
PendingPlugins map[string]*time.Time
}
1.4 优先级排序规则
// Less 比较两个 Pod 的优先级
func (pq *PriorityQueue) Less(pInfo1, pInfo2 interface{}) bool {
p1 := pInfo1.(*framework.QueuedPodInfo)
p2 := pInfo2.(*framework.QueuedPodInfo)
// 1. 优先级(Priority):高优先级优先
if p1.Pod.Spec.Priority != nil && p2.Pod.Spec.Priority != nil {
if *p1.Pod.Spec.Priority > *p2.Pod.Spec.Priority {
return true
}
if *p1.Pod.Spec.Priority < *p2.Pod.Spec.Priority {
return false
}
}
// 2. 时间戳(Timestamp):早加入的优先
return p1.Timestamp.Before(p2.Timestamp)
}
1.5 核心方法
Add(添加 Pod):
func (pq *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
pq.lock.Lock()
defer pq.lock.Unlock()
// 1. 检查 Pod 是否已在队列中
pInfo := pq.newQueuedPodInfo(pod)
if oldPInfo, exists := pq.podInfoMap[pod.UID]; exists {
// 更新已存在的 Pod
pInfo.Attempts = oldPInfo.Attempts
pInfo.InitialAttemptTimestamp = oldPInfo.InitialAttemptTimestamp
}
// 2. 检查 Pod 是否可以入队(PreEnqueue 插件)
if !pq.isPodBackingoff(pInfo) {
// 加入 activeQ
if err := pq.activeQ.Add(pInfo); err != nil {
return err
}
pq.podInfoMap[pod.UID] = pInfo
} else {
// 加入 backoffQ(等待退避时间)
if err := pq.backoffQ.Add(pInfo); err != nil {
return err
}
pq.podInfoMap[pod.UID] = pInfo
}
// 3. 唤醒等待的 Pop
pq.cond.Broadcast()
return nil
}
Pop(取出 Pod):
func (pq *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
// 1. 等待队列非空或关闭
for pq.activeQ.Len() == 0 {
if pq.closed {
return nil, fmt.Errorf("scheduling queue is closed")
}
pq.cond.Wait()
}
// 2. 从 activeQ 取出优先级最高的 Pod
obj, err := pq.activeQ.Pop()
if err != nil {
return nil, err
}
pInfo := obj.(*framework.QueuedPodInfo)
pInfo.Attempts++
// 3. 保留在 podInfoMap 中(用于 Update/Delete)
return pInfo, nil
}
MoveAllToActiveOrBackoffQueue(移动 Pod):
func (pq *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) error {
pq.lock.Lock()
defer pq.lock.Unlock()
// 1. 遍历 unschedulableQ 中的所有 Pod
unschedulablePods := pq.unschedulableQ.GetAllPods()
for _, pInfo := range unschedulablePods {
// 2. 检查 Pod 是否因该事件可重新调度
if !preCheck(pInfo.Pod, event) {
continue
}
// 3. 移动到 activeQ 或 backoffQ
pq.unschedulableQ.Delete(pInfo.Pod.UID)
if pq.isPodBackingoff(pInfo) {
pq.backoffQ.Add(pInfo)
} else {
pq.activeQ.Add(pInfo)
}
}
// 4. 唤醒等待的 Pop
pq.cond.Broadcast()
return nil
}
2. Scheduler Cache(调度缓存)
2.1 接口定义
// pkg/scheduler/backend/cache/interface.go
// Cache 调度缓存接口
type Cache interface {
// AssumePod 乐观假设(在缓存中记录 Pod 的调度结果)
AssumePod(logger klog.Logger, pod *v1.Pod) error
// FinishBinding 确认绑定成功
FinishBinding(logger klog.Logger, pod *v1.Pod) error
// ForgetPod 忘记假设(绑定失败时调用)
ForgetPod(logger klog.Logger, pod *v1.Pod) error
// AddPod 添加 Pod 到缓存
AddPod(logger klog.Logger, pod *v1.Pod) error
// UpdatePod 更新缓存中的 Pod
UpdatePod(logger klog.Logger, oldPod, newPod *v1.Pod) error
// RemovePod 从缓存移除 Pod
RemovePod(logger klog.Logger, pod *v1.Pod) error
// AddNode 添加节点到缓存
AddNode(logger klog.Logger, node *v1.Node) *framework.NodeInfo
// UpdateNode 更新缓存中的节点
UpdateNode(logger klog.Logger, oldNode, newNode *v1.Node) *framework.NodeInfo
// RemoveNode 从缓存移除节点
RemoveNode(logger klog.Logger, node *v1.Node) error
// UpdateSnapshot 更新快照(供调度算法使用)
UpdateSnapshot(logger klog.Logger, snapshot *Snapshot) error
}
2.2 数据结构
// cacheImpl 实现了 Cache 接口
type cacheImpl struct {
// 节点缓存
nodes map[string]*nodeInfoListItem // nodeName -> NodeInfo
// 已假设的 Pod
assumedPods sets.Set[string] // podKey 集合
// Pod 状态
podStates map[string]*podState // podKey -> podState
// 锁
mu sync.RWMutex
// 镜像状态
imageStates map[string]*imageState // imageName -> imageState
}
字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
nodes |
map[string]*nodeInfoListItem |
节点名称到 NodeInfo 的映射 |
assumedPods |
sets.Set[string] |
已假设的 Pod 集合(等待绑定确认) |
podStates |
map[string]*podState |
Pod 的状态(假设中、已绑定) |
imageStates |
map[string]*imageState |
镜像的状态(大小、存在于哪些节点) |
2.3 NodeInfo(节点信息)
// NodeInfo 节点信息
type NodeInfo struct {
// Node 对象
node *v1.Node
// 资源信息
Requested *Resource // 已请求的资源
NonZeroRequested *Resource // 非零请求的资源(用于防止超卖)
Allocatable *Resource // 可分配的资源
// Pod 列表
Pods []*PodInfo // 节点上运行的 Pod
// 镜像状态
ImageStates map[string]*ImageStateSummary // 节点上的镜像
// 拓扑域
TopologyZone string // 可用区
TopologyRegion string // 地域
// 代次(用于检测并发更新)
Generation int64
}
// Resource 资源
type Resource struct {
MilliCPU int64
Memory int64
EphemeralStorage int64
AllowedPodNumber int
ScalarResources map[v1.ResourceName]int64 // GPU、HugePages 等
}
2.4 核心方法
AssumePod(乐观假设):
func (cache *cacheImpl) AssumePod(logger klog.Logger, pod *v1.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()
key := framework.GetPodKey(pod)
// 1. 检查 Pod 是否已假设
if cache.assumedPods.Has(key) {
return fmt.Errorf("pod %v is assumed", key)
}
// 2. 记录假设状态
cache.assumedPods.Insert(key)
cache.podStates[key] = &podState{
pod: pod,
deadline: time.Now().Add(assumedPodTTL), // 假设 TTL(默认 30s)
}
// 3. 更新节点资源
nodeName := pod.Spec.NodeName
if n, ok := cache.nodes[nodeName]; ok {
n.info.AddPod(pod)
}
return nil
}
FinishBinding(确认绑定):
func (cache *cacheImpl) FinishBinding(logger klog.Logger, pod *v1.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()
key := framework.GetPodKey(pod)
// 从假设集合中移除
cache.assumedPods.Delete(key)
// 更新 Pod 状态为已绑定
if ps, ok := cache.podStates[key]; ok {
ps.bindingFinished = true
}
return nil
}
ForgetPod(忘记假设):
func (cache *cacheImpl) ForgetPod(logger klog.Logger, pod *v1.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()
key := framework.GetPodKey(pod)
// 1. 从假设集合中移除
cache.assumedPods.Delete(key)
// 2. 回滚节点资源
nodeName := pod.Spec.NodeName
if n, ok := cache.nodes[nodeName]; ok {
n.info.RemovePod(pod)
}
// 3. 删除 Pod 状态
delete(cache.podStates, key)
return nil
}
3. Framework(调度框架)
3.1 数据结构
// pkg/scheduler/framework/runtime/framework.go
// frameworkImpl 实现了 Framework 接口
type frameworkImpl struct {
// 插件集合(按扩展点分类)
preEnqueuePlugins []framework.PreEnqueuePlugin
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
permitPlugins []framework.PermitPlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
// 插件权重
scorePluginWeight map[string]int // 插件名称 -> 权重
// Handle(供插件使用的辅助接口)
handle framework.Handle
// 并行器
parallelizer parallelize.Parallelizer
}
3.2 Handle(插件辅助接口)
// Handle 供插件使用的辅助接口
type Handle interface {
// SnapshotSharedLister 获取共享的 Lister(节点、Pod 等资源的缓存)
SnapshotSharedLister() SharedLister
// ClientSet 获取 Kubernetes ClientSet
ClientSet() clientset.Interface
// EventRecorder 获取事件记录器
EventRecorder() events.EventRecorder
// SharedInformerFactory 获取共享 InformerFactory
SharedInformerFactory() informers.SharedInformerFactory
// Parallelizer 获取并行器
Parallelizer() parallelize.Parallelizer
}
4. CycleState(调度周期状态)
4.1 数据结构
// CycleState 调度周期状态(插件间共享数据)
type CycleState struct {
// 存储(Key-Value)
storage sync.Map // StateKey -> StateData
// 是否记录插件指标
recordPluginMetrics bool
// 跳过的插件集合
SkipFilterPlugins sets.Set[string]
SkipScorePlugins sets.Set[string]
}
// StateKey 状态键
type StateKey string
// StateData 状态数据接口
type StateData interface {
Clone() StateData
}
4.2 核心方法
Write(写入数据):
func (c *CycleState) Write(key StateKey, val StateData) {
c.storage.Store(key, val)
}
Read(读取数据):
func (c *CycleState) Read(key StateKey) (StateData, error) {
if v, ok := c.storage.Load(key); ok {
return v.(StateData), nil
}
return nil, fmt.Errorf("key %v not found", key)
}
Clone(克隆):
func (c *CycleState) Clone() *CycleState {
clone := &CycleState{
recordPluginMetrics: c.recordPluginMetrics,
SkipFilterPlugins: c.SkipFilterPlugins.Clone(),
SkipScorePlugins: c.SkipScorePlugins.Clone(),
}
c.storage.Range(func(key, value interface{}) bool {
clone.storage.Store(key, value.(StateData).Clone())
return true
})
return clone
}
数据结构关系与约束
1. Assume 机制的作用
问题:
- 调度器选定节点后,需要调用 API Server 绑定 Pod(异步操作)
- 在绑定完成前,后续调度决策可能基于过时的节点资源(导致资源超卖)
解决方案:
- 调度器选定节点后,先在缓存中记录 Pod 的调度结果(Assume)
- 后续调度决策基于该假设(避免多个 Pod 调度到同一节点)
- 绑定成功后,确认假设(FinishBinding)
- 绑定失败后,移除假设(ForgetPod)
示例:
t=0: 节点 Node-1 可用资源:4 CPU
t=1: 调度器选定 Node-1 运行 Pod-A(请求 2 CPU)
- AssumePod(Pod-A)
- 缓存中 Node-1 可用资源:2 CPU
t=2: 调度器选定 Node-1 运行 Pod-B(请求 2 CPU)
- 基于缓存判断:Node-1 可用资源 = 2 CPU(满足)
- AssumePod(Pod-B)
- 缓存中 Node-1 可用资源:0 CPU
t=3: 绑定 Pod-A 成功
- FinishBinding(Pod-A)
t=4: 绑定 Pod-B 失败(实际资源不足)
- ForgetPod(Pod-B)
- 缓存中 Node-1 可用资源:2 CPU
2. 调度队列的退避机制
问题:
- Pod 调度失败后,立即重试可能再次失败(集群状态未变化)
解决方案:
- 调度失败的 Pod 进入 BackoffQ(退避队列)
- 退避时间:初始 1s,最大 10s(指数退避)
- 退避时间到后,移动到 ActiveQ 重试
示例:
t=0: Pod-A 调度失败(所有节点资源不足)
- 加入 BackoffQ(退避时间 1s)
t=1: 退避时间到,移动到 ActiveQ
- 再次调度失败(集群状态未变化)
- 加入 BackoffQ(退避时间 2s)
t=3: 退避时间到,移动到 ActiveQ
- 再次调度失败
- 加入 BackoffQ(退避时间 4s)
t=7: 退避时间到,移动到 ActiveQ
- 调度成功(新节点加入)
3. UnschedulableQ 的触发机制
问题:
- Pod 因集群资源不足而无法调度时,频繁重试浪费 CPU
解决方案:
- 将 Pod 移动到 UnschedulableQ(不可调度队列)
- 等待集群事件触发重新调度(如新节点加入、Pod 删除)
触发事件:
| 事件 | 触发原因 |
|---|---|
| Node Added | 新节点加入,可能有资源 |
| Pod Deleted | Pod 删除,释放资源 |
| PV Created | PV 创建,可能满足 PVC |
| Service Updated | Service 更新,可能影响亲和性 |
性能与容量考虑
1. SchedulingQueue 性能
内存:
- 每个 PodInfo:约 1-2 KB
- 1000 个待调度 Pod ≈ 1-2 MB
查询性能:
- Pop:O(log n)(堆操作)
- Add:O(log n)
- MoveAllToActiveOrBackoffQueue:O(n)(遍历 unschedulableQ)
2. Scheduler Cache 性能
内存:
- 每个 NodeInfo:约 10-50 KB(取决于 Pod 数量)
- 1000 个节点 ≈ 10-50 MB
查询性能:
- AssumePod:O(1)(map 查询)
- UpdateSnapshot:O(n)(遍历所有节点)
3. CycleState 性能
内存:
- 每个 CycleState:约 1-10 KB(取决于插件数据)
- 并发调度 100 个 Pod ≈ 100 KB - 1 MB