Kubernetes-06-Client Go-概览
模块职责
client-go 是 Kubernetes 的官方 Go 客户端库,提供与 Kubernetes API Server 交互的功能。它是开发 Kubernetes 控制器、Operator 和工具的基础库。
核心组件
- RESTClient:低级 HTTP 客户端
- ClientSet:类型化客户端(每种资源一个客户端)
- DynamicClient:动态客户端(支持 CRD)
- Informer:缓存和 Watch 机制
- Workqueue:工作队列(速率限制、去重)
核心组件架构
flowchart TB
subgraph "Client-Go"
subgraph "客户端层"
REST[RESTClient<br/>HTTP 客户端]
CLIENTSET[ClientSet<br/>类型化客户端]
DYNAMIC[DynamicClient<br/>动态客户端]
end
subgraph "缓存层"
INFORMER[Informer<br/>List-Watch + 缓存]
INDEXER[Indexer<br/>本地缓存]
REFLECTOR[Reflector<br/>List-Watch]
end
subgraph "队列层"
WORKQUEUE[Workqueue<br/>工作队列]
RATELIMIT[RateLimiter<br/>速率限制]
end
end
CLIENTSET --> REST
DYNAMIC --> REST
INFORMER --> REFLECTOR
REFLECTOR --> REST
INFORMER --> INDEXER
WORKQUEUE --> RATELIMIT
style INFORMER fill:#FF6B6B,color:#fff
style CLIENTSET fill:#4ECDC4,color:#fff
style WORKQUEUE fill:#45B7D1,color:#fff
核心代码示例
1. 使用 ClientSet
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// 创建 ClientSet
config, _ := rest.InClusterConfig()
clientset, _ := kubernetes.NewForConfig(config)
// 获取 Pod 列表
pods, _ := clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
2. 使用 Informer
import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// 创建 SharedInformerFactory
factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
// 获取 Pod Informer
podInformer := factory.Core().V1().Pods()
// 注册事件处理器
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod added: %s\n", pod.Name)
},
})
// 启动 Informer
factory.Start(stopCh)
3. 使用 Workqueue
import "k8s.io/client-go/util/workqueue"
// 创建速率限制队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 添加项目
queue.Add("default/nginx")
// 处理项目
for {
key, shutdown := queue.Get()
if shutdown {
break
}
err := processItem(key)
if err != nil {
queue.AddRateLimited(key) // 失败重试
} else {
queue.Forget(key) // 成功
}
queue.Done(key)
}
Informer 机制详解
List-Watch 流程
sequenceDiagram
participant C as Controller
participant INF as Informer
participant R as Reflector
participant API as API Server
participant IDX as Indexer (Cache)
C->>INF: Start()
INF->>R: Run()
Note over R,API: 步骤 1:List(全量同步)
R->>API: LIST /api/v1/pods
API-->>R: Pods (ResourceVersion=12345)
R->>IDX: Replace(pods)
Note over R,API: 步骤 2:Watch(增量同步)
R->>API: WATCH /api/v1/pods?resourceVersion=12345
loop 持续监听
API-->>R: Event{Type: ADDED, Object: Pod}
R->>IDX: Add(pod)
R->>C: OnAdd(pod)
API-->>R: Event{Type: MODIFIED, Object: Pod}
R->>IDX: Update(pod)
R->>C: OnUpdate(oldPod, newPod)
API-->>R: Event{Type: DELETED, Object: Pod}
R->>IDX: Delete(pod)
R->>C: OnDelete(pod)
end
最佳实践
1. 使用 SharedInformerFactory
避免重复 List-Watch(多个控制器共享 Informer)
factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
podInformer := factory.Core().V1().Pods()
deploymentInformer := factory.Apps().V1().Deployments()
// 多个控制器使用相同的 Informer
controller1.podLister = podInformer.Lister()
controller2.podLister = podInformer.Lister()
2. 使用 Lister 而非直接调用 API
// ✅ 推荐:从缓存读取(无 API 调用)
pod, _ := podLister.Pods("default").Get("nginx")
// ❌ 不推荐:直接调用 API(每次请求都打 API Server)
pod, _ := clientset.CoreV1().Pods("default").Get(ctx, "nginx", metav1.GetOptions{})
3. 使用 Workqueue 去重
// 多次添加相同 Key,只处理一次
queue.Add("default/nginx")
queue.Add("default/nginx")
queue.Add("default/nginx")
// 只会取出一次
key, _ := queue.Get() // "default/nginx"
Kubernetes-06-Client Go-时序图
时序图概述
本文档提供 Client-Go 的核心场景时序图:
- Informer 启动与缓存同步流程
- 控制器处理对象变更流程
场景 1:Informer 启动与缓存同步流程
sequenceDiagram
autonumber
participant APP as Application
participant FAC as SharedInformerFactory
participant INF as SharedInformer
participant REF as Reflector
participant DF as DeltaFIFO
participant API as API Server
participant CTL as Controller
participant IDX as Indexer (Cache)
participant H as EventHandler
Note over APP,H: 场景:启动 Informer 并同步缓存
APP->>FAC: NewSharedInformerFactory(clientset, 30*time.Minute)
APP->>FAC: factory.Core().V1().Pods()
FAC-->>APP: podInformer
APP->>INF: podInformer.Informer().AddEventHandler(handler)
INF->>INF: 注册 EventHandler
APP->>FAC: factory.Start(stopCh)
FAC->>INF: informer.Run(stopCh)
INF->>REF: reflector.Run(stopCh)
INF->>CTL: controller.Run(stopCh)
Note over REF,API: 步骤 1:List(全量同步)
REF->>API: LIST /api/v1/pods?resourceVersion=0
API-->>REF: PodList (ResourceVersion=12345)<br/>{items: [pod1, pod2, pod3, ...]}
loop 遍历所有 Pod
REF->>DF: Add(pod1)
REF->>DF: Add(pod2)
REF->>DF: Add(pod3)
end
Note over CTL,H: 步骤 2:处理增量(Pop 循环)
loop Controller Pop 循环
CTL->>DF: Pop()
DF-->>CTL: Deltas [{Type: Sync, Object: pod1}]
CTL->>IDX: Add(pod1)
IDX->>IDX: 更新本地缓存<br/>索引:namespace -> [pod1]
CTL->>H: OnAdd(pod1)
H->>H: 处理 Pod 添加事件
end
Note over INF: 缓存同步完成(HasSynced=true)
INF-->>APP: 返回(HasSynced=true)
Note over REF,API: 步骤 3:Watch(增量同步)
REF->>API: WATCH /api/v1/pods?resourceVersion=12345
loop 持续监听
API-->>REF: Event{Type: ADDED, Object: pod4}
REF->>DF: Add(pod4)
CTL->>DF: Pop()
DF-->>CTL: Deltas [{Type: Added, Object: pod4}]
CTL->>IDX: Add(pod4)
CTL->>H: OnAdd(pod4)
API-->>REF: Event{Type: MODIFIED, Object: pod1}
REF->>DF: Update(pod1)
CTL->>DF: Pop()
DF-->>CTL: Deltas [{Type: Updated, Object: pod1}]
CTL->>IDX: Update(pod1)
CTL->>H: OnUpdate(oldPod1, pod1)
API-->>REF: Event{Type: DELETED, Object: pod2}
REF->>DF: Delete(pod2)
CTL->>DF: Pop()
DF-->>CTL: Deltas [{Type: Deleted, Object: pod2}]
CTL->>IDX: Delete(pod2)
CTL->>H: OnDelete(pod2)
end
要点说明
1. List-Watch 机制
- List:全量同步(获取当前所有对象)
- Watch:增量同步(监听后续变化)
- ResourceVersion:Watch 从 List 返回的 ResourceVersion 开始
2. DeltaFIFO 的作用
- 解耦:Reflector 生产增量,Controller 消费增量
- 去重:相同对象的多次变更合并
- 顺序:保证同一对象的变更按顺序处理
3. 缓存同步标志(HasSynced)
// 等待缓存同步
cache.WaitForCacheSync(stopCh, informer.HasSynced)
// 同步后才能使用 Lister(避免读取到不完整数据)
if !informer.HasSynced() {
return fmt.Errorf("cache not synced")
}
场景 2:控制器处理对象变更流程
sequenceDiagram
autonumber
participant INF as Informer
participant H as EventHandler
participant WQ as Workqueue
participant W as Worker
participant L as Lister (Cache)
participant API as API Server
Note over INF,API: 场景:Pod 状态变更,控制器处理
INF->>H: OnUpdate(oldPod, newPod)
H->>H: 检查是否需要处理<br/>(如 Phase 变化)
alt 需要处理
H->>H: 提取 Key<br/>key = "default/nginx"
H->>WQ: queue.Add("default/nginx")
WQ->>WQ: 去重检查<br/>(如果已在队列,跳过)
end
Note over W,API: Worker 从队列取出 Key
W->>WQ: queue.Get()
WQ-->>W: key = "default/nginx"
W->>W: 解析 Key<br/>namespace="default", name="nginx"
W->>L: lister.Pods("default").Get("nginx")
L-->>W: pod(从本地缓存读取)
W->>W: syncPod(pod)<br/>执行业务逻辑
alt 处理成功
W->>WQ: queue.Forget("default/nginx")
WQ->>WQ: 重置速率限制计数器
W->>WQ: queue.Done("default/nginx")
WQ->>WQ: 从 processing 移除
else 处理失败(临时错误)
W->>WQ: queue.AddRateLimited("default/nginx")
WQ->>WQ: 计算退避时间<br/>delay = 5ms * 2^(retries-1)
WQ->>WQ: 延迟后重新入队
W->>WQ: queue.Done("default/nginx")
else 处理失败(永久错误)
W->>W: 记录错误日志
W->>WQ: queue.Forget("default/nginx")
W->>WQ: queue.Done("default/nginx")
end
Note over W: 继续处理下一个 Key
W->>WQ: queue.Get()
要点说明
1. 为什么需要 Workqueue?
问题:直接在 EventHandler 中处理
// ❌ 不推荐:EventHandler 阻塞会影响 Informer
AddFunc: func(obj interface{}) {
// 如果处理失败,无法重试
processObject(obj) // 阻塞 5 秒
}
解决:通过 Workqueue 异步处理
// ✅ 推荐:EventHandler 只添加到队列(非阻塞)
AddFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
queue.Add(key) // 立即返回
}
// Worker 异步处理
func worker() {
for {
key, _ := queue.Get()
err := processKey(key)
if err != nil {
queue.AddRateLimited(key) // 失败重试
} else {
queue.Forget(key)
}
queue.Done(key)
}
}
2. 速率限制(Rate Limiting)
指数退避示例:
重试 1:延迟 5ms
重试 2:延迟 10ms
重试 3:延迟 20ms
重试 4:延迟 40ms
重试 5:延迟 80ms
...
重试 N:延迟 min(5ms * 2^(N-1), 1000s)
3. 使用 Lister 而非 ClientSet
// ✅ 推荐:从缓存读取(无 API 调用)
pod, err := lister.Pods("default").Get("nginx")
// ❌ 不推荐:直接调用 API(每次请求都打 API Server)
pod, err := clientset.CoreV1().Pods("default").Get(ctx, "nginx", metav1.GetOptions{})
原因:
- 性能:缓存读取 < 1μs,API 调用 > 1ms
- 负载:避免对 API Server 造成压力
- 一致性:Lister 保证读取到的是 Informer 同步的数据
完整控制器示例
flowchart TB
START([开始]) --> INIT[初始化 ClientSet 和 Informer]
INIT --> HANDLER[注册 EventHandler]
HANDLER --> START_INF[启动 Informer]
START_INF --> WAIT[等待缓存同步]
WAIT --> WORKERS[启动多个 Worker]
WORKERS --> GET[Worker: Get() 从队列取出 Key]
GET --> LISTER[Lister 从缓存获取对象]
LISTER --> PROCESS{处理对象}
PROCESS -->|成功| FORGET[queue.Forget()]
PROCESS -->|失败| RETRY[queue.AddRateLimited()]
FORGET --> DONE[queue.Done()]
RETRY --> DONE
DONE --> GET
style PROCESS fill:#FF6B6B,color:#fff
style LISTER fill:#4ECDC4,color:#fff
性能指标
关键指标
| 指标 | 类型 | 说明 |
|---|---|---|
workqueue_depth |
Gauge | 队列当前深度 |
workqueue_adds_total |
Counter | 累计添加次数 |
workqueue_retries_total |
Counter | 累计重试次数 |
workqueue_work_duration_seconds |
Histogram | 处理延迟 |
Kubernetes-06-Client Go-数据结构
数据结构概述
Client-Go 的核心数据结构围绕 缓存和 Watch 机制 设计:
- Reflector:List-Watch 执行器
- DeltaFIFO:增量队列
- Indexer:索引缓存
- Workqueue:工作队列
核心数据结构 UML 图
classDiagram
class SharedInformer {
+Reflector reflector
+Controller controller
+Indexer indexer
+AddEventHandler(handler) error
+Run(stopCh)
}
class Reflector {
+ListerWatcher listerWatcher
+Store store
+ListAndWatch()
}
class DeltaFIFO {
+map~string~[]Delta items
+Add(obj) error
+Update(obj) error
+Delete(obj) error
+Pop() (Delta, error)
}
class Indexer {
+ThreadSafeStore store
+Add(obj) error
+Update(obj) error
+Delete(obj) error
+Get(obj) (interface{}, bool)
+List() []interface{}
}
class Workqueue {
+queue []interface{}
+dirty set
+processing set
+Add(item)
+Get() (item, shutdown)
+Done(item)
}
SharedInformer "1" --> "1" Reflector : uses
SharedInformer "1" --> "1" Indexer : uses
Reflector "1" --> "1" DeltaFIFO : writes to
DeltaFIFO "1" --> "1" Indexer : updates
数据结构详解
1. Reflector(List-Watch 执行器)
// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// ListerWatcher 用于 List 和 Watch
listerWatcher ListerWatcher
// Store 存储对象(通常是 DeltaFIFO)
store Store
// expectedType 期望的对象类型
expectedType reflect.Type
// resyncPeriod 重新同步周期
resyncPeriod time.Duration
// lastSyncResourceVersion 最后同步的 ResourceVersion
lastSyncResourceVersion string
}
核心方法:
// ListAndWatch 执行 List 和 Watch
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// 1. List 全量数据
list, resourceVersion, err := r.listerWatcher.List(options)
items, _ := meta.ExtractList(list)
// 2. 将数据同步到 Store
r.syncWith(items, resourceVersion)
// 3. Watch 增量数据
for {
w, err := r.listerWatcher.Watch(options)
for event := range w.ResultChan() {
switch event.Type {
case watch.Added:
r.store.Add(event.Object)
case watch.Modified:
r.store.Update(event.Object)
case watch.Deleted:
r.store.Delete(event.Object)
}
}
}
}
2. DeltaFIFO(增量队列)
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// items 存储对象的增量变化
items map[string][]Delta
// queue 对象 Key 的队列
queue []string
// keyFunc 从对象提取 Key
keyFunc KeyFunc
}
type Delta struct {
Type DeltaType // Added / Updated / Deleted / Sync
Object interface{} // 对象本身
}
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Sync DeltaType = "Sync" // 重新同步
)
工作原理:
// Add 添加增量
func (f *DeltaFIFO) Add(obj interface{}) error {
key, _ := f.keyFunc(obj)
// 追加到增量列表
f.items[key] = append(f.items[key], Delta{Type: Added, Object: obj})
// 如果 Key 不在队列中,加入队列
if !f.inQueue(key) {
f.queue = append(f.queue, key)
}
return nil
}
// Pop 弹出增量
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// 1. 从队列头部取出 Key
key := f.queue[0]
f.queue = f.queue[1:]
// 2. 获取该 Key 的所有增量
deltas := f.items[key]
delete(f.items, key)
// 3. 处理增量
return process(deltas)
}
3. Indexer(索引缓存)
// staging/src/k8s.io/client-go/tools/cache/index.go
type Indexer interface {
Store
// Index 根据索引名称和对象返回匹配的对象列表
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys 根据索引名称和索引键返回对象 Key 列表
IndexKeys(indexName, indexKey string) ([]string, error)
// ListIndexFuncValues 列出索引的所有值
ListIndexFuncValues(indexName string) []string
// ByIndex 根据索引名称和索引键返回对象列表
ByIndex(indexName, indexKey string) ([]interface{}, error)
}
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)
默认索引(Namespace):
// MetaNamespaceIndexFunc 默认索引函数(按 Namespace 索引)
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, _ := meta.Accessor(obj)
return []string{meta.GetNamespace()}, nil
}
// 使用示例
indexer.ByIndex("namespace", "default") // 返回所有 default namespace 的对象
4. Workqueue(工作队列)
// staging/src/k8s.io/client-go/util/workqueue/queue.go
type Type struct {
// queue 有序队列
queue []t
// dirty 脏集合(等待加入队列的元素)
dirty set
// processing 正在处理的元素
processing set
}
状态机:
[空闲] --Add()--> [dirty] --Get()--> [processing] --Done()--> [空闲]
↓ ↓
[queue] (如果 dirty 中存在,重新入队)
去重机制:
// Add 添加元素
func (q *Type) Add(item interface{}) {
// 1. 如果正在处理,只标记为 dirty(处理完后会重新入队)
if q.processing.has(item) {
q.dirty.insert(item)
return
}
// 2. 如果已在 dirty,跳过(避免重复)
if q.dirty.has(item) {
return
}
// 3. 标记为 dirty 并入队
q.dirty.insert(item)
q.queue = append(q.queue, item)
}
// Get 取出元素
func (q *Type) Get() (item interface{}, shutdown bool) {
item = q.queue[0]
q.queue = q.queue[1:]
// 从 dirty 移除,加入 processing
q.dirty.delete(item)
q.processing.insert(item)
return item, false
}
// Done 完成处理
func (q *Type) Done(item interface{}) {
q.processing.delete(item)
// 如果在 dirty 中(处理期间又被添加),重新入队
if q.dirty.has(item) {
q.queue = append(q.queue, item)
}
}
5. RateLimiter(速率限制器)
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
When(item interface{}) time.Duration
Forget(item interface{})
NumRequeues(item interface{}) int
}
// DefaultControllerRateLimiter 默认速率限制器(组合多种策略)
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
// 1. 指数退避(初始 5ms,最大 1000s)
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 2. 令牌桶(每秒 10 个,桶容量 100)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
指数退避算法:
重试次数 | 延迟时间
--------|----------
1 | 5ms
2 | 10ms
3 | 20ms
4 | 40ms
5 | 80ms
... | ...
N | min(5ms * 2^(N-1), 1000s)
Informer 工作流程
sequenceDiagram
participant R as Reflector
participant DF as DeltaFIFO
participant C as Controller
participant IDX as Indexer
participant H as EventHandler
Note over R,H: 启动阶段
R->>R: List 全量数据
R->>DF: Add(obj1), Add(obj2), ...
loop Pop 循环
C->>DF: Pop()
DF-->>C: Deltas [{Type: Added, Object: obj1}]
C->>IDX: Add(obj1)
C->>H: OnAdd(obj1)
end
Note over R,H: Watch 阶段
R->>R: Watch 增量数据
R->>DF: Update(obj1)
C->>DF: Pop()
DF-->>C: Deltas [{Type: Updated, Object: obj1}]
C->>IDX: Update(obj1)
C->>H: OnUpdate(oldObj1, obj1)
性能与容量考虑
1. Informer 缓存大小
内存占用:
- 每个对象:约 1-5 KB(取决于对象大小)
- 10000 个 Pod ≈ 10-50 MB
2. Workqueue 容量
默认配置:
- 无容量限制(内存允许范围内)
- 建议监控队列长度(
workqueue_depth)