Kubernetes Source Code Walkthrough: kube-scheduler

Table of Contents
- Kubernetes Source Code Walkthrough: kube-scheduler
- Table of Contents
- 1. The Big Picture of Scheduling
- 2. Entry Command and `runCommand`
- 3. `Setup`: Constructing the Scheduler and Its Dependencies
- 4. Clientset Initialization
- 5. SharedInformerFactory and the Pod Informer
- 6. The Three Pillars of SharedIndexInformer
- 7. `scheduler.New` and Profile/Framework
- 8. Scheduling Queues: active / backoff / unschedulable
- 9. Event Handling and Listener Registration
- 10. `Run`: Startup, Sync, and (Optional) Leader Election
- 11. Reflector, List/Watch, and DeltaFIFO
- 12. The Main Scheduling Loop: `ScheduleOne`
- 13. `schedulingCycle`: assume / reserve / permit / bind
- 14. Candidate Nodes and Scoring
- 15. Assume and Cache Updates
- 16. Details: Sampling Percentage and Start Index
- 17. Summary
- Appendix A | Mermaid Sequence Diagram: From List/Watch to Bind
- Appendix B | Plugin Extension Point Cheat Sheet
- Appendix C | Common Failure Paths and Fallback Strategies
- 18. Preemption in Depth
- 19. Pre-enqueue and QueueingHint
- 20. Performance and Observability
- 21. Parameter Tuning in Practice
1. The Big Picture of Scheduling
- Input: Pods whose `pod.spec.nodeName` is empty.
- Process: the scheduling framework (predicates via Filter → priorities via Score) picks one feasible node from the cluster's Nodes.
- Output: `pod.spec.nodeName` is written back (through `assume`, `reserve`/`permit`, and `bind`) and ultimately persisted to etcd.
2. Entry Command and `runCommand`
Built on cobra, the entry command is created and executed as follows:

```go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts := options.NewOptions()
	return &cobra.Command{
		Use: "kube-scheduler",
		RunE: func(cmd *cobra.Command, args []string) error {
			return runCommand(cmd, opts, registryOptions...)
		},
	}
}

func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	// ctx setup (signal handling, etc.) elided
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil { return err }
	return Run(ctx, cc, sched)
}
```
3. `Setup`: Constructing the Scheduler and Its Dependencies
`opts.Config` creates the `kubeClient`, `InformerFactory`, and `DynInformerFactory`; the scheduler is then configured through `scheduler.With*` options:

```go
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (
	*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error,
) {
	c, err := opts.Config(ctx)
	if err != nil { return nil, nil, err }
	cc := c.Complete()
	recorderFactory := getRecorderFactory(&cc)
	completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
	sched, err := scheduler.New(
		ctx,
		cc.Client,
		cc.InformerFactory,
		cc.DynInformerFactory,
		recorderFactory,
		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(p kubeschedulerconfig.KubeSchedulerProfile) {
			completedProfiles = append(completedProfiles, p)
		}),
	)
	if err != nil { return nil, nil, err }
	return &cc, sched, nil
}
```
4. Clientset Initialization
Clients for each API group are built from a `rest.Config` and an `http.Client`:

```go
func NewForConfig(c *rest.Config) (*Clientset, error) {
	cfg := *c
	if cfg.UserAgent == "" { cfg.UserAgent = rest.DefaultKubernetesUserAgent() }
	httpClient, err := rest.HTTPClientFor(&cfg)
	if err != nil { return nil, err }
	return NewForConfigAndClient(&cfg, httpClient)
}

func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
	cfg := *c
	var cs Clientset
	var err error
	cs.coreV1, err = corev1.NewForConfigAndClient(&cfg, httpClient)
	if err != nil { return nil, err }
	cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfigAndClient(&cfg, httpClient)
	if err != nil { return nil, err }
	return &cs, nil
}

func (c *Clientset) CoreV1() corev1.CoreV1Interface { return c.coreV1 }
```
5. SharedInformerFactory and the Pod Informer

```go
func NewInformerFactory(cs clientset.Interface, resync time.Duration) informers.SharedInformerFactory {
	f := informers.NewSharedInformerFactory(cs, resync)
	f.InformerFor(&v1.Pod{}, newPodInformer)
	return f
}

func newPodInformer(cs clientset.Interface, resync time.Duration) cache.SharedIndexInformer {
	inf := coreinformers.NewFilteredPodInformer(
		cs, metav1.NamespaceAll, resync, cache.Indexers{}, tweakListOptions,
	)
	inf.SetTransform(trim)
	return inf
}
```

`NewFilteredPodInformer` registers the List/Watch functions, which talk to the apiserver via `client.CoreV1().Pods(namespace)`.
6. The Three Pillars of SharedIndexInformer
- Indexer: the local cache (a thread-safe store + keyFunc + multiple indices);
- Processor: the dispatcher (`sharedProcessor` plus multiple `processorListener`s);
- ListerWatcher: the List/Watch connection to the apiserver.

