概述

kube-proxy是Kubernetes集群中每个节点上运行的网络代理,负责实现Service的网络规则,提供集群内部的服务发现和负载均衡功能。它监听API服务器上Service和Endpoints的变化,并相应地配置网络规则来实现流量转发。本文将深入分析kube-proxy的源码实现,揭示其精妙的网络处理机制。

1. kube-proxy架构设计

1.1 整体架构概览

graph TB subgraph "kube-proxy 核心架构" subgraph "主控制器" MAIN[Proxy Main
主控制器
- 启动协调
- 配置管理
- 模式选择
- 优雅关闭] CONFIG_SYNC[Config Sync
配置同步
- Service监听
- Endpoints监听
- 配置变更
- 状态同步] end subgraph "代理模式" IPTABLES_MODE[iptables Mode
iptables模式
- 规则链管理
- DNAT转换
- 连接跟踪
- 随机负载均衡] IPVS_MODE[IPVS Mode
IPVS模式
- 虚拟服务器
- 真实服务器
- 调度算法
- 高性能转发] USERSPACE_MODE[Userspace Mode
用户空间模式
- 用户态代理
- 轮询负载均衡
- 协议处理
- 调试友好] end subgraph "规则管理" IPTABLES_MGR[iptables Manager
iptables规则管理
- 规则同步
- 链管理
- 原子更新
- 规则优化] IPVS_MGR[IPVS Manager
IPVS规则管理
- 虚拟服务管理
- 真实服务器管理
- 调度算法配置
- 健康检查] IPSET_MGR[ipset Manager
IP集合管理
- 地址集合
- 端口集合
- 高效匹配
- 动态更新] end subgraph "服务发现" SERVICE_HANDLER[Service Handler
服务处理器
- 服务监听
- 规则生成
- 负载均衡
- 会话亲和性] ENDPOINT_HANDLER[Endpoint Handler
端点处理器
- 后端发现
- 健康检查
- 权重分配
- 故障检测] end subgraph "网络处理" CONNECTION_TRACKER[Connection Tracker
连接跟踪
- 会话状态
- NAT映射
- 超时管理
- 状态同步] LOAD_BALANCER[Load Balancer
负载均衡器
- 算法选择
- 权重分配
- 健康检查
- 故障转移] end subgraph "监控和调试" METRICS[Metrics Collector
指标收集器
- 代理指标
- 性能统计
- 错误计数
- 健康状态] HEALTH_CHECK[Health Checker
健康检查器
- 端点健康
- 服务可用性
- 故障恢复
- 状态上报] end end %% 组件间连接关系 MAIN --> CONFIG_SYNC CONFIG_SYNC --> SERVICE_HANDLER CONFIG_SYNC --> ENDPOINT_HANDLER SERVICE_HANDLER --> IPTABLES_MODE SERVICE_HANDLER --> IPVS_MODE SERVICE_HANDLER --> USERSPACE_MODE IPTABLES_MODE --> IPTABLES_MGR IPVS_MODE --> IPVS_MGR IPVS_MODE --> IPSET_MGR IPTABLES_MGR --> CONNECTION_TRACKER IPVS_MGR --> LOAD_BALANCER CONFIG_SYNC --> METRICS CONFIG_SYNC --> HEALTH_CHECK %% 样式定义 classDef mainController fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef proxyMode fill:#f3e5f5,stroke:#4a148c,stroke-width:2px classDef ruleManager fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px classDef serviceDiscovery fill:#fff3e0,stroke:#e65100,stroke-width:2px classDef networkProcessing fill:#fce4ec,stroke:#880e4f,stroke-width:2px classDef monitoring fill:#e0f2f1,stroke:#004d40,stroke-width:2px class MAIN,CONFIG_SYNC mainController class IPTABLES_MODE,IPVS_MODE,USERSPACE_MODE proxyMode class IPTABLES_MGR,IPVS_MGR,IPSET_MGR ruleManager class SERVICE_HANDLER,ENDPOINT_HANDLER serviceDiscovery class CONNECTION_TRACKER,LOAD_BALANCER networkProcessing class METRICS,HEALTH_CHECK monitoring

1.2 核心数据结构

1.2.1 Proxy主结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Proxier 代理器接口定义
// 文件路径: pkg/proxy/types.go
type Proxier interface {
    // Sync 同步代理规则
    // 立即同步所有代理规则到底层实现
    Sync()
    
    // SyncLoop 运行同步循环
    // 定期同步代理规则,处理配置变更
    SyncLoop()
    
    // OnServiceAdd 处理Service添加事件
    OnServiceAdd(service *v1.Service)
    
    // OnServiceUpdate 处理Service更新事件  
    OnServiceUpdate(oldService, service *v1.Service)
    
    // OnServiceDelete 处理Service删除事件
    OnServiceDelete(service *v1.Service)
    
    // OnServiceSynced 处理Service同步完成事件
    OnServiceSynced()
    
    // OnEndpointsAdd 处理Endpoints添加事件
    OnEndpointsAdd(endpoints *v1.Endpoints)
    
    // OnEndpointsUpdate 处理Endpoints更新事件
    OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
    
    // OnEndpointsDelete 处理Endpoints删除事件
    OnEndpointsDelete(endpoints *v1.Endpoints)
    
    // OnEndpointsSynced 处理Endpoints同步完成事件
    OnEndpointsSynced()
}

// ProxyProvider 代理提供者接口
type ProxyProvider interface {
    // NewProxier 创建新的代理器实例
    NewProxier(
        ipt utiliptables.Interface,
        ipvs utilipvs.Interface,
        ipset utilipset.Interface,
        sysctl utilsysctl.Interface,
        exec utilexec.Interface,
        syncPeriod time.Duration,
        minSyncPeriod time.Duration,
        excludeCIDRs []string,
        strictARP bool,
        tcpTimeout time.Duration,
        tcpFinTimeout time.Duration,
        udpTimeout time.Duration,
        masqueradeAll bool,
        masqueradeBit int,
        localDetector proxyutiliptables.LocalTrafficDetector,
        hostname string,
        nodeIP net.IP,
        recorder events.EventRecorder,
        healthzServer healthcheck.ProxierHealthUpdater,
        scheduler string,
    ) (Proxier, error)
    
    // CleanupLeftovers 清理遗留的代理规则
    CleanupLeftovers(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface) (encounteredError bool)
}

1.2.2 iptables代理器结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Proxier iptables代理器实现
// 文件路径: pkg/proxy/iptables/proxier.go
type Proxier struct {
    // 基础配置
    hostname        string
    nodeIP          net.IP
    recorder        events.EventRecorder
    healthChecker   healthcheck.ProxierHealthUpdater
    healthzServer   healthcheck.ProxierHealthUpdater
    
    // iptables接口
    iptables utiliptables.Interface
    
    // 同步配置
    syncPeriod    time.Duration
    minSyncPeriod time.Duration
    
    // 伪装配置
    masqueradeAll bool
    masqueradeBit int
    
    // 本地流量检测器
    localDetector proxyutiliptables.LocalTrafficDetector
    
    // 服务和端点映射
    serviceMap proxyutil.ServiceMap
    endpointsMap proxyutil.EndpointsMap
    
    // 端口映射
    nodePortAddresses []string
    
    // 网络配置
    clusterCIDR string
    
    // 同步控制
    syncRunner      *async.BoundedFrequencyRunner
    syncRunnerMutex sync.Mutex
    
    // 规则缓存
    iptablesData    *bytes.Buffer
    existingFilterChains map[utiliptables.Chain][]byte
    filterChains    map[utiliptables.Chain][]byte
    filterRules     map[utiliptables.Chain][]byte
    natChains       map[utiliptables.Chain][]byte
    natRules        map[utiliptables.Chain][]byte
    
    // 初始化状态
    initialized int32
    syncPeriodCh chan struct{}
    
    // 优雅关闭
    stopCh <-chan struct{}
    
    // 指标统计
    syncProxyRulesLatency prometheus.Summary
    deprecatedSyncProxyRulesLatency prometheus.Summary
    networkProgrammingLatency prometheus.Histogram
}

// ServicePortName Service端口名称结构
type ServicePortName struct {
    types.NamespacedName
    Port     string
    Protocol v1.Protocol
}

// ServiceInfo Service信息结构
type ServiceInfo struct {
    *BaseServiceInfo
    // iptables特定字段
    clusterIP                net.IP
    port                     int
    nodePort                 int
    loadBalancerStatus       v1.LoadBalancerStatus
    sessionAffinityType      v1.ServiceAffinity
    stickyMaxAgeSeconds      int
    externalIPs              []string
    loadBalancerSourceRanges []string
    onlyNodeLocalEndpoints   bool
    topologyKeys             []string
    externalPolicyLocal      bool
    internalPolicyLocal      bool
    hintsAnnotation          string
}

2. 启动流程深度解析

2.1 main函数入口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// main kube-proxy主函数入口
// 文件路径: cmd/kube-proxy/proxy.go
func main() {
    // 创建kube-proxy命令对象
    command := app.NewProxyCommand()
    
    // 使用component-base的CLI运行器执行命令
    code := cli.Run(command)
    
    // 以返回的退出码结束程序
    os.Exit(code)
}

2.2 命令创建和配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// NewProxyCommand 创建kube-proxy命令
// 文件路径: cmd/kube-proxy/app/server.go
func NewProxyCommand() *cobra.Command {
    // 创建默认配置选项
    opts := NewOptions()

    cmd := &cobra.Command{
        Use: "kube-proxy",
        Long: `The Kubernetes network proxy runs on each node. This
reflects services as defined in the Kubernetes API on each node and can do simple
TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across
a set of backends. Service cluster IPs and ports are currently found through Docker-links-compatible
environment variables specifying ports opened by the service proxy. There is an optional
addon that provides cluster DNS for these cluster IPs. The user must create a service
with the apiserver API to configure the proxy.`,

        // 禁用在命令错误时打印使用说明
        SilenceUsage: true,
        
        // 主运行函数
        RunE: func(cmd *cobra.Command, args []string) error {
            // 验证和完成选项配置
            if err := opts.Complete(); err != nil {
                return fmt.Errorf("failed complete: %w", err)
            }
            
            if err := opts.Validate(); err != nil {
                return fmt.Errorf("failed validate: %w", err)
            }
            
            // 运行代理服务器
            if err := opts.Run(); err != nil {
                klog.ErrorS(err, "Error running ProxyServer")
                return err
            }

            return nil
        },
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }

    // 添加标志
    var err error
    opts.config, err = opts.ApplyDefaults(opts.config)
    if err != nil {
        klog.ErrorS(err, "Unable to create flag defaults")
        // ACTION REQUIRED: Exit code changed from 255 to 1
        os.Exit(1)
    }

    opts.AddFlags(cmd.Flags())

    return cmd
}

// Options 包含kube-proxy的配置选项
type Options struct {
    // 配置文件路径
    ConfigFile string
    
    // 写入默认配置并退出
    WriteConfigTo string
    
    // 清理iptables规则并退出
    CleanupAndExit bool
    
    // 主配置结构
    config *kubeproxyconfig.KubeProxyConfiguration
    
    // Kubernetes客户端配置
    master     string
    kubeconfig string
    
    // 健康检查和指标配置
    healthzPort     int32
    metricsPort     int32
    metricsBindAddr string
    
    // 日志配置
    loggingConfig *logsapi.LoggingConfiguration
    
    // 特性门控
    featureGates map[string]bool
    
    // 错误通道
    errCh chan error
}

2.3 代理服务器运行主流程

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Run 运行kube-proxy的主函数
// 文件路径: cmd/kube-proxy/app/server.go
func (o *Options) Run() error {
    defer close(o.errCh)
    
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }
    
    // 创建代理服务器
    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }
    
    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }
    
    o.proxyServer = proxyServer
    return o.runLoop()
}

// NewProxyServer 创建代理服务器实例
func NewProxyServer(o *Options) (*ProxyServer, error) {
    config := o.config
    
    if config.ClientConnection.Kubeconfig == "" && config.ClientConnection.KubeconfigFile == "" {
        klog.InfoS("Neither kubeconfig file nor kubeconfig data was specified. Using default API client. This might not work.")
    }

    // 创建客户端配置
    clientConfig := &restclient.Config{}
    if configFile := config.ClientConnection.KubeconfigFile; len(configFile) > 0 {
        loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: configFile}
        loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})

        var err error
        clientConfig, err = loader.ClientConfig()
        if err != nil {
            return nil, fmt.Errorf("invalid kubeconfig: %v", err)
        }
    } else if len(config.ClientConnection.Kubeconfig) > 0 {
        clientConfig, err := clientcmd.NewClientConfigFromBytes([]byte(config.ClientConnection.Kubeconfig))
        if err != nil {
            return nil, fmt.Errorf("invalid kubeconfig: %v", err)
        }
        clientConfig, err = clientConfig.ClientConfig()
        if err != nil {
            return nil, fmt.Errorf("invalid kubeconfig: %v", err)
        }
    }

    // 应用客户端连接配置
    if config.ClientConnection.AcceptContentTypes != "" {
        clientConfig.AcceptContentTypes = config.ClientConnection.AcceptContentTypes
    }
    if config.ClientConnection.ContentType != "" {
        clientConfig.ContentType = config.ClientConnection.ContentType
    }
    if config.ClientConnection.QPS > 0 {
        clientConfig.QPS = config.ClientConnection.QPS
    }
    if config.ClientConnection.Burst > 0 {
        clientConfig.Burst = int(config.ClientConnection.Burst)
    }

    // 创建Kubernetes客户端
    client, err := clientset.NewForConfig(clientConfig)
    if err != nil {
        return nil, fmt.Errorf("unable to create k8s client: %v", err)
    }

    // 创建事件客户端
    eventClient, err := clientset.NewForConfig(clientConfig)
    if err != nil {
        return nil, fmt.Errorf("unable to create event client: %v", err)
    }
    eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient.EventsV1()})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")

    // 获取节点IP
    nodeIP := net.ParseIP(config.BindAddress)
    if nodeIP.IsUnspecified() {
        nodeIP = utilnode.GetNodeIP(client, config.HostnameOverride)
    }

    // 创建代理服务器
    return &ProxyServer{
        Client:                 client,
        EventClient:           eventClient,
        IptInterface:          utiliptables.New(utilexec.New(), utiliptables.ProtocolIPv4),
        IpvsInterface:         utilipvs.New(),
        IpsetInterface:        utilipset.New(utilexec.New()),
        execer:                utilexec.New(),
        Proxier:               nil,
        Broadcaster:           eventBroadcaster,
        Recorder:              recorder,
        ConntrackConfiguration: config.Conntrack,
        Conntracker:           &realConntracker{},
        ProxyMode:             config.Mode,
        NodeRef: &v1.ObjectReference{
            Kind:      "Node",
            Name:      config.HostnameOverride,
            UID:       types.UID(config.HostnameOverride),
            Namespace: "",
        },
        MetricsBindAddr:        config.MetricsBindAddr,
        BindAddressHardFail:    config.BindAddressHardFail,
        EnableProfiling:        config.EnableProfiling,
        OOMScoreAdj:           config.OOMScoreAdj,
        ConfigSyncPeriod:      config.ConfigSyncPeriod.Duration,
        ServiceEventHandler:   config.NodePortAddresses,
        HealthzServer:         healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef),
        UseEndpointSlices:     utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying),
    }, nil
}

// runLoop 运行主循环
func (o *Options) runLoop() error {
    if o.watcher != nil {
        o.watcher.Run()
    }

    // 运行代理服务器
    select {
    case err := <-o.errCh:
        return err
    case <-o.proxyServer.birthCry:
        return o.proxyServer.Run()
    }
}

2.4 启动时序图

sequenceDiagram participant Main as main() participant Cmd as cobra.Command participant Options as Options participant ProxyServer as ProxyServer participant Proxier as Proxier participant API as API Server Note over Main,API: kube-proxy启动完整流程 Main->>Cmd: 1. NewProxyCommand() Cmd->>Options: 2. NewOptions() 创建配置 Options->>Options: 3. 设置默认配置值 Main->>Cmd: 4. cli.Run(command) Cmd->>Options: 5. Complete() 完成配置 Options->>Options: 6. 验证和处理配置 Cmd->>Options: 7. Validate() 验证配置 Options->>Options: 8. 检查配置有效性 Cmd->>Options: 9. Run() 运行代理 Options->>ProxyServer: 10. NewProxyServer() 创建服务器 ProxyServer->>ProxyServer: 11. 创建Kubernetes客户端 ProxyServer->>ProxyServer: 12. 创建事件记录器 ProxyServer->>ProxyServer: 13. 初始化网络接口 Options->>ProxyServer: 14. Run() 启动服务器 ProxyServer->>ProxyServer: 15. 确定代理模式 ProxyServer->>Proxier: 16. 创建对应的Proxier实例 Note over Proxier: 根据模式创建不同的Proxier alt iptables模式 Proxier->>Proxier: 17a. 创建iptables Proxier else IPVS模式 Proxier->>Proxier: 17b. 创建IPVS Proxier else 用户空间模式 Proxier->>Proxier: 17c. 创建Userspace Proxier end ProxyServer->>API: 18. 建立API Server连接 API->>ProxyServer: 19. 返回连接确认 ProxyServer->>ProxyServer: 20. 启动Informer ProxyServer->>ProxyServer: 21. 启动健康检查服务器 ProxyServer->>ProxyServer: 22. 启动指标服务器 ProxyServer->>Proxier: 23. 启动同步循环 Note over Main,API: kube-proxy运行中 loop 主要工作循环 API->>ProxyServer: 24. Service/Endpoints变更通知 ProxyServer->>Proxier: 25. 处理配置变更 Proxier->>Proxier: 26. 同步代理规则 Proxier->>Proxier: 27. 更新网络规则 end

3. 代理模式实现

3.1 iptables模式实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// syncProxyRules iptables模式同步代理规则的核心函数
// 文件路径: pkg/proxy/iptables/proxier.go
func (proxier *Proxier) syncProxyRules() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    
    // 记录同步开始时间
    start := time.Now()
    defer func() {
        metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
        klog.V(2).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
    }()
    
    // 不要在同步过程中记录iptables规则到事件中,因为这会很嘈杂
    proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
    proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData)
    
    // 重置链映射,它们将在下面重新填充
    proxier.natChains.Reset()
    proxier.natRules.Reset()
    proxier.filterChains.Reset()
    proxier.filterRules.Reset()
    
    // 写入表头
    proxier.filterChains.Write("*filter")
    proxier.natChains.Write("*nat")
    
    // 确保我们保留重要的"基础"链
    for _, chainName := range []utiliptables.Chain{
        kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain,
        kubeProxyFirewallChain, kubeProxyCanaryChain,
    } {
        proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
    }
    for _, chainName := range []utiliptables.Chain{kubeForwardChain, kubeProxyFirewallChain} {
        proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
    }
    
    // 安装基础规则
    proxier.writeIptablesRules()
    
    // 遍历每个服务,创建相应的规则
    for svcName, svc := range proxier.serviceMap {
        svcInfo, ok := svc.(*serviceInfo)
        if !ok {
            klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
            continue
        }
        protocol := strings.ToLower(string(svcInfo.Protocol()))
        
        // 为每个服务创建服务链
        svcChain := svcInfo.serviceNamedPortChain()
        if hasEndpoints := len(proxier.endpointsMap[svcName]) > 0; hasEndpoints {
            // 创建服务链
            proxier.natChains.Write(utiliptables.MakeChainLine(svcChain))
            
            // 从KUBE-SERVICES跳转到服务链
            args := []string{
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
                "-m", protocol, "-p", protocol,
                "-d", svcInfo.ClusterIP().String(),
                "--dport", strconv.Itoa(svcInfo.Port()),
            }
            if proxier.masqueradeAll {
                proxier.natRules.Write(
                    "-A", string(svcChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
                    "-j", string(kubeMarkMasqChain))
            }
            
            // 处理会话亲和性
            if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                args = append(args,
                    "-m", "recent", "--name", string(svcChain),
                    "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap")
            }
            args = append(args, "-j", string(svcChain))
            proxier.natRules.Write(args)
            
            // 为每个端点创建端点链
            endpointChains := make([]utiliptables.Chain, 0)
            for _, ep := range proxier.endpointsMap[svcName] {
                epInfo, ok := ep.(*endpointsInfo)
                if !ok {
                    klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsName", ep)
                    continue
                }
                
                endpoints = append(endpoints, epInfo)
                endpointChain := epInfo.endpointChain(svcName, protocol)
                endpointChains = append(endpointChains, endpointChain)
                
                // 创建端点链
                proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain))
            }
            
            // 在服务链中创建负载均衡规则
            n := len(endpointChains)
            for i, endpointChain := range endpointChains {
                // 使用统计模块进行随机负载均衡
                args := []string{
                    "-A", string(svcChain),
                    "-m", "comment", "--comment", svcName.String(),
                }
                if i < (n - 1) {
                    // 对于前n-1个端点,使用概率1/n, 1/(n-1), ..., 1/2
                    args = append(args,
                        "-m", "statistic",
                        "--mode", "random",
                        "--probability", proxier.probability(n-i))
                }
                args = append(args, "-j", string(endpointChain))
                proxier.natRules.Write(args)
            }
            
            // 为每个端点创建DNAT规则
            for i, ep := range endpoints {
                epInfo := ep.(*endpointsInfo)
                endpointChain := endpointChains[i]
                
                // 创建DNAT规则
                args := []string{
                    "-A", string(endpointChain),
                    "-m", "comment", "--comment", svcName.String(),
                    "-m", protocol, "-p", protocol,
                    "-j", "DNAT",
                    "--to-destination", epInfo.Endpoint(),
                }
                proxier.natRules.Write(args)
            }
        } else {
            // 没有端点,创建拒绝规则
            proxier.filterChains.Write(utiliptables.MakeChainLine(svcChain))
            proxier.filterRules.Write(
                "-A", string(svcChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
                "-j", "REJECT",
                "--reject-with", "icmp-port-unreachable")
        }
    }
    
    // 写入表尾
    proxier.filterRules.Write("COMMIT")
    proxier.natRules.Write("COMMIT")
    
    // 原子性地应用所有规则
    lines := append(proxier.natChains.Bytes(), proxier.natRules.Bytes()...)
    lines = append(lines, proxier.filterChains.Bytes()...)
    lines = append(lines, proxier.filterRules.Bytes()...)
    
    klog.V(3).InfoS("Restoring iptables", "rules", lines)
    err := proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.ErrorS(err, "Failed to execute iptables-restore")
        metrics.IptablesRestoreFailuresTotal.Inc()
        return
    }
    success = true
    
    // 更新健康检查时间戳
    proxier.healthzServer.Updated()
}

// probability 计算概率,用于iptables随机负载均衡
func (proxier *Proxier) probability(n int) string {
    return fmt.Sprintf("%0.10f", 1.0/float64(n))
}

// writeIptablesRules 写入基础iptables规则
func (proxier *Proxier) writeIptablesRules() {
    // nat表规则
    proxier.natRules.Write(
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", `"kubernetes service portals"`,
        "-m", "addrtype", "--dst-type", "LOCAL",
        "-j", string(kubeNodePortsChain))
    proxier.natRules.Write(
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", `"kubernetes service portals"`,
        "-j", string(kubeClusterIPChain))
    
    // 伪装规则
    proxier.natRules.Write(
        "-A", string(kubeMarkMasqChain),
        "-j", "MARK", "--set-xmark", proxier.masqueradeMark)
    
    // SNAT规则
    masqRule := []string{
        "-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", `"kubernetes postrouting rules"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "MASQUERADE",
    }
    if proxier.iptables.HasRandomFully() {
        masqRule = append(masqRule, "--random-fully")
    }
    proxier.natRules.Write(masqRule)
    
    // filter表规则
    proxier.filterRules.Write(
        "-A", string(kubeForwardChain),
        "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "ACCEPT")
}

3.2 IPVS模式实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// syncProxyRules IPVS模式同步代理规则的核心函数
// 文件路径: pkg/proxy/ipvs/proxier.go
func (proxier *Proxier) syncProxyRules() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    
    // 记录同步开始时间
    start := time.Now()
    defer func() {
        metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
        klog.V(2).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
    }()
    
    // 构建IPVS规则
    staleServices := make(map[string]*utilipvs.VirtualServer)
    
    // 获取当前IPVS虚拟服务器
    virtualServers, err := proxier.ipvs.GetVirtualServers()
    if err != nil {
        klog.ErrorS(err, "Failed to get IPVS virtual servers")
        return
    }
    
    // 将现有的虚拟服务器标记为过期
    for _, vs := range virtualServers {
        staleServices[vs.String()] = vs
    }
    
    // 构建当前活跃的服务映射
    activeServices := make(map[string]bool)
    activeBindAddrs := make(map[string]bool)
    
    // 遍历每个服务
    for svcName, svc := range proxier.serviceMap {
        svcInfo, ok := svc.(*serviceInfo)
        if !ok {
            klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
            continue
        }
        
        protocol := utilipvs.Protocol(svcInfo.Protocol())
        
        // 创建虚拟服务器
        virtualServer := &utilipvs.VirtualServer{
            Address:   svcInfo.ClusterIP(),
            Port:      uint16(svcInfo.Port()),
            Protocol:  string(protocol),
            Scheduler: proxier.scheduler,
        }
        
        // 设置会话亲和性
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
            virtualServer.Flags |= utilipvs.FlagPersistent
            virtualServer.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
        }
        
        // 确保虚拟服务器存在
        if err := proxier.syncService(svcName, virtualServer, false); err != nil {
            klog.ErrorS(err, "Failed to sync service", "serviceName", svcName)
            continue
        }
        
        // 标记为活跃服务
        activeServices[virtualServer.String()] = true
        delete(staleServices, virtualServer.String())
        
        // 绑定服务IP到虚拟接口
        if err := proxier.syncServiceBinding(svcInfo.ClusterIP(), svcName); err != nil {
            klog.ErrorS(err, "Failed to sync service binding", "serviceName", svcName)
            continue
        }
        activeBindAddrs[svcInfo.ClusterIP().String()] = true
        
        // 同步真实服务器(端点)
        curEndpoints := proxier.endpointsMap[svcName]
        if err := proxier.syncEndpoint(svcName, curEndpoints); err != nil {
            klog.ErrorS(err, "Failed to sync endpoint", "serviceName", svcName)
            continue
        }
        
        // 处理NodePort服务
        if svcInfo.NodePort() != 0 {
            nodePortVirtualServer := &utilipvs.VirtualServer{
                Address:   proxier.nodeIP,
                Port:      uint16(svcInfo.NodePort()),
                Protocol:  string(protocol),
                Scheduler: proxier.scheduler,
            }
            
            if err := proxier.syncService(svcName, nodePortVirtualServer, false); err != nil {
                klog.ErrorS(err, "Failed to sync NodePort service", "serviceName", svcName)
                continue
            }
            
            activeServices[nodePortVirtualServer.String()] = true
            delete(staleServices, nodePortVirtualServer.String())
        }
        
        // 处理LoadBalancer服务
        for _, ingress := range svcInfo.LoadBalancerIPStrings() {
            if ingress != "" {
                loadBalancerVirtualServer := &utilipvs.VirtualServer{
                    Address:   net.ParseIP(ingress),
                    Port:      uint16(svcInfo.Port()),
                    Protocol:  string(protocol),
                    Scheduler: proxier.scheduler,
                }
                
                if err := proxier.syncService(svcName, loadBalancerVirtualServer, false); err != nil {
                    klog.ErrorS(err, "Failed to sync LoadBalancer service", "serviceName", svcName)
                    continue
                }
                
                activeServices[loadBalancerVirtualServer.String()] = true
                delete(staleServices, loadBalancerVirtualServer.String())
            }
        }
    }
    
    // 清理过期的虚拟服务器
    for _, vs := range staleServices {
        klog.V(2).InfoS("Removing stale virtual server", "virtualServer", vs)
        if err := proxier.ipvs.DeleteVirtualServer(vs); err != nil {
            klog.ErrorS(err, "Failed to delete virtual server", "virtualServer", vs)
        }
    }
    
    // 清理过期的绑定地址
    proxier.cleanupStaleBindAddrs(activeBindAddrs)
    
    // 同步iptables规则(IPVS模式仍需要一些iptables规则)
    proxier.syncIPTablesRules()
    
    // 更新健康检查时间戳
    proxier.healthzServer.Updated()
}

// syncService 同步IPVS虚拟服务器
func (proxier *Proxier) syncService(svcName proxy.ServicePortName, vs *utilipvs.VirtualServer, bindAddr bool) error {
    appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
    if appliedVirtualServer == nil {
        klog.V(3).InfoS("Adding virtual server", "serviceName", svcName, "virtualServer", vs)
        if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
            return err
        }
    } else if !appliedVirtualServer.Equal(vs) {
        klog.V(3).InfoS("Updating virtual server", "serviceName", svcName, "virtualServer", vs)
        if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
            return err
        }
    }
    
    return nil
}

// syncEndpoint 同步IPVS真实服务器
func (proxier *Proxier) syncEndpoint(svcName proxy.ServicePortName, endpoints []proxy.Endpoint) error {
    appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(&utilipvs.VirtualServer{
        Address:  svcInfo.ClusterIP(),
        Port:     uint16(svcInfo.Port()),
        Protocol: string(utilipvs.Protocol(svcInfo.Protocol())),
    })
    if err != nil || appliedVirtualServer == nil {
        return fmt.Errorf("failed to get virtual server: %v", err)
    }
    
    // 获取当前真实服务器
    curRealServers, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
    if err != nil {
        return fmt.Errorf("failed to get real servers: %v", err)
    }
    
    // 构建期望的真实服务器集合
    expectedRealServers := make(map[string]*utilipvs.RealServer)
    for _, ep := range endpoints {
        epInfo, ok := ep.(*endpointsInfo)
        if !ok {
            klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpoint", ep)
            continue
        }
        
        realServer := &utilipvs.RealServer{
            Address: epInfo.IP(),
            Port:    uint16(epInfo.Port()),
            Weight:  1,
        }
        
        expectedRealServers[realServer.String()] = realServer
    }
    
    // 删除不需要的真实服务器
    for _, rs := range curRealServers {
        if _, exists := expectedRealServers[rs.String()]; !exists {
            klog.V(3).InfoS("Removing real server", "serviceName", svcName, "realServer", rs)
            if err := proxier.ipvs.DeleteRealServer(appliedVirtualServer, rs); err != nil {
                klog.ErrorS(err, "Failed to delete real server", "realServer", rs)
            }
        }
    }
    
    // 添加新的真实服务器
    for _, rs := range expectedRealServers {
        if err := proxier.ipvs.AddRealServer(appliedVirtualServer, rs); err != nil {
            klog.ErrorS(err, "Failed to add real server", "realServer", rs)
        }
    }
    
    return nil
}

// syncIPTablesRules 同步IPVS模式需要的iptables规则
func (proxier *Proxier) syncIPTablesRules() {
    // IPVS模式仍然需要一些iptables规则用于:
    // 1. 标记需要SNAT的包
    // 2. 处理NodePort流量
    // 3. 处理LoadBalancer流量
    
    // 构建iptables规则
    natRules := bytes.NewBuffer(nil)
    natChains := bytes.NewBuffer(nil)
    
    natChains.Write("*nat\n")
    natChains.Write(utiliptables.MakeChainLine(kubeServicesChain))
    natChains.Write(utiliptables.MakeChainLine(kubeNodePortsChain))
    natChains.Write(utiliptables.MakeChainLine(kubeFireWallChain))
    natChains.Write(utiliptables.MakeChainLine(kubeMarkMasqChain))
    
    // 基础规则
    natRules.Write("-A", string(kubeServicesChain), 
        "-m", "comment", "--comment", "kubernetes service portals",
        "-m", "addrtype", "--dst-type", "LOCAL",
        "-j", string(kubeNodePortsChain), "\n")
    
    natRules.Write("-A", string(kubeMarkMasqChain),
        "-j", "MARK", "--set-xmark", proxier.masqueradeMark, "\n")
    
    // SNAT规则
    natRules.Write("-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", "kubernetes postrouting rules",
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "MASQUERADE", "\n")
    
    natRules.Write("COMMIT\n")
    
    // 应用iptables规则
    lines := append(natChains.Bytes(), natRules.Bytes()...)
    if err := proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
        klog.ErrorS(err, "Failed to execute iptables-restore for IPVS")
        return
    }
}

4. 服务发现和负载均衡

4.1 服务处理器实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// OnServiceAdd 处理Service添加事件
// 文件路径: pkg/proxy/iptables/proxier.go
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}

// OnServiceUpdate 处理Service更新事件
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}

// OnServiceDelete 处理Service删除事件
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
    proxier.OnServiceUpdate(service, nil)
}

// serviceChange 服务变更跟踪器
type serviceChange struct {
    current  *v1.Service
    previous *v1.Service
}

// ServiceChangeTracker 服务变更跟踪器
type ServiceChangeTracker struct {
    // lock 保护items映射的读写
    lock sync.Mutex
    
    // items 跟踪待处理的服务变更
    items map[types.NamespacedName]*serviceChange
    
    // makeServiceInfo 创建ServiceInfo的工厂函数
    makeServiceInfo makeServicePortFunc
    
    // isIPv6Mode 是否为IPv6模式
    isIPv6Mode *bool
    
    // recorder 事件记录器
    recorder events.EventRecorder
}

// Update 更新服务变更跟踪器
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
    svc := current
    if svc == nil {
        svc = previous
    }
    
    // 忽略无效服务
    if svc == nil {
        return false
    }
    
    namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
    
    sct.lock.Lock()
    defer sct.lock.Unlock()
    
    change, exists := sct.items[namespacedName]
    if !exists {
        change = &serviceChange{}
        change.previous = previous
        sct.items[namespacedName] = change
    }
    change.current = current
    
    // 如果这是删除操作且之前没有服务,则删除此条目
    if current == nil && change.previous == nil {
        delete(sct.items, namespacedName)
    }
    
    metrics.ServiceChangesPending.Set(float64(len(sct.items)))
    return len(sct.items) > 0
}

// checkoutChanges 检出所有待处理的服务变更
func (sct *ServiceChangeTracker) checkoutChanges() []*serviceChange {
    sct.lock.Lock()
    defer sct.lock.Unlock()
    
    changes := make([]*serviceChange, 0, len(sct.items))
    for _, change := range sct.items {
        changes = append(changes, change)
    }
    sct.items = make(map[types.NamespacedName]*serviceChange)
    metrics.ServiceChangesPending.Set(0)
    return changes
}

// serviceToServiceMap 将服务变更应用到服务映射
func serviceToServiceMap(service *v1.Service, change *serviceChange, serviceMap ServiceMap) {
    if service == nil {
        return
    }
    
    svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    
    for i := range service.Spec.Ports {
        servicePort := &service.Spec.Ports[i]
        svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
        
        // 创建ServiceInfo
        info := &serviceInfo{
            BaseServiceInfo: &BaseServiceInfo{
                clusterIP:                service.Spec.ClusterIP,
                port:                     int(servicePort.Port),
                protocol:                 servicePort.Protocol,
                nodePort:                 int(servicePort.NodePort),
                loadBalancerStatus:       service.Status.LoadBalancer,
                sessionAffinityType:      service.Spec.SessionAffinity,
                stickyMaxAgeSeconds:      int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds),
                externalIPs:              service.Spec.ExternalIPs,
                loadBalancerSourceRanges: service.Spec.LoadBalancerSourceRanges,
                onlyNodeLocalEndpoints:   apiservice.RequestsOnlyLocalTraffic(service),
                topologyKeys:             topologyKeys,
                externalPolicyLocal:      apiservice.ExternalPolicyLocal(service),
                internalPolicyLocal:      apiservice.InternalPolicyLocal(service),
                hintsAnnotation:          service.Annotations[v1.AnnotationTopologyAwareHints],
            },
        }
        
        serviceMap[svcPortName] = info
        klog.V(1).InfoS("Added service", "serviceName", svcPortName, "clusterIP", info.ClusterIP(), "port", info.Port(), "protocol", info.Protocol())
    }
}

4.2 端点处理器实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// OnEndpointsAdd 处理Endpoints添加事件
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
    proxier.OnEndpointsUpdate(nil, endpoints)
}

// OnEndpointsUpdate 处理Endpoints更新事件
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
    if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
        proxier.Sync()
    }
}

// OnEndpointsDelete 处理Endpoints删除事件
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
    proxier.OnEndpointsUpdate(endpoints, nil)
}

// EndpointsChangeTracker 端点变更跟踪器
type EndpointsChangeTracker struct {
    // lock 保护items映射的读写
    lock sync.Mutex
    
    // items 跟踪待处理的端点变更
    items map[types.NamespacedName]*endpointsChange
    
    // makeEndpointInfo 创建EndpointInfo的工厂函数
    makeEndpointInfo makeEndpointFunc
    
    // hostname 当前节点主机名
    hostname string
    
    // isIPv6Mode 是否为IPv6模式
    isIPv6Mode *bool
    
    // recorder 事件记录器
    recorder events.EventRecorder
}

// endpointsChange 端点变更
type endpointsChange struct {
    current  *v1.Endpoints
    previous *v1.Endpoints
}

// Update 更新端点变更跟踪器
func (ect *EndpointsChangeTracker) Update(previous, current *v1.Endpoints) bool {
    endpoints := current
    if endpoints == nil {
        endpoints = previous
    }
    
    // 忽略无效端点
    if endpoints == nil {
        return false
    }
    
    namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
    
    ect.lock.Lock()
    defer ect.lock.Unlock()
    
    change, exists := ect.items[namespacedName]
    if !exists {
        change = &endpointsChange{}
        change.previous = previous
        ect.items[namespacedName] = change
    }
    change.current = current
    
    // 如果这是删除操作且之前没有端点,则删除此条目
    if current == nil && change.previous == nil {
        delete(ect.items, namespacedName)
    }
    
    metrics.EndpointChangesPending.Set(float64(len(ect.items)))
    return len(ect.items) > 0
}

// endpointsToEndpointsMap 将端点变更应用到端点映射
func endpointsToEndpointsMap(endpoints *v1.Endpoints, change *endpointsChange, endpointsMap EndpointsMap) {
    if endpoints == nil {
        return
    }
    
    svcName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
    
    for i := range endpoints.Subsets {
        ss := &endpoints.Subsets[i]
        for i := range ss.Ports {
            port := &ss.Ports[i]
            if port.Port == 0 {
                klog.WarningS(nil, "Ignoring invalid endpoint port", "port", port)
                continue
            }
            
            svcPortName := ServicePortName{
                NamespacedName: svcName,
                Port:           port.Name,
                Protocol:       port.Protocol,
            }
            
            for i := range ss.Addresses {
                addr := &ss.Addresses[i]
                if addr.IP == "" {
                    klog.WarningS(nil, "Ignoring invalid endpoint address", "address", addr)
                    continue
                }
                
                // 创建EndpointInfo
                info := &endpointsInfo{
                    BaseEndpointInfo: &BaseEndpointInfo{
                        endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
                        isLocal:  addr.NodeName != nil && *addr.NodeName == change.hostname,
                        ready:    true,
                        serving:  true,
                        terminating: false,
                    },
                }
                
                endpointsMap[svcPortName] = append(endpointsMap[svcPortName], info)
                klog.V(1).InfoS("Added endpoint", "serviceName", svcPortName, "endpoint", info.String(), "isLocal", info.IsLocal())
            }
            
            // 处理NotReadyAddresses
            for i := range ss.NotReadyAddresses {
                addr := &ss.NotReadyAddresses[i]
                if addr.IP == "" {
                    klog.WarningS(nil, "Ignoring invalid not-ready endpoint address", "address", addr)
                    continue
                }
                
                info := &endpointsInfo{
                    BaseEndpointInfo: &BaseEndpointInfo{
                        endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
                        isLocal:  addr.NodeName != nil && *addr.NodeName == change.hostname,
                        ready:    false,
                        serving:  false,
                        terminating: false,
                    },
                }
                
                endpointsMap[svcPortName] = append(endpointsMap[svcPortName], info)
                klog.V(1).InfoS("Added not-ready endpoint", "serviceName", svcPortName, "endpoint", info.String(), "isLocal", info.IsLocal())
            }
        }
    }
}

5. 性能优化和最佳实践

5.1 IPVS vs iptables性能对比

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 性能对比分析
type ProxyModeComparison struct {
    Mode           string
    RuleComplexity string  // 规则复杂度
    Scalability    string  // 扩展性
    Performance    string  // 性能
    LoadBalancing  string  // 负载均衡算法
    SessionAffinity string // 会话亲和性
}

var ProxyModeComparisons = []ProxyModeComparison{
    {
        Mode:           "iptables",
        RuleComplexity: "O(n) - 线性增长",
        Scalability:    "中等 - 适合中小规模集群",
        Performance:    "中等 - 随服务数量线性下降",
        LoadBalancing:  "随机 - 基于iptables统计模块",
        SessionAffinity: "支持 - 基于recent模块",
    },
    {
        Mode:           "IPVS",
        RuleComplexity: "O(1) - 常量时间",
        Scalability:    "高 - 适合大规模集群",
        Performance:    "高 - 内核态负载均衡",
        LoadBalancing:  "多种算法 - rr/lc/dh/sh/sed/nq",
        SessionAffinity: "支持 - 内核态会话保持",
    },
    {
        Mode:           "userspace",
        RuleComplexity: "简单 - 用户态处理",
        Scalability:    "低 - 不适合生产环境",
        Performance:    "低 - 用户态代理开销大",
        LoadBalancing:  "轮询 - 简单轮询算法",
        SessionAffinity: "支持 - 用户态实现",
    },
}

5.2 连接跟踪优化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 优化conntrack配置
type ConntrackOptimizer struct {
    // conntrack表大小
    maxConnections int
    
    // TCP超时配置
    tcpTimeouts map[string]time.Duration
    
    // UDP超时配置
    udpTimeout time.Duration
    
    // 清理间隔
    cleanupInterval time.Duration
}

func NewConntrackOptimizer() *ConntrackOptimizer {
    return &ConntrackOptimizer{
        maxConnections: 1048576, // 1M连接
        tcpTimeouts: map[string]time.Duration{
            "tcp_timeout_established": 7440 * time.Second,  // 2小时4分钟
            "tcp_timeout_close_wait":  60 * time.Second,    // 1分钟
            "tcp_timeout_fin_wait":    120 * time.Second,   // 2分钟
            "tcp_timeout_time_wait":   120 * time.Second,   // 2分钟
        },
        udpTimeout:      30 * time.Second, // 30秒
        cleanupInterval: 300 * time.Second, // 5分钟
    }
}

// OptimizeConntrack 优化conntrack配置
func (co *ConntrackOptimizer) OptimizeConntrack() error {
    // 设置conntrack最大连接数
    if err := co.setConntrackMax(); err != nil {
        return fmt.Errorf("failed to set conntrack max: %v", err)
    }
    
    // 设置TCP超时
    for param, timeout := range co.tcpTimeouts {
        if err := co.setConntrackTimeout(param, int(timeout.Seconds())); err != nil {
            klog.ErrorS(err, "Failed to set conntrack timeout", "param", param)
        }
    }
    
    // 设置UDP超时
    if err := co.setConntrackTimeout("udp_timeout", int(co.udpTimeout.Seconds())); err != nil {
        klog.ErrorS(err, "Failed to set UDP conntrack timeout")
    }
    
    return nil
}

// setConntrackMax 设置conntrack最大连接数
func (co *ConntrackOptimizer) setConntrackMax() error {
    maxFile := "/proc/sys/net/netfilter/nf_conntrack_max"
    return ioutil.WriteFile(maxFile, []byte(strconv.Itoa(co.maxConnections)), 0644)
}

// setConntrackTimeout 设置conntrack超时
func (co *ConntrackOptimizer) setConntrackTimeout(param string, timeout int) error {
    timeoutFile := fmt.Sprintf("/proc/sys/net/netfilter/nf_conntrack_%s", param)
    return ioutil.WriteFile(timeoutFile, []byte(strconv.Itoa(timeout)), 0644)
}

5.3 iptables规则优化

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// iptables规则优化器
type IptablesOptimizer struct {
    // 规则缓存
    ruleCache map[string][]string
    
    // 链优化
    chainOptimizer *ChainOptimizer
    
    // 批量操作
    batchSize int
}

type ChainOptimizer struct {
    // 自定义链映射
    customChains map[string]bool
    
    // 规则去重
    ruleDeduplication bool
    
    // 规则排序
    ruleSorting bool
}

// OptimizeRules 优化iptables规则
func (io *IptablesOptimizer) OptimizeRules(rules []string) []string {
    if len(rules) == 0 {
        return rules
    }
    
    // 1. 去重规则
    rules = io.deduplicateRules(rules)
    
    // 2. 排序规则以提高匹配效率
    rules = io.sortRules(rules)
    
    // 3. 合并相似规则
    rules = io.mergeRules(rules)
    
    // 4. 优化链结构
    rules = io.optimizeChains(rules)
    
    return rules
}

// deduplicateRules 去重规则
func (io *IptablesOptimizer) deduplicateRules(rules []string) []string {
    seen := make(map[string]bool)
    var result []string
    
    for _, rule := range rules {
        if !seen[rule] {
            seen[rule] = true
            result = append(result, rule)
        }
    }
    
    return result
}

// sortRules 排序规则,将更具体的规则放在前面
func (io *IptablesOptimizer) sortRules(rules []string) []string {
    sort.Slice(rules, func(i, j int) bool {
        // 计算规则的具体程度(更多匹配条件 = 更具体)
        scoreI := io.calculateRuleSpecificity(rules[i])
        scoreJ := io.calculateRuleSpecificity(rules[j])
        return scoreI > scoreJ
    })
    
    return rules
}

// calculateRuleSpecificity 计算规则的具体程度
func (io *IptablesOptimizer) calculateRuleSpecificity(rule string) int {
    score := 0
    
    // 具体IP地址比子网更具体
    if strings.Contains(rule, "/32") {
        score += 10
    } else if strings.Contains(rule, "/") {
        score += 5
    }
    
    // 具体端口比端口范围更具体
    if strings.Contains(rule, "--dport") && !strings.Contains(rule, ":") {
        score += 8
    }
    
    // 协议匹配
    if strings.Contains(rule, "-p tcp") || strings.Contains(rule, "-p udp") {
        score += 3
    }
    
    // 状态匹配
    if strings.Contains(rule, "--state") {
        score += 5
    }
    
    return score
}

// mergeRules 合并相似规则
func (io *IptablesOptimizer) mergeRules(rules []string) []string {
    // 简化实现:将具有相同动作和类似条件的规则合并
    merged := make(map[string][]string)
    var result []string
    
    for _, rule := range rules {
        key := io.getRuleMergeKey(rule)
        merged[key] = append(merged[key], rule)
    }
    
    for _, ruleGroup := range merged {
        if len(ruleGroup) > 1 {
            // 尝试合并规则组
            if mergedRule := io.tryMergeRuleGroup(ruleGroup); mergedRule != "" {
                result = append(result, mergedRule)
                continue
            }
        }
        result = append(result, ruleGroup...)
    }
    
    return result
}

// getRuleMergeKey 获取规则合并键
func (io *IptablesOptimizer) getRuleMergeKey(rule string) string {
    // 提取规则的关键部分用于分组
    parts := strings.Fields(rule)
    var key []string
    
    for i, part := range parts {
        if part == "-j" && i+1 < len(parts) {
            key = append(key, part, parts[i+1])
            break
        }
        if part == "-A" && i+1 < len(parts) {
            key = append(key, part, parts[i+1])
        }
    }
    
    return strings.Join(key, " ")
}

6. 总结与最佳实践

6.1 架构设计原则

kube-proxy的设计体现了以下核心原则:

  1. 模式多样性:支持多种代理模式适应不同场景
  2. 事件驱动:响应Service和Endpoints变化
  3. 高性能:IPVS模式提供内核级负载均衡
  4. 可扩展性:支持大规模集群部署
  5. 容错设计:具备故障恢复和规则同步机制

6.2 性能优化建议

  1. 代理模式选择

    • 小规模集群:iptables模式
    • 大规模集群:IPVS模式
    • 调试环境:userspace模式
  2. 网络优化

    • 优化conntrack配置
    • 调整内核网络参数
    • 使用高性能网络插件
  3. 规则优化

    • 定期清理无用规则
    • 优化规则顺序
    • 使用ipset提高匹配效率

6.3 运维最佳实践

  1. 监控指标

    • 代理规则同步延迟
    • 网络连接数统计
    • 负载均衡分布情况
  2. 故障排查

    • 检查Service和Endpoints状态
    • 验证网络规则配置
    • 分析连接跟踪表
  3. 安全配置

    • 限制NodePort范围
    • 配置网络策略
    • 启用审计日志

7. 最新网络代理特性

7.1 EndpointSlice支持

基于最新的技术文章,Kubernetes 1.29+引入了EndpointSlice的重要改进:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// EndpointSliceHandler EndpointSlice处理器
// 支持更高效的端点管理和拓扑感知路由
type EndpointSliceHandler struct {
    // proxier 代理器实例
    proxier Proxier
    
    // endpointSliceCache EndpointSlice缓存
    endpointSliceCache *EndpointSliceCache
    
    // servicePortMap 服务端口映射
    servicePortMap map[ServicePortName]ServicePort
    
    // lock 保护并发访问的互斥锁
    lock sync.RWMutex
}

// OnEndpointSliceUpdate 处理EndpointSlice更新事件
func (esh *EndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, endpointSlice *discovery.EndpointSlice) {
    esh.lock.Lock()
    defer esh.lock.Unlock()
    
    // 1. 验证EndpointSlice有效性
    if endpointSlice == nil {
        klog.ErrorS(nil, "Received nil EndpointSlice")
        return
    }
    
    // 2. 检查是否为IPv4/IPv6双栈支持
    addressType := endpointSlice.AddressType
    if !esh.isValidAddressType(addressType) {
        klog.V(4).InfoS("Skipping EndpointSlice with unsupported address type", 
            "endpointSlice", klog.KObj(endpointSlice), "addressType", addressType)
        return
    }
    
    // 3. 更新EndpointSlice缓存
    serviceName := types.NamespacedName{
        Namespace: endpointSlice.Namespace,
        Name:      endpointSlice.Labels[discovery.LabelServiceName],
    }
    
    esh.endpointSliceCache.updatePending(endpointSlice, true)
    
    // 4. 触发代理规则同步
    if esh.shouldSyncService(serviceName) {
        esh.proxier.Sync()
    }
}

通过深入理解kube-proxy的源码实现,特别是最新的EndpointSlice支持和负载均衡算法优化,我们能够更好地配置和优化Kubernetes网络,充分发挥服务网格的优势。


创建时间: 2025年09月13日

本文深入分析了kube-proxy的源码实现,为理解和优化Kubernetes网络代理提供了全面的技术指南。