Kubernetes Source Code Walkthrough: kube-scheduler

1. Scheduling at a Glance

  • Input: Pods whose pod.spec.nodeName is empty.
  • Process: the scheduling framework (filtering via Filter → ranking via Score) picks one feasible node from the cluster's Nodes.
  • Output: pod.spec.nodeName is written back (via assume → reserve/permit → bind) and ultimately persisted to etcd.

2. The Entry Command and runCommand

The entry command is created and executed with cobra:

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    opts := options.NewOptions()
    return &cobra.Command{
        Use: "kube-scheduler",
        RunE: func(cmd *cobra.Command, args []string) error {
            return runCommand(cmd, opts, registryOptions...)
        },
    }
}

func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    // A cancellable context (tied to SIGTERM/SIGINT upstream) drives shutdown.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    cc, sched, err := Setup(ctx, opts, registryOptions...)
    if err != nil { return err }
    return Run(ctx, cc, sched)
}

3. Setup: Building the Scheduler and Its Dependencies

opts.Config creates the kubeClient, InformerFactory, and DynInformerFactory; the scheduler is then assembled with scheduler.With* options:

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (
    *schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error,
) {
    c, err := opts.Config(ctx)
    if err != nil { return nil, nil, err }
    cc := c.Complete()

    // Apply out-of-tree plugin registrations so custom plugins become
    // visible to the framework registry.
    outOfTreeRegistry := make(runtime.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil { return nil, nil, err }
    }

    recorderFactory := getRecorderFactory(&cc)
    completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)

    sched, err := scheduler.New(
        ctx,
        cc.Client,
        cc.InformerFactory,
        cc.DynInformerFactory,
        recorderFactory,
        scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
        scheduler.WithKubeConfig(cc.KubeConfig),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
        scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
        scheduler.WithBuildFrameworkCapturer(func(p kubeschedulerconfig.KubeSchedulerProfile) {
            completedProfiles = append(completedProfiles, p)
        }),
    )
    if err != nil { return nil, nil, err }
    return &cc, sched, nil
}

4. Clientset Initialization

Clients for each resource group are generated from a rest.Config and an http.Client:

func NewForConfig(c *rest.Config) (*Clientset, error) {
    cfg := *c
    if cfg.UserAgent == "" { cfg.UserAgent = rest.DefaultKubernetesUserAgent() }
    httpClient, err := rest.HTTPClientFor(&cfg)
    if err != nil { return nil, err }
    return NewForConfigAndClient(&cfg, httpClient)
}

func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
    cfg := *c
    var cs Clientset
    var err error
    cs.coreV1, err = corev1.NewForConfigAndClient(&cfg, httpClient)
    if err != nil { return nil, err }
    cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfigAndClient(&cfg, httpClient)
    if err != nil { return nil, err }
    return &cs, nil
}

func (c *Clientset) CoreV1() corev1.CoreV1Interface { return c.coreV1 }
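
For orientation, here is a minimal, self-contained usage sketch of the same construction path from outside the scheduler (the kubeconfig path is illustrative):

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a rest.Config from a kubeconfig file (path is illustrative).
    cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil { panic(err) }

    // NewForConfig follows the path shown above: HTTPClientFor + NewForConfigAndClient.
    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil { panic(err) }

    // Exercise the coreV1 group client.
    pods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
    if err != nil { panic(err) }
    fmt.Println("pods in cluster:", len(pods.Items))
}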

5. SharedInformerFactory and the Pod Informer

func NewInformerFactory(cs clientset.Interface, resync time.Duration) informers.SharedInformerFactory {
    f := informers.NewSharedInformerFactory(cs, resync)
    f.InformerFor(&v1.Pod{}, newPodInformer)
    return f
}

func newPodInformer(cs clientset.Interface, resync time.Duration) cache.SharedIndexInformer {
    // The field selector excludes terminal Pods (Succeeded/Failed) from List/Watch.
    tweakListOptions := func(options *metav1.ListOptions) {
        options.FieldSelector = fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
    }
    inf := coreinformers.NewFilteredPodInformer(
        cs, metav1.NamespaceAll, resync, cache.Indexers{}, tweakListOptions,
    )
    // trim strips .metadata.managedFields to cut memory usage.
    inf.SetTransform(trim)
    return inf
}

NewFilteredPodInformer registers the List/Watch functions, which reach the apiserver through client.CoreV1().Pods(namespace).
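
Abridged from client-go, the registered List/Watch pair looks roughly like this; the tweakListOptions hook mutates the options before each request goes out:

// Inside NewFilteredPodInformer (abridged):
return cache.NewSharedIndexInformer(
    &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            if tweakListOptions != nil { tweakListOptions(&options) }
            return client.CoreV1().Pods(namespace).List(context.TODO(), options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            if tweakListOptions != nil { tweakListOptions(&options) }
            return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
        },
    },
    &v1.Pod{},
    resyncPeriod,
    indexers,
)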


6. The Three Components of a SharedIndexInformer

  • Indexer: the local cache (thread-safe store + keyFunc + multiple indexes);
  • Processor: the dispatcher (sharedProcessor + multiple processorListeners);
  • ListerWatcher: the List/Watch connection to the apiserver.
func NewSharedIndexInformerWithOptions(lw ListerWatcher, example runtime.Object, opt SharedIndexInformerOptions) SharedIndexInformer {
    return &sharedIndexInformer{
        indexer:       NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, opt.Indexers),
        processor:     &sharedProcessor{clock: &clock.RealClock{}},
        listerWatcher: lw,
        objectType:    example,
    }
}

7. scheduler.New and Profile/Framework

scheduler.New pulls together the Extenders, Listers, the cache snapshot, the scheduling queue, and more, then builds the Profile → Framework map:

profiles, err := profile.NewMap(
    ctx, options.profiles, registry, recorderFactory,
    frameworkruntime.WithClientSet(client),
    frameworkruntime.WithKubeConfig(options.kubeConfig),
    frameworkruntime.WithInformerFactory(informerFactory),
    frameworkruntime.WithSnapshotSharedLister(snapshot),
    frameworkruntime.WithExtenders(extenders),
    frameworkruntime.WithParallelism(int(options.parallelism)),
)

Inside each Framework the plugins for every extension point (QueueSort/PreFilter/Filter/PreScore/Score/Reserve/Permit/Bind…) are instantiated, and weights, extension points, and so on are validated.
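
To make the extension points concrete, here is a minimal out-of-tree Filter plugin sketch; the plugin name and its rejection rule are invented for illustration, and it would reach the scheduler through the out-of-tree registry seen in Setup:

// Hypothetical plugin: rejects nodes whose name lacks the "worker-" prefix.
// framework = k8s.io/kubernetes/pkg/scheduler/framework.
type nodeNamePrefixFilter struct{}

var _ framework.FilterPlugin = &nodeNamePrefixFilter{}

func (pl *nodeNamePrefixFilter) Name() string { return "NodeNamePrefixFilter" }

func (pl *nodeNamePrefixFilter) Filter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if !strings.HasPrefix(nodeInfo.Node().Name, "worker-") {
        return framework.NewStatus(framework.Unschedulable, "node name lacks worker- prefix")
    }
    return nil // nil status means Success
}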


8. The Scheduling Queue: active / backoff / unschedulable

The priority queue is a three-tier structure:

  • activeQ: Pods ready to be scheduled;
  • backoffQ: Pods in backoff (exponential backoff after failures);
  • unschedulablePods: Pods currently unschedulable, parked until a cluster event re-queues them.
func NewPriorityQueue(lessFn framework.LessFunc, f informers.SharedInformerFactory, opts ...Option) *PriorityQueue {
    backoffQ := newBackoffQueue(...)
    return &PriorityQueue{
        backoffQ:          backoffQ,
        unschedulablePods: newUnschedulablePods(...),
        activeQ:           newActiveQueue(...),
        nsLister:          f.Core().V1().Namespaces().Lister(),
        nominator:         newPodNominator(options.podLister),
    }
}
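
Pods do not sit in backoffQ or unschedulablePods forever; two background flushers move them back toward activeQ. A condensed sketch of PriorityQueue.Run, assuming upstream's 1s/30s intervals:

// Condensed from PriorityQueue.Run: periodically promote Pods whose backoff
// has expired, and re-queue Pods that have sat in unschedulablePods longer
// than the configured maximum duration.
func (p *PriorityQueue) Run(logger klog.Logger) {
    go wait.Until(func() { p.flushBackoffQCompleted(logger) }, 1*time.Second, p.stop)
    go wait.Until(func() { p.flushUnschedulablePodsLeftover(logger) }, 30*time.Second, p.stop)
}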

9. Event Handling and Handler Registration

Two handlers are registered on the Pod Informer (the filtering handler for the second case is shown below the list):

  • Scheduled Pods (assignedPod): maintain the scheduler cache (add/update/delete);
  • Unscheduled Pods (!assigned & responsibleForPod): enter the scheduling queue (add/update/delete).
cache.FilteringResourceEventHandler{
    FilterFunc: func(obj interface{}) bool {
        switch t := obj.(type) {
        case *v1.Pod:
            return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
        case cache.DeletedFinalStateUnknown:
            if pod, ok := t.Obj.(*v1.Pod); ok { return responsibleForPod(pod, sched.Profiles) }
            return false
        default:
            return false
        }
    },
    Handler: cache.ResourceEventHandlerFuncs{
        AddFunc:    sched.addPodToSchedulingQueue,
        UpdateFunc: sched.updatePodInSchedulingQueue,
        DeleteFunc: sched.deletePodFromSchedulingQueue,
    },
}

10. Run: Startup, Sync, and (Optional) Leader Election

Key steps (a condensed sketch of Run follows the list):

  1. Start the informers: InformerFactory.Start(ctx.Done())
  2. Wait for the caches to sync: WaitForCacheSync
  3. Wait for the handlers to finish delivering the initial lists: sched.WaitForHandlersSync
  4. (If enabled) LeaderElection: start sched.Run(ctx) only after becoming leader
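
A condensed sketch of Run under these four steps, assuming leader election is disabled (metrics, healthz endpoints, and most error handling elided):

func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    // 1. Start all informers registered on the factories.
    cc.InformerFactory.Start(ctx.Done())
    if cc.DynInformerFactory != nil {
        cc.DynInformerFactory.Start(ctx.Done())
    }

    // 2. Block until the local caches have synced with the apiserver.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())
    if cc.DynInformerFactory != nil {
        cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
    }

    // 3. Block until the scheduler's handlers have seen the initial lists.
    if err := sched.WaitForHandlersSync(ctx); err != nil {
        return err
    }

    // 4. Without leader election, run the scheduling loop directly.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}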

11. Reflector, List/Watch, and DeltaFIFO

After an informer starts, its internal controller runs the Reflector:

  • ListAndWatch performs a periodic full list plus an incremental watch;

  • events flow into the DeltaFIFO;

  • processLoop pops from the queue and calls HandleDeltas, which:

    • updates the local Indexer;
    • fans out via Processor → Listeners to the callbacks (OnAdd/OnUpdate/OnDelete).

This machinery continuously drives updates to the scheduling queue and the caches.
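
The pump in the middle is the controller's processLoop, abridged here from client-go:

// processLoop drains the DeltaFIFO, handing each Deltas batch to the
// configured Process callback (HandleDeltas for a sharedIndexInformer).
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return // the queue was shut down
            }
            if c.config.RetryOnError {
                // Re-queue the item so processing is retried.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}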


12. The Main Scheduling Loop: ScheduleOne

At its core: pop a Pod from the queue → locate its Framework → run one scheduling cycle.

func (sched *Scheduler) ScheduleOne(ctx context.Context) {
    podInfo, _ := sched.NextPod(logger) // = pq.Pop
    pod := podInfo.Pod
    fwk, _ := sched.frameworkForPod(pod)
    start := time.Now() // cycle start time, passed to both cycles

    state := framework.NewCycleState()
    podsToActivate := framework.NewPodsToActivate()
    state.Write(framework.PodsToActivateKey, podsToActivate)

    // The scheduling cycle runs synchronously; binding runs in its own
    // goroutine so the next Pod need not wait for the bind API call.
    result, assumedPodInfo, status := sched.schedulingCycle(ctx, state, fwk, podInfo, start, podsToActivate)
    if !status.IsSuccess() { sched.FailureHandler(...); return }

    go func() {
        status := sched.bindingCycle(ctx, state, fwk, result, assumedPodInfo, start, podsToActivate)
        if !status.IsSuccess() { sched.handleBindingCycleError(...) }
    }()
}

13. schedulingCycle: assume / reserve / permit / bind

schedulePod finds the suggested node → assume → run Reserve/Permit → activate affected Pods:

scheduleResult, _ := sched.SchedulePod(ctx, fwk, state, pod)
assumed := podInfo.DeepCopy().Pod

if err := sched.assume(logger, assumed, scheduleResult.SuggestedHost); err != nil { ... }

if sts := fwk.RunReservePluginsReserve(ctx, state, assumed, scheduleResult.SuggestedHost); !sts.IsSuccess() {
    fwk.RunReservePluginsUnreserve(ctx, state, assumed, scheduleResult.SuggestedHost)
    sched.Cache.ForgetPod(logger, assumed)
    return ...
}

runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumed, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
    fwk.RunReservePluginsUnreserve(ctx, state, assumed, scheduleResult.SuggestedHost)
    sched.Cache.ForgetPod(logger, assumed)
    return ...
}

if len(podsToActivate.Map) != 0 {
    sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
}

The actual bind is completed asynchronously by bindingCycle.
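
What bindingCycle does in that goroutine, condensed (error handling elided): wait out any Permit "wait", then PreBind → Bind → PostBind; any failure funnels into handleBindingCycleError (Unreserve + ForgetPod + requeue).

// Condensed sketch of bindingCycle.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
    return status // rejected or timed out while waiting on Permit
}
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, targetNode); !status.IsSuccess() {
    return status
}
if status := sched.bind(ctx, fwk, assumedPod, targetNode, state); !status.IsSuccess() {
    return status
}
fwk.RunPostBindPlugins(ctx, state, assumedPod, targetNode) // informational only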


14. Candidate Nodes and Scoring

The core of schedulePod: refresh the snapshot → filtering (Filter) → scoring (Score) → select the Host:

sched.Cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot)
if sched.nodeInfoSnapshot.NumNodes() == 0 { return ErrNoNodesAvailable }

feasible, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if len(feasible) == 0 { return FitError{Diagnosis: diagnosis} }

priorityList, _ := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasible)
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
  • With exactly one feasible node, it is returned immediately without scoring;
  • PreScore always runs before Score;
  • an Extender's Prioritize scores are normalized to MaxNodeScore before being merged with the built-in scores (see the sketch below).
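
A worked sketch of that normalization, assuming the upstream constants framework.MaxNodeScore = 100 and extenderv1.MaxExtenderPriority = 10 (the helper name is ours, not the scheduler's):

const (
    maxNodeScore        = 100 // framework.MaxNodeScore
    maxExtenderPriority = 10  // extenderv1.MaxExtenderPriority
)

// normalizeExtenderScore (illustrative) scales a raw extender priority
// (0..10) into the node-score range (0..100), then applies the extender's
// configured weight before merging into the plugin score sum.
func normalizeExtenderScore(raw, weight int64) int64 {
    return raw * weight * (maxNodeScore / maxExtenderPriority)
}

// Example: an extender returning 7/10 with weight 2 contributes 7*2*10 = 140.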

15. Assume and Cache Updates

assume first marks the resources as occupied in the scheduler cache, so that subsequent decisions are based on the latest "assumed" state:

func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
    assumed.Spec.NodeName = host
    if err := sched.Cache.AssumePod(logger, assumed); err != nil { return err }
    sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    return nil
}

NodeInfo.update folds the Pod's resource requests into Requested, and records host-port usage, PVC reference counts, and more, improving the accuracy of subsequent scheduling decisions.
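
Abridged from NodeInfo's bookkeeping; the same helper runs with sign = +1 when a Pod is added and sign = -1 when it is removed:

// Abridged NodeInfo.update (scalar resources and ephemeral storage elided).
func (n *NodeInfo) update(pod *v1.Pod, sign int64) {
    res, non0CPU, non0Mem := calculateResource(pod)
    n.Requested.MilliCPU += sign * res.MilliCPU
    n.Requested.Memory += sign * res.Memory
    n.NonZeroRequested.MilliCPU += sign * non0CPU
    n.NonZeroRequested.Memory += sign * non0Mem

    // Occupy host ports on add, release them on remove; same for PVC refcounts.
    n.updateUsedPorts(pod, sign > 0)
    n.updatePVCRefCounts(pod, sign > 0)

    n.Generation = nextGeneration()
}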


16. Details: Node Sampling Percentage and Start Index

  • percentageOfNodesToScore: to keep throughput high, the scheduler typically evaluates only a sample of the nodes (the default adapts to cluster size, with a guaranteed floor; see the sketch after this list).

  • nextStartNodeIndex: each round begins node traversal at a different offset, avoiding the bias of always starting from index 0.

    sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(allNodes)
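
The adaptive sample size comes from numFeasibleNodesToFind, abridged below (upstream sets minFeasibleNodesToFind = 100 and minFeasibleNodesPercentageToFind = 5):

func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) int32 {
    if numAllNodes < minFeasibleNodesToFind {
        return numAllNodes // small clusters: evaluate everything
    }

    // The profile-level percentage wins over the global one.
    percentage := sched.percentageOfNodesToScore
    if percentageOfNodesToScore != nil {
        percentage = *percentageOfNodesToScore
    }
    if percentage == 0 {
        // Adaptive default: 50% at small scale, shrinking with cluster size,
        // but never below minFeasibleNodesPercentageToFind.
        percentage = int32(50) - numAllNodes/125
        if percentage < minFeasibleNodesPercentageToFind {
            percentage = minFeasibleNodesPercentageToFind
        }
    }

    numNodes := numAllNodes * percentage / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}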
    

17. Summary

  • The informer layer "senses changes, fills the cache, and drives callbacks": Reflector → DeltaFIFO → Indexer → Processor forms a stable resource-event pipeline.
  • Scheduler construction: Profile/Framework hosts the plugin system, while the caches (snapshot & scheduler cache) and the queues (active/backoff/unschedulable) work in concert.
  • One scheduling pass: Pop → Filter → Score → select → assume → reserve/permit → bind, interleaved with diagnostics, backoff, and activation, pursuing low latency while preserving fairness and stability.

Appendix A | Mermaid Sequence Diagram: From List/Watch to Bind

sequenceDiagram
    autonumber
    participant APIServer as Kube-APIServer
    participant Reflector as Reflector
    participant FIFO as DeltaFIFO
    participant Indexer as Cache(Indexer)
    participant Proc as Processor
    participant Handlers as SchedulerHandlers
    participant Q as SchedulingQueue<br/>(active/backoff/unsched)
    participant Sched as Scheduler
    participant F as Framework(Plugins)
    participant Snap as Snapshot & SchedulerCache
    participant Binder as APIServer(Bind)

    APIServer-)Reflector: Watch/List Pods/Nodes (incremental events)
    Reflector->>FIFO: push Deltas (Added/Updated/Deleted)
    Note over FIFO: controller.processLoop Pop

    FIFO->>Indexer: HandleDeltas → update local cache
    Indexer-->>Proc: trigger dispatch
    Proc-->>Handlers: OnAdd/OnUpdate/OnDelete

    alt unscheduled Pod
      Handlers->>Q: Add/Update into activeQ
    else scheduled Pod / Node event
      Handlers->>Snap: add/update/delete in SchedulerCache
      Handlers->>Q: Activate() as events warrant
    end

    loop scheduling loop
      Sched->>Q: Pop() the highest-priority Pod
      Sched->>Snap: Cache.UpdateSnapshot()
      Sched->>F: RunPreFilter()
      Sched->>F: RunFilter() (parallel/sampled)
      alt no feasible node
        F-->>Sched: FitError / Unschedulable
        Sched->>F: RunPostFilter() (may trigger Preemption)
        Sched->>Q: move to unschedulable / backoff
      else feasible nodes found
        Sched->>F: RunPreScore() → RunScore() (+Extenders)
        Sched->>Sched: selectHost picks the top-scoring node
        Sched->>Snap: Assume(pod,node) & reserve resources
        Sched->>F: RunReserve() / onFail→Unreserve+Forget
        Sched->>F: RunPermit() (Wait/Reject/Success)
        alt Permit Wait/Reject
          F-->>Sched: Wait/Reject
          Sched->>F: Unreserve()
          Sched->>Snap: ForgetPod()
          Sched->>Q: requeue → backoff/unsched
        else Permit Success
          par async bind
            Sched->>Binder: Bind(Pod→Node)
            Binder-->>Sched: Success/Failure
          and activate possibly affected Pods
            Sched->>Q: Activate(related pods)
          end
          alt Bind failed
            Sched->>Snap: ForgetPod()
            Sched->>Q: requeue → backoff
          end
        end
      end
    end