```go
func NewSharedIndexInformerWithOptions(lw ListerWatcher, example runtime.Object, opt SharedIndexInformerOptions) SharedIndexInformer {
	return &sharedIndexInformer{
		indexer:       NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, opt.Indexers),
		processor:     &sharedProcessor{clock: &clock.RealClock{}},
		listerWatcher: lw,
		objectType:    example,
	}
}
```
7. `scheduler.New` and Profile/Framework
`scheduler.New` wires together the Extenders, Listers, cache snapshot, and waiting queues, then builds Profile → Framework:

```go
profiles, err := profile.NewMap(
	ctx, options.profiles, registry, recorderFactory,
	frameworkruntime.WithClientSet(client),
	frameworkruntime.WithKubeConfig(options.kubeConfig),
	frameworkruntime.WithInformerFactory(informerFactory),
	frameworkruntime.WithSnapshotSharedLister(snapshot),
	frameworkruntime.WithExtenders(extenders),
	frameworkruntime.WithParallelism(int(options.parallelism)),
)
```

Inside the Framework, plugins of every kind (QueueSort/PreFilter/Filter/PreScore/Score/Reserve/Permit/Bind, ...) are instantiated, and weights, extension points, and so on are validated.
8. Scheduling Queues: active / backoff / unschedulable
The priority queue is a three-tier structure:
- activeQ: Pods waiting to be scheduled;
- backoffQ: Pods backing off (exponential backoff after a failure);
- unschedulablePods: Pods currently unschedulable (waiting for an event to re-enqueue them).

```go
func NewPriorityQueue(lessFn framework.LessFunc, f informers.SharedInformerFactory, opts ...Option) *PriorityQueue {
	backoffQ := newBackoffQueue(...)
	return &PriorityQueue{
		backoffQ:          backoffQ,
		unschedulablePods: newUnschedulablePods(...),
		activeQ:           newActiveQueue(...),
		nsLister:          f.Core().V1().Namespaces().Lister(),
		nominator:         newPodNominator(options.podLister),
	}
}
```
9. Event Handling and Listener Registration
Two handlers are registered on the Pod Informer:
- Scheduled Pods (`assignedPod`): maintain the scheduler cache (add/update/delete);
- Unscheduled Pods (`!assigned && responsibleForPod`): go into the scheduling queue (add/update/delete).

```go
cache.FilteringResourceEventHandler{
	FilterFunc: func(obj interface{}) bool {
		switch t := obj.(type) {
		case *v1.Pod:
			return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
		case cache.DeletedFinalStateUnknown:
			if pod, ok := t.Obj.(*v1.Pod); ok { return responsibleForPod(pod, sched.Profiles) }
			return false
		default:
			return false
		}
	},
	Handler: cache.ResourceEventHandlerFuncs{
		AddFunc:    sched.addPodToSchedulingQueue,
		UpdateFunc: sched.updatePodInSchedulingQueue,
		DeleteFunc: sched.deletePodFromSchedulingQueue,
	},
}
```
10. `Run`: Startup, Sync, and (Optional) Leader Election
Key steps:
- Start the informers: `InformerFactory.Start(ctx.Done())`;
- Wait for the caches to sync: `WaitForCacheSync`;
- Wait for the handlers' initial list delivery to complete: `sched.WaitForHandlersSync`;
- (If enabled) start `sched.Run(ctx)` once leader election is won.
11. Reflector, List/Watch, and DeltaFIFO
After the informer starts, its internal `controller` runs a `Reflector`:
- `ListAndWatch` periodically lists and incrementally watches;
- events flow into the DeltaFIFO;
- `processLoop` pops from the queue and calls `HandleDeltas`, which:
  - updates the local Indexer;
  - fans out through the Processor → Listeners to the callbacks (`OnAdd`/`OnUpdate`/`OnDelete`).

This machinery keeps the scheduling queue and the cache continuously up to date.
12. The Main Scheduling Loop: `ScheduleOne`
The core: pop a Pod from the queue → locate its Framework → run one scheduling cycle.

```go
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
	podInfo, _ := sched.NextPod(logger) // = pq.Pop
	pod := podInfo.Pod
	fwk, _ := sched.frameworkForPod(pod)
	state := framework.NewCycleState()
	podsToActivate := framework.NewPodsToActivate()
	state.Write(framework.PodsToActivateKey, podsToActivate)
	result, assumedPodInfo, status := sched.schedulingCycle(ctx, state, fwk, podInfo, start, podsToActivate)
	if !status.IsSuccess() { sched.FailureHandler(...); return }
	go func() {
		status := sched.bindingCycle(ctx, state, fwk, result, assumedPodInfo, start, podsToActivate)
		if !status.IsSuccess() { sched.handleBindingCycleError(...) }
	}()
}
```
13. `schedulingCycle`: assume / reserve / permit / bind
`schedulePod` finds the suggested node → assume → run Reserve/Permit → activate affected Pods:

```go
scheduleResult, _ := sched.SchedulePod(ctx, fwk, state, pod)
assumed := podInfo.DeepCopy().Pod
if err := sched.assume(logger, assumed, scheduleResult.SuggestedHost); err != nil { ... }
if sts := fwk.RunReservePluginsReserve(ctx, state, assumed, scheduleResult.SuggestedHost); !sts.IsSuccess() {
	fwk.RunReservePluginsUnreserve(ctx, state, assumed, scheduleResult.SuggestedHost)
	sched.Cache.ForgetPod(logger, assumed)
	return ...
}
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumed, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
	fwk.RunReservePluginsUnreserve(ctx, state, assumed, scheduleResult.SuggestedHost)
	sched.Cache.ForgetPod(logger, assumed)
	return ...
}
if len(podsToActivate.Map) != 0 {
	sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
}
```

The actual `bind` completes asynchronously in `bindingCycle`.
14. Candidate Nodes and Scoring
The core of `schedulePod`: refresh the snapshot → predicates (Filter) → priorities (Score) → select the Host.

```go
sched.Cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot)
if sched.nodeInfoSnapshot.NumNodes() == 0 { return ErrNoNodesAvailable }
feasible, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if len(feasible) == 0 { return FitError{Diagnosis: diagnosis} }
priorityList, _ := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasible)
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
```

- With exactly one feasible node, it is returned directly;
- `PreScore` runs before `Score`;
- Extender `Prioritize` scores are normalized to `MaxNodeScore` before being merged with the built-in scores.
15. Assume and Cache Updates
`assume` first marks the resource usage in the scheduler cache, so subsequent decisions are based on the latest "assumed" state:

```go
func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
	assumed.Spec.NodeName = host
	if err := sched.Cache.AssumePod(logger, assumed); err != nil { return err }
	sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	return nil
}
```

`NodeInfo.update` adds the Pod's resource requests to `Requested`, along with port usage, PVC counts, and so on, improving the accuracy of subsequent scheduling.
16. Details: Sampling Percentage and Start Index
- `percentageOfNodesToScore`: for throughput, the scheduler often evaluates only a sample of the nodes (by default adaptive to cluster size, with a guaranteed floor).
- `nextStartNodeIndex`: each round starts iterating the nodes from a different offset, avoiding the bias of always starting at 0: `sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(allNodes)`
17. Summary
- The Informer "senses changes, fills the cache, and drives callbacks"; `Reflector → DeltaFIFO → Indexer → Processor` forms a stable event pipeline for resources.
- Scheduler construction: Profile/Framework assembles the plugin system, while the caches (snapshot & scheduler cache) and the queues (active/backoff/unschedulable) work in concert.
- One scheduling pass: `Pop → Filter → Score → select → assume → reserve/permit → bind`, interwoven with diagnostics, backoff, and activation, pursuing low latency while keeping fairness and stability.
Appendix A | Mermaid Sequence Diagram: From List/Watch to Bind

```mermaid
sequenceDiagram
    autonumber
    participant APIServer as Kube-APIServer
    participant Reflector as Reflector
    participant FIFO as DeltaFIFO
    participant Indexer as Cache(Indexer)
    participant Proc as Processor
    participant Handlers as SchedulerHandlers
    participant Q as SchedulingQueue<br/>(active/backoff/unsched)
    participant Sched as Scheduler
    participant F as Framework(Plugins)
    participant Snap as Snapshot & SchedulerCache
    participant Binder as APIServer(Bind)

    APIServer-)Reflector: Watch/List Pods/Nodes (incremental events)
    Reflector->>FIFO: push Deltas (Added/Updated/Deleted)
    Note over FIFO: controller.processLoop Pop
    FIFO->>Indexer: HandleDeltas → update local cache
    Indexer-->>Proc: trigger dispatch
    Proc-->>Handlers: OnAdd/OnUpdate/OnDelete
    alt unscheduled Pod
        Handlers->>Q: Add/Update into activeQ
    else scheduled Pod / Node event
        Handlers->>Snap: add/update/delete in SchedulerCache
        Handlers->>Q: Activate() as events warrant
    end
    loop scheduling loop
        Sched->>Q: Pop() the highest-priority Pod
        Sched->>Snap: Cache.UpdateSnapshot()
        Sched->>F: RunPreFilter()
        Sched->>F: RunFilter() (parallel/sampled)
        alt no feasible node
            F-->>Sched: FitError / Unschedulable
            Sched->>F: RunPostFilter() (may trigger Preemption)
            Sched->>Q: move to unschedulable / backoff
        else feasible nodes exist
            Sched->>F: RunPreScore() → RunScore() (+Extenders)
            Sched->>Sched: selectHost picks the top-scored node
            Sched->>Snap: Assume(pod,node) & reserve resources
            Sched->>F: RunReserve() / on failure → Unreserve + Forget
            Sched->>F: RunPermit() (Wait/Reject/Success)
            alt Permit Wait/Reject
                F-->>Sched: Wait/Reject
                Sched->>F: Unreserve()
                Sched->>Snap: ForgetPod()
                Sched->>Q: requeue → backoff/unsched
            else Permit Success
                par async binding
                    Sched->>Binder: Bind(Pod→Node)
                    Binder-->>Sched: Success/Failure
                and activate possibly affected Pods
                    Sched->>Q: Activate(related pods)
                end
                alt Bind failure
                    Sched->>Snap: ForgetPod()
                    Sched->>Q: requeue → backoff
                end
            end
        end
    end
```