APISIX-04: Plugins and Load Balancing
This document dissects APISIX's plugin system and load-balancing machinery, covering the plugin lifecycle, core plugin implementations, the available load-balancing algorithms, and service discovery integration.
1. Plugin System Overview
1.1 Responsibilities and Boundaries
Core responsibilities:
- Provide a unified mechanism for loading, unloading, and versioning plugins
- Orchestrate plugin execution by priority across the Nginx phases
- Support plugins for both the HTTP and Stream subsystems
- Support dynamic loading of WASM plugins
- Validate plugin configuration against its schema and encrypt sensitive fields
- Manage plugin metadata and support hot reload
- Support per-plugin conditional execution (`_meta.filter`)
Boundaries:
- Plugin configuration can be attached to Routes, Services, Consumers, Consumer Groups, and Global Rules
- Plugins are ordered by `priority`; a larger value means earlier execution
- Supported Nginx phases for HTTP plugins: `rewrite`, `access`, `header_filter`, `body_filter`, `log`, `delayed_body_filter`
- Stream plugins support: `preread`, `log`
Upstream/downstream dependencies:
- Depends on `apisix.core` for base utilities (log, schema, config)
- Depends on `apisix.admin.plugins` for managing the plugin list
- Invoked by `apisix.init` in each request-processing phase
1.2 Plugin System Architecture
```mermaid
flowchart TB
    subgraph CFG[Configuration layer]
        A[config.yaml<br/>plugin list] --> B[Admin API<br/>plugin config]
        B --> C[Etcd/YAML<br/>config store]
    end
    subgraph LOAD[Loading layer]
        D[plugin.load<br/>load plugins] --> E[sort by priority]
        E --> F[inject plugin schema]
        F --> G[call plugin init]
    end
    subgraph RUN[Execution layer]
        H[plugin.filter<br/>select plugins] --> I[check _meta.filter]
        I --> J[merge Route/Service/Consumer]
        J --> K[plugin.run_plugin<br/>run per phase]
    end
    subgraph TYPES[Core plugin categories]
        L1[Authentication<br/>key-auth, jwt-auth]
        L2[Traffic control<br/>limit-count, limit-req]
        L3[Transformation<br/>proxy-rewrite, response-rewrite]
        L4[Observability<br/>prometheus, skywalking]
        L5[Security<br/>cors, csrf, ip-restriction]
    end
    C --> D
    G --> H
    K --> L1
    K --> L2
    K --> L3
    K --> L4
    K --> L5
    style H fill:#e1f5ff
    style K fill:#fff4e1
    style D fill:#f0f0f0
```
Architecture notes:
- Configuration layer: the plugin list is declared in `config.yaml`; plugin configuration is pushed through the Admin API into etcd
- Loading layer: plugins are loaded in the `init_worker` phase, sorted by priority, and their schemas are injected with metadata
- Execution layer: per request, the applicable plugins are filtered, then executed phase by phase
- Plugin categories: authentication, traffic control, transformation, observability, and security
1.3 Plugin Lifecycle Sequence Diagram
```mermaid
sequenceDiagram
    autonumber
    participant C as config.yaml
    participant A as Admin API
    participant P as plugin.lua
    participant I as init.lua
    participant R as Route Match
    participant E as Plugin Execution
    Note over C,P: === Startup ===
    C->>P: read the plugins list
    P->>P: load() all plugin modules
    P->>P: validate plugin priority/version/schema
    P->>P: call each plugin's init()
    P->>P: sort by priority into local_plugins
    Note over A,E: === Configuration ===
    A->>P: check_schema() validates plugin config
    A->>P: encrypt_conf() encrypts sensitive fields
    A->>Etcd: store the encrypted config
    Note over I,E: === Request processing ===
    I->>R: match Route
    R->>P: filter() selects applicable plugins
    P->>P: evaluate _meta.filter expressions
    P->>P: merge Route/Service/Consumer plugins
    P->>E: run_plugin(phase='rewrite')
    E->>E: run each plugin's rewrite()
    P->>E: run_plugin(phase='access')
    E->>E: run each plugin's access()
    Note over E: === Response ===
    I->>P: run_plugin(phase='header_filter')
    I->>P: run_plugin(phase='body_filter')
    I->>P: run_plugin(phase='log')
    Note over P: === Hot reload ===
    A->>P: PUT /apisix/admin/plugins/reload
    P->>P: load() reloads plugins
    P->>P: propagate the reload event via etcd
```
Sequence notes:
- Startup (steps 1-5): load the plugin list from `config.yaml` and initialize the plugin modules
- Configuration (6-8): the Admin API validates and encrypts the plugin config before storing it
- Request processing (9-14): after Route matching, plugins are filtered and executed
- Response (15-17): plugins run in the response-filter and log phases
- Hot reload (18-20): plugins can be reloaded without restarting the gateway
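To make the lifecycle above concrete, here is a minimal sketch of a custom plugin module in the shape `plugin.load` expects. The plugin name `example-hello` and the `X-Hello` header are invented for illustration; a real plugin would live under `apisix/plugins/` and be listed in `config.yaml`.
```lua
-- Minimal custom plugin sketch (hypothetical name "example-hello").
local core = require("apisix.core")

local schema = {
    type = "object",
    properties = {
        message = {type = "string", default = "hello"},
    },
}

local _M = {
    version = 0.1,
    priority = 1500,      -- sorted descending; runs before lower-priority plugins
    name = "example-hello",
    schema = schema,
}

-- called by the Admin API before the config is stored
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

-- executed in the rewrite phase for every matched request
function _M.rewrite(conf, ctx)
    core.request.set_header(ctx, "X-Hello", conf.message)
end

-- executed in the log phase; must not block
function _M.log(conf, ctx)
    core.log.info("example-hello served uri: ", ctx.var.uri)
end

return _M
```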
2. Plugin Core Modules in Detail
2.1 Plugin Loader (plugin.lua)
2.1.1 Core Data Structures
```lua
-- Plugin definition structure
local plugin_definition = {
    name = "example-plugin",            -- plugin name
    version = 0.1,                      -- plugin version
    priority = 1000,                    -- priority (larger runs earlier)
    type = 'auth',                      -- plugin type: auth/transform/log
    schema = {...},                     -- config schema
    consumer_schema = {...},            -- Consumer-side config schema
    init = function() end,              -- initialization hook
    rewrite = function(conf, ctx) end,  -- rewrite-phase handler
    access = function(conf, ctx) end,   -- access-phase handler
    header_filter = function(conf, ctx) end,
    body_filter = function(conf, ctx) end,
    log = function(conf, ctx) end,
}

-- Global plugin registries
local local_plugins = {}         -- loaded plugins (sorted by priority)
local local_plugins_hash = {}    -- plugin name -> plugin object
local stream_local_plugins = {}  -- Stream plugins
local stream_local_plugins_hash = {}
```
2.1.2 Plugin Loading Flow (load_plugin)
```lua
local function load_plugin(name, plugins_list, plugin_type)
    local pkg_name = "apisix.plugins." .. name
    local pkg = require(pkg_name)

    -- basic attribute validation
    if not pkg.priority then
        return "invalid plugin priority"
    end
    if not pkg.version then
        return "invalid plugin version"
    end

    -- inject the _meta field into the schema (enables filter expressions)
    if pkg.schema then
        inject_conf_with_meta_schema(pkg)
    end

    -- call the plugin's initialization hook
    if pkg.init then
        pkg.init()
    end

    -- plugin workflow handler (advanced feature)
    if pkg.workflow_handler then
        pkg.workflow_handler()
    end

    core.table.insert(plugins_list, pkg)
    return nil
end
```
What it does:
- Dynamically `require`s the plugin module
- Validates the mandatory attributes (`priority`, `version`)
- Injects the `_meta` field into the schema to enable conditional filtering
- Calls the plugin's `init()` and `workflow_handler()` hooks
Error handling:
- Module not found: the `require` error is returned
- Missing mandatory field: a validation error message is returned
- `init()` failure: logged, but loading is not aborted
2.1.3 Loading the Plugin List (load)
```lua
function _M.load(plugin_names, wasm_plugin_names)
    local processed = {}
    local plugins_list = core.table.new(#plugin_names, 0)

    -- load HTTP plugins
    for _, name in ipairs(plugin_names) do
        if processed[name] == nil then
            local err = load_plugin(name, plugins_list, "http")
            if err then
                core.log.error("failed to load plugin ", name, ": ", err)
            end
            processed[name] = true
        end
    end

    -- unload plugins that are no longer in use
    local old_plugins = local_plugins
    for _, plugin in ipairs(old_plugins) do
        if processed[plugin.name] == nil and plugin.destroy then
            plugin.destroy()
        end
    end

    -- sort by priority, descending
    core.table.sort(plugins_list, function(a, b)
        return a.priority > b.priority
    end)

    -- swap in the new global plugin registry
    local_plugins = plugins_list
    local_plugins_hash = {}
    for _, plugin in ipairs(plugins_list) do
        local_plugins_hash[plugin.name] = plugin
    end

    -- load WASM plugins (omitted here)
    return true
end
```
What it does:
- Iterates over `plugin_names` and loads each plugin
- Unloads plugins no longer in use (calling `destroy()` to release resources)
- Sorts the plugin list by priority in descending order
- Updates the global `local_plugins` and `local_plugins_hash`
Performance notes:
- A `processed` table avoids loading the same plugin twice
- `plugins_list` is pre-sized with `core.table.new`
- The sorted list is cached, so no sorting happens per request
2.2 Plugin Filter (plugin.filter)
2.2.1 Filtering Logic
```lua
function _M.filter(ctx, conf, plugins, route_conf, phase)
    local user_plugin_conf = conf
    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    local route_plugins = core.tablepool.fetch("route_plugins", 32, 0)

    -- walk the loaded plugins (one entry each, already priority-sorted)
    for i = 1, #local_plugins do
        local plugin = local_plugins[i]
        local plugin_conf = user_plugin_conf[plugin.name]
        if plugin_conf then
            -- evaluate the _meta.filter condition
            local meta = plugin_conf._meta
            if meta and meta.filter then
                local match_result = expr_match(meta.filter, ctx.var)
                if not match_result then
                    -- filter not satisfied: skip this plugin
                    goto CONTINUE
                end
            end
            -- append the (plugin, conf) pair to the execution list
            core.table.insert(route_plugins, plugin)
            core.table.insert(route_plugins, plugin_conf)
        end
        ::CONTINUE::
    end

    -- merge into the plugins list
    for i = 1, #route_plugins do
        core.table.insert(plugins, route_plugins[i])
    end
    core.tablepool.release("route_plugins", route_plugins)
    return plugins
end
```
What it does:
- Walks the global `local_plugins` list and checks whether each plugin is enabled in the Route config
- Evaluates the `_meta.filter` expression against Nginx variables
- Returns the matching plugins as a flat (plugin, conf) pair list, preserving priority order
`filter` expression examples:
```lua
-- run the plugin only for specific requests
_meta.filter = {
    {"arg_debug", "==", "true"}   -- only when the query arg debug=true
}

-- combined conditions
_meta.filter = {
    {"uri", "!", "~~", "^/static/"},            -- URI does NOT match the regex
    {"remote_addr", "ipmatch", {"10.0.0.0/8"}}  -- client IP inside the CIDR
}
```
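These rules are evaluated with lua-resty-expr, the expression library APISIX uses for `_meta.filter`. A standalone sketch (variable values are mocked here; inside APISIX the argument would be `ctx.var`):
```lua
-- Evaluate a _meta.filter-style rule with lua-resty-expr.
local expr = require("resty.expr.v1")

local rule, err = expr.new({
    {"arg_debug", "==", "true"},
})
assert(rule, err)

-- rule:eval(vars) returns true when the rule matches
print(rule:eval({arg_debug = "true"}))   -- true  -> plugin would run
print(rule:eval({arg_debug = "false"}))  -- false -> plugin would be skipped
```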
2.2.2 Configuration Merging (merge_service_route)
```lua
function _M.merge_service_route(service_conf, route_conf)
    local new_conf = core.table.clone(route_conf)
    for plugin_name, plugin_conf in pairs(service_conf) do
        if new_conf[plugin_name] then
            -- Route config takes precedence over Service config
            if type(plugin_conf) ~= "table" or type(new_conf[plugin_name]) ~= "table" then
                goto CONTINUE
            end
            -- deep-merge object-typed configs
            local merged_conf = core.table.merge(plugin_conf, new_conf[plugin_name])
            new_conf[plugin_name] = merged_conf
        else
            new_conf[plugin_name] = plugin_conf
        end
        ::CONTINUE::
    end
    return new_conf
end
```
Merge rules:
- Route configuration takes precedence over Service configuration
- Object-typed values are deep-merged
- Non-object values are overwritten directly
Merge example:
```lua
-- Service config
{
    ["limit-count"] = {count = 100, time_window = 60}
}
-- Route config
{
    ["limit-count"] = {count = 50}   -- overrides count, keeps time_window
}
-- merged result
{
    ["limit-count"] = {count = 50, time_window = 60}
}
```
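The merge semantics can be reproduced in a few lines of plain Lua; this simplified `deep_merge` is only a stand-in for `core.table.merge`, written to illustrate the Route-over-Service precedence:
```lua
-- Simplified sketch of the Route-over-Service merge shown above.
local function deep_merge(base, override)
    local out = {}
    for k, v in pairs(base) do out[k] = v end
    for k, v in pairs(override) do
        if type(v) == "table" and type(out[k]) == "table" then
            out[k] = deep_merge(out[k], v)   -- merge nested objects
        else
            out[k] = v                       -- Route value wins
        end
    end
    return out
end

local service = {["limit-count"] = {count = 100, time_window = 60}}
local route   = {["limit-count"] = {count = 50}}
local merged  = deep_merge(service, route)
print(merged["limit-count"].count)        -- 50  (overridden by the Route)
print(merged["limit-count"].time_window)  -- 60  (inherited from the Service)
```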
2.3 Plugin Runner (plugin.run_plugin)
2.3.1 Execution Logic
```lua
-- is_http distinguishes the HTTP and Stream subsystems
local is_http = ngx.config.subsystem == "http"

function _M.run_plugin(phase, plugins, api_ctx)
    local plugin_run = false
    -- run plugins in priority order
    for i = 1, #plugins, 2 do
        local plugin = plugins[i]
        local plugin_conf = plugins[i + 1]
        -- skip plugins that do not implement this phase
        local phase_func = plugin[phase]
        if not phase_func then
            goto CONTINUE
        end
        plugin_run = true
        -- run _meta.pre_function (pre-hook) first
        local meta = plugin_conf._meta
        if meta and meta.pre_function then
            local code, body = meta.pre_function(api_ctx)
            if code or body then
                core.response.exit(code, body)
            end
        end
        -- run the plugin's phase handler
        local code, body = phase_func(plugin_conf, api_ctx)
        if code or body then
            if is_http then
                core.response.exit(code, body)
            else
                return code, body
            end
        end
        ::CONTINUE::
    end
    return plugin_run
end
```
Execution characteristics:
- Paired traversal: `plugins[i]` is the plugin object, `plugins[i+1]` its configuration
- Plugins that do not implement the current phase are skipped
- `_meta.pre_function` runs before the plugin's own handler
- If any plugin returns a non-nil code or body, execution stops and the response is sent immediately
Phase handler signature:
```lua
function _M.rewrite(conf, ctx)
    -- conf: the plugin's configuration object
    -- ctx:  the per-request api_ctx
    -- returns: code, body (optional; short-circuits the response)
end
```
2.3.2 Global Rules Execution
```lua
function _M.run_global_rules(api_ctx, global_rules, phase_name)
    if not global_rules.values then
        return
    end

    for _, global_rule in config_util.iterate_values(global_rules.values) do
        local plugins = core.tablepool.fetch("plugins", 32, 0)
        -- select the plugins enabled by this global_rule
        plugins = _M.filter(api_ctx, global_rule, plugins)
        -- run them
        _M.run_plugin(phase_name, plugins, api_ctx)
        core.tablepool.release("plugins", plugins)
    end
end
```
Global Rules characteristics:
- Plugins attached to global rules run before Route-level plugins
- Each global_rule filters and runs its plugins independently
- Multiple global_rules can be active at the same time
3. Core Plugin Implementations
3.1 Authentication Plugin: key-auth
3.1.1 Plugin Schema
```lua
local schema = {
    type = "object",
    properties = {
        header = {type = "string", default = "apikey"},
        query = {type = "string", default = "apikey"},
        hide_credentials = {type = "boolean", default = false},
        anonymous_consumer = {...},   -- anonymous Consumer settings
    },
}

local consumer_schema = {
    type = "object",
    properties = {
        key = {type = "string"},
    },
    encrypt_fields = {"key"},   -- sensitive fields stored encrypted
    required = {"key"},
}
```
Field reference:
| Field | Type | Default | Description |
|---|---|---|---|
| header | string | apikey | HTTP header to read the API key from |
| query | string | apikey | Query parameter to read the API key from |
| hide_credentials | boolean | false | Whether to strip the credential (removes the header/query) |
| anonymous_consumer | string | - | Anonymous Consumer used when authentication fails |
Consumer configuration:
- `key`: the Consumer's unique API key (stored encrypted)
3.1.2 Authentication Flow
```lua
function _M.rewrite(conf, ctx)
    -- 1. extract the API key
    local from_header = true
    local key = core.request.header(ctx, conf.header)
    if not key then
        local uri_args = core.request.get_uri_args(ctx) or {}
        key = uri_args[conf.query]
        from_header = false
    end
    if not key then
        return 401, {message = "Missing API key in request"}
    end

    -- 2. look up the Consumer
    local consumer, consumer_conf, err = consumer_mod.find_consumer("key-auth", "key", key)
    if not consumer then
        -- fall back to the anonymous Consumer, if configured
        if not conf.anonymous_consumer then
            return 401, {message = "Invalid API key in request"}
        end
        consumer, consumer_conf = consumer_mod.get_anonymous_consumer(conf.anonymous_consumer)
    end

    -- 3. hide the credential
    if conf.hide_credentials then
        if from_header then
            core.request.set_header(ctx, conf.header, nil)
        else
            local args = core.request.get_uri_args(ctx)
            args[conf.query] = nil
            core.request.set_uri_args(ctx, args)
        end
    end

    -- 4. attach the Consumer to the request context
    consumer_mod.attach_consumer(ctx, consumer, consumer_conf)
end
```
Authentication sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant K as key-auth plugin
    participant CM as Consumer Manager
    participant U as Upstream
    C->>K: GET /api?apikey=abc123
    K->>K: extract API key (header or query)
    K->>CM: find_consumer("key-auth", "key", "abc123")
    CM->>CM: look up Consumer table (decrypt key field)
    alt Consumer exists
        CM-->>K: Consumer object
        K->>K: attach_consumer(ctx, consumer)
        K->>U: forward request (with Consumer info)
        U-->>C: 200 OK
    else Consumer not found
        CM-->>K: nil
        alt anonymous_consumer configured
            K->>CM: get_anonymous_consumer()
            CM-->>K: anonymous Consumer
            K->>U: forward request
        else no anonymous_consumer
            K-->>C: 401 Unauthorized
        end
    end
```
Performance notes:
- Consumer lookups are cached (LRU cache)
- Encrypted keys are decrypted lazily (only on match)
- The Consumer is carried in `ngx.ctx`, avoiding repeated lookups
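The LRU caching pattern looks roughly like the sketch below, built on the real `core.lrucache` helper; the consumer-indexing function is simplified, and field names such as `nodes`, `auth_conf`, and `conf_version` abbreviate the actual consumer structures:
```lua
-- Sketch of the LRU caching pattern used for consumer lookup.
local core = require("apisix.core")

local lrucache = core.lrucache.new({
    ttl = 300, count = 512,   -- cache up to 512 entries for 5 minutes
})

-- build an index: (decrypted) API key -> consumer object
local function create_consume_cache(consumers_conf, key_attr)
    local consumers = {}
    for _, consumer in ipairs(consumers_conf.nodes) do
        consumers[consumer.auth_conf[key_attr]] = consumer
    end
    return consumers
end

-- conf_version changes whenever consumers change in etcd, which
-- transparently invalidates the cached index
local function find_consumer(consumers_conf, key)
    local cache = lrucache("consumers_key", consumers_conf.conf_version,
                           create_consume_cache, consumers_conf, "key")
    return cache[key]
end
```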
3.2 Traffic Control Plugin: limit-count
3.2.1 Plugin Schema
```lua
local schema = {
    type = "object",
    properties = {
        count = {type = "integer", exclusiveMinimum = 0},
        time_window = {type = "integer", exclusiveMinimum = 0},
        key = {type = "string", default = "remote_addr"},
        key_type = {
            type = "string",
            enum = {"var", "var_combination", "constant"},
            default = "var",
        },
        rejected_code = {type = "integer", minimum = 200, maximum = 599, default = 503},
        rejected_msg = {type = "string"},
        policy = {
            type = "string",
            enum = {"local", "redis", "redis-cluster"},
            default = "local",
        },
        allow_degradation = {type = "boolean", default = false},
        show_limit_quota_header = {type = "boolean", default = true},
    },
    required = {"count", "time_window"},
}
```
Field reference:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| count | integer | yes | - | Requests allowed within the time window |
| time_window | integer | yes | - | Window size (seconds) |
| key | string | no | remote_addr | Rate-limit key (an Nginx variable name) |
| key_type | enum | no | var | Key type: var/var_combination/constant |
| policy | enum | no | local | Limiting policy: local/redis/redis-cluster |
| allow_degradation | boolean | no | false | On Redis failure, whether to degrade (let requests through) |
| show_limit_quota_header | boolean | no | true | Whether to return remaining-quota headers |
3.2.2 Core Rate-Limiting Logic
```lua
function _M.access(conf, ctx)
    -- 1. build the rate-limit key
    local key = gen_limit_key(conf, ctx, conf.key)

    -- 2. fetch the limiter object (local/redis/redis-cluster)
    local limit_obj = lrucache(conf, nil, create_limit_obj, conf, plugin_name)

    -- 3. apply the limit
    local delay, remaining = limit_obj:incoming(key, true)
    if not delay then
        local err = remaining
        if err == "rejected" then
            -- quota exceeded
            if conf.rejected_msg then
                return conf.rejected_code, {error_msg = conf.rejected_msg}
            else
                return conf.rejected_code
            end
        end
        -- Redis failure
        if conf.allow_degradation then
            core.log.warn("limit-count failed: ", err, ", allow degradation")
            return -- let the request through
        end
        return 500, {error_msg = "failed to limit count: " .. err}
    end

    -- 4. expose the remaining quota via response headers
    if conf.show_limit_quota_header then
        core.response.set_header("X-RateLimit-Limit", conf.count)
        core.response.set_header("X-RateLimit-Remaining", remaining)
        core.response.set_header("X-RateLimit-Reset", conf.time_window)
    end
end
```
Policy comparison:
| Policy | Implementation | Use case | Pros | Cons |
|---|---|---|---|---|
| local | lua_shared_dict | single-node deployments | extremely fast, no external dependency | not distributed |
| redis | standalone Redis | small clusters | distributed consistency | single point of failure |
| redis-cluster | Redis Cluster | large clusters | highly available, horizontally scalable | complex to operate |
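Conceptually, the `redis` policy keeps the same counter in Redis so all gateway instances share one quota. A minimal sketch with lua-resty-redis (the real plugin evaluates a Lua script for atomicity and also supports redis-cluster; host and port below are placeholders):
```lua
-- Conceptual sketch of the "redis" policy: INCR + EXPIRE on a shared key.
local redis = require("resty.redis")

local function redis_incoming(key, count, time_window)
    local red = redis:new()
    red:set_timeout(1000)                    -- 1s connect/read timeout
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil, err                      -- caller may allow_degradation
    end

    local current, err2 = red:incr(key)
    if not current then
        return nil, err2
    end
    if current == 1 then
        red:expire(key, time_window)         -- first hit starts the window
    end

    red:set_keepalive(10000, 100)            -- return connection to the pool
    if current > count then
        return nil, "rejected"
    end
    return 0, count - current
end
```
Note that INCR followed by EXPIRE is not atomic across the two calls, which is one reason the real plugin wraps the logic in a server-side Lua script.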
3.2.3 Local Rate Limiting (limit-count-local)
```lua
function _M.new(plugin_name, count, time_window)
    local limit_obj = {
        count = count,
        time_window = time_window,
    }

    function limit_obj:incoming(key, commit)
        local dict = ngx.shared["plugin-limit-count"]
        local remaining

        -- read the current counter
        local current, err = dict:get(key)
        if not current then
            current = 0
        end
        if current + 1 > count then
            return nil, "rejected"
        end

        if commit then
            -- atomic increment; initializes to 0 with a TTL on first access
            current, err = dict:incr(key, 1, 0, time_window)
            if not current then
                return nil, err
            end
            remaining = count - current
        else
            remaining = count - current - 1
        end
        return 0, remaining
    end

    return limit_obj
end
```
Key techniques:
- Counters are stored in an `ngx.shared.DICT`
- `incr(key, value, init, init_ttl)` is atomic, keeping concurrent requests counted safely
- The first access initializes the counter and sets its TTL
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant C1 as Client1
    participant C2 as Client2
    participant L as limit-count plugin
    participant D as shared_dict
    Note over L,D: count=3, time_window=60s
    C1->>L: request 1
    L->>D: incr("10.0.0.1", 1, 0, 60)
    D-->>L: current=1, remaining=2
    L->>L: set X-RateLimit-Remaining: 2
    L-->>C1: 200 OK
    C2->>L: request 2 (same IP)
    L->>D: incr("10.0.0.1", 1)
    D-->>L: current=2, remaining=1
    L-->>C2: 200 OK
    C1->>L: request 3
    L->>D: incr("10.0.0.1", 1)
    D-->>L: current=3, remaining=0
    L-->>C1: 200 OK
    C2->>L: request 4 (over the limit)
    L->>D: get("10.0.0.1")
    D-->>L: current=3
    L->>L: current + 1 > count
    L-->>C2: 503 Service Temporarily Unavailable
    Note over D: the key expires automatically after 60s
```
3.3 Transformation Plugin: proxy-rewrite
3.3.1 Plugin Schema
```lua
local schema = {
    type = "object",
    properties = {
        uri = {type = "string", minLength = 1, maxLength = 4096},
        method = {
            type = "string",
            enum = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"},
        },
        regex_uri = {
            description = "regex to match URI, [pattern, replacement, options]",
            type = "array",
            minItems = 2,
            items = {type = "string"},
        },
        host = {type = "string"},
        scheme = {type = "string", enum = {"http", "https", "grpc", "grpcs"}},
        headers = {
            description = "new headers for request",
            type = "object",
        },
        use_real_request_uri_unsafe = {type = "boolean", default = false},
    },
}
```
Field reference:
| Field | Type | Description |
|---|---|---|
| uri | string | Replacement request URI |
| method | enum | Replacement request method |
| regex_uri | array | Regex URI rewrite: [pattern, replacement, options] |
| host | string | Replacement Host header |
| scheme | enum | Replacement scheme (http/https/grpc/grpcs) |
| headers | object | Headers to set/add/remove on the request |
3.3.2 URI Rewriting Logic
```lua
function _M.rewrite(conf, ctx)
    -- 1. rewrite the URI
    if conf.uri then
        ctx.var.upstream_uri = core.utils.resolve_var(conf.uri, ctx.var)
    elseif conf.regex_uri then
        local uri = ctx.var.uri
        local regex_uri = conf.regex_uri
        local newuri, _, err = ngx.re.sub(uri, regex_uri[1], regex_uri[2], regex_uri[3] or "jo")
        if not newuri then
            core.log.error("failed to substitute regex_uri: ", err)
        else
            ctx.var.upstream_uri = newuri
        end
    end

    -- 2. rewrite the method
    if conf.method then
        ngx.req.set_method(ngx["HTTP_" .. conf.method])
    end

    -- 3. rewrite the Host header
    if conf.host then
        ctx.var.upstream_host = core.utils.resolve_var(conf.host, ctx.var)
    end

    -- 4. rewrite the scheme
    if conf.scheme then
        ctx.var.upstream_scheme = conf.scheme
    end

    -- 5. rewrite request headers
    if conf.headers then
        for key, value in pairs(conf.headers) do
            if value == "" then
                core.request.set_header(ctx, key, nil)   -- empty string deletes the header
            else
                core.request.set_header(ctx, key, core.utils.resolve_var(value, ctx.var))
            end
        end
    end
end
```
URI rewrite examples:
```
-- Example 1: static replacement
{
    "uri": "/api/v2$request_uri"
}
-- /users → /api/v2/users

-- Example 2: regex replacement
{
    "regex_uri": ["^/api/(.*)", "/$1"]
}
-- /api/users → /users

-- Example 3: variable substitution
{
    "uri": "/service/$host/$request_uri",
    "host": "backend-$arg_version.example.com"
}
-- request:  GET /users?version=v2  Host: api.example.com
-- proxied:  GET /service/api.example.com/users  Host: backend-v2.example.com
```
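The `regex_uri` behaviour can be verified in isolation with the `resty` CLI, since the plugin calls the same `ngx.re.sub` API:
```lua
-- Quick check of example 2 above: /api/users -> /users
local newuri, n, err = ngx.re.sub("/api/users", "^/api/(.*)", "/$1", "jo")
assert(newuri, err)
print(newuri)  -- /users
```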
4. Load-Balancing Mechanism
4.1 Load Balancing Overview
Responsibilities:
- Select an upstream server in the `balancer_by_lua` phase
- Support multiple algorithms: roundrobin, chash, ewma, least_conn, priority
- Integrate with health checking to exclude unhealthy nodes automatically
- Support retries on failure and priority-based failover
Architecture:
```mermaid
flowchart TB
    subgraph UP[Upstream configuration]
        U1[Upstream definition<br/>nodes + algorithm] --> U2[health check config<br/>active/passive]
        U2 --> U3[service discovery<br/>consul/nacos/...]
    end
    subgraph LB[Load balancer]
        B1[balancer.pick_server] --> B2[fetch_health_nodes<br/>get healthy nodes]
        B2 --> B3[create_server_picker<br/>create the picker]
        B3 --> B4{algorithm}
        B4 -->|roundrobin| RR[round-robin picker]
        B4 -->|chash| CH[consistent hashing]
        B4 -->|ewma| EW[EWMA picker]
        B4 -->|least_conn| LC[least connections]
        B4 -->|priority| PR[priority picker]
    end
    subgraph HC[Health checking]
        H1[healthcheck_manager] --> H2[active probing]
        H1 --> H3[passive checking]
        H3 --> H4[failure reporting]
    end
    subgraph NB[Nginx balancer]
        N1[set_current_peer] --> N2[set upstream address]
        N2 --> N3[configure keepalive]
        N3 --> N4[set timeouts/retries]
    end
    U3 --> B1
    B4 --> H1
    B4 --> N1
    style B3 fill:#e1f5ff
    style H1 fill:#fff4e1
```
4.2 Load-Balancing Algorithms in Detail
4.2.1 Weighted Round Robin (roundrobin)
Data structure:
```lua
local picker = roundrobin:new(up_nodes)
-- up_nodes format: {["127.0.0.1:8080"] = 10, ["127.0.0.1:8081"] = 20}
```
Core logic:
```lua
function _M.new(up_nodes, upstream)
    local picker = roundrobin:new(up_nodes)
    local nodes_count = nkeys(up_nodes)
    return {
        upstream = upstream,

        -- pick a server
        get = function (ctx)
            -- stop once every node has been tried
            if ctx.balancer_tried_servers_count == nodes_count then
                return nil, "all upstream servers tried"
            end
            -- keep picking, skipping servers tried in earlier attempts
            local server, err
            for i = 1, safe_limit do
                server, err = picker:find()
                if not server then
                    return nil, err
                end
                if not ctx.balancer_tried_servers or
                   not ctx.balancer_tried_servers[server] then
                    break
                end
            end
            return server
        end,

        -- invoked after each balancing attempt
        after_balance = function (ctx, before_retry)
            if not before_retry then
                -- request finished: release per-request state
                core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                ctx.balancer_tried_servers = nil
                return
            end
            -- record the server we just tried
            if not ctx.balancer_tried_servers then
                ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
            end
            ctx.balancer_tried_servers[ctx.balancer_server] = true
            ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
        end,
    }
end
```
Algorithm characteristics (see the usage sketch after this list):
- Weighted round robin (WRR)
- A higher weight means a proportionally higher chance of being picked
- Supports retries (already-tried nodes are skipped)
- A weight of 0 is allowed; such nodes receive no traffic
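The picker wraps `resty.roundrobin` from lua-resty-balancer; standalone usage is just a few lines (the node addresses are placeholders):
```lua
-- Minimal usage sketch of resty.roundrobin, the weighted round-robin
-- picker from lua-resty-balancer that APISIX builds on.
local roundrobin = require("resty.roundrobin")

local rr = roundrobin:new({
    ["127.0.0.1:8080"] = 10,
    ["127.0.0.1:8081"] = 20,   -- roughly twice the traffic of :8080
})

-- over many calls the distribution converges to the weight ratio
for _ = 1, 6 do
    print(rr:find())
end
```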
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant B as Balancer
    participant RR as RoundRobin Picker
    participant N1 as Node1(weight=10)
    participant N2 as Node2(weight=20)
    participant N3 as Node3(weight=10)
    B->>RR: find()
    RR->>RR: pick according to weights
    RR-->>B: Node2 (highest weight)
    B->>N2: forward request
    N2-->>B: failure
    B->>B: after_balance(before_retry=true)
    B->>B: mark tried: {Node2: true}
    B->>RR: find()
    RR-->>B: Node2
    B->>B: Node2 already tried, skip
    B->>RR: find()
    RR-->>B: Node1
    B->>N1: forward request
    N1-->>B: success
```
4.2.2 Consistent Hashing (chash)
Core logic:
```lua
function _M.new(up_nodes, upstream)
    local nodes = {}
    local servers = {}
    -- build the consistent-hash ring
    -- (str_null is string.char(0); gcd is the greatest common divisor
    --  of all weights, computed beforehand)
    for serv, weight in pairs(up_nodes) do
        local id = str_gsub(serv, ":", str_null)
        servers[id] = serv
        nodes[id] = weight / gcd   -- normalize weights
    end
    local picker = resty_chash:new(nodes)
    return {
        upstream = upstream,
        get = function (ctx)
            local id
            if ctx.balancer_tried_servers then
                -- retry: advance to the next node on the ring
                id, ctx.chash_last_server_index = picker:next(ctx.chash_last_server_index)
            else
                -- first pick: locate by the hash key
                local chash_key = fetch_chash_hash_key(ctx, upstream)
                id, ctx.chash_last_server_index = picker:find(chash_key)
            end
            return servers[id]
        end,
    }
end
```
Hash key generation:
```lua
local function fetch_chash_hash_key(ctx, upstream)
    local hash_on = upstream.hash_on or "vars"
    local key = upstream.key
    local chash_key

    if hash_on == "consumer" then
        chash_key = ctx.consumer_name
    elseif hash_on == "vars" then
        chash_key = ctx.var[key]               -- e.g. remote_addr
    elseif hash_on == "header" then
        chash_key = ctx.var["http_" .. key]
    elseif hash_on == "cookie" then
        chash_key = ctx.var["cookie_" .. key]
    elseif hash_on == "vars_combinations" then
        chash_key = core.utils.resolve_var(key, ctx.var)   -- e.g. "$host$uri"
    end

    if not chash_key then
        chash_key = ctx.var["remote_addr"]     -- fall back to the client IP
    end
    return chash_key
end
```
`hash_on` configuration examples:
```
-- hash by client IP
{
    "type": "chash",
    "hash_on": "vars",
    "key": "remote_addr"
}

-- hash by URL (caching scenarios)
{
    "type": "chash",
    "hash_on": "vars",
    "key": "uri"
}

-- hash by Consumer (per-user session affinity)
{
    "type": "chash",
    "hash_on": "consumer"
}

-- hash by a combination of variables
{
    "type": "chash",
    "hash_on": "vars_combinations",
    "key": "$host$request_uri"
}
```
Advantages of consistent hashing:
- Adding or removing a node only shifts traffic on the adjacent ring segment
- The same hash key always routes to the same node (session affinity)
- Virtual nodes (160 per server by default) smooth out the distribution
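The ring itself comes from `resty.chash` in lua-resty-balancer; a standalone sketch of the affinity property (node names are placeholders):
```lua
-- Minimal usage sketch of resty.chash (lua-resty-balancer):
-- the same key always maps to the same node while the ring is unchanged.
local resty_chash = require("resty.chash")

local chash = resty_chash:new({
    node1 = 1,
    node2 = 1,
    node3 = 1,
})

local id = chash:find("10.0.0.1")      -- e.g. hash by client IP
print(id)                              -- stable for this key
print(chash:find("10.0.0.1") == id)    -- true: session affinity
```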
4.3 Health Check Integration
4.3.1 Filtering Healthy Nodes
```lua
local function fetch_health_nodes(upstream, checker)
    local nodes = upstream.nodes
    if not checker then
        return nodes
    end

    local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
    local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
    local healthy_nodes = core.table.new(0, nkeys(nodes))

    for addr, weight in pairs(nodes) do
        -- parse_domain_from_addr / parse_port_from_addr split an "ip:port" address
        local ok, err = checker:get_target_status(host or parse_domain_from_addr(addr),
                                                  port or parse_port_from_addr(addr))
        if ok then
            healthy_nodes[addr] = weight
        else
            core.log.warn("exclude unhealthy node: ", addr, " ", err)
        end
    end

    if core.table.nkeys(healthy_nodes) == 0 then
        core.log.warn("all upstream nodes are unhealthy, fallback to full nodes")
        return nodes   -- when every node is unhealthy, fall back to the full set
    end
    return healthy_nodes
end
```
Health check strategies:
| Type | Mechanism | Key settings | Use case |
|---|---|---|---|
| active | periodic probing | checks.active.http_path, checks.active.interval | latency-sensitive services |
| passive | judged from live request results | checks.passive.unhealthy.http_failures, checks.passive.unhealthy.tcp_failures | high-concurrency scenarios |
Health check configuration example:
```json
{
    "checks": {
        "active": {
            "type": "http",
            "http_path": "/health",
            "interval": 5,
            "timeout": 3,
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 3
            },
            "healthy": {
                "http_statuses": [200, 302],
                "successes": 2
            }
        },
        "passive": {
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 5,
                "tcp_failures": 3
            }
        }
    }
}
```
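Under the hood this configuration drives lua-resty-healthcheck (APISIX uses its own fork of that library). A hedged sketch of creating a checker with it directly; it needs a `lua_shared_dict`, assumed here to be named `healthcheck`, and the target address is a placeholder:
```lua
-- Sketch of creating a checker with lua-resty-healthcheck.
local healthcheck = require("resty.healthcheck")

local checker, err = healthcheck.new({
    name = "upstream#/apisix/upstreams/1",
    shm_name = "healthcheck",     -- must exist in nginx.conf
    checks = {
        active = {
            http_path = "/health",
            healthy   = {interval = 5, successes = 2},
            unhealthy = {interval = 5, http_failures = 3},
        },
        passive = {
            unhealthy = {http_failures = 5, tcp_failures = 3},
        },
    },
})
assert(checker, err)

-- register nodes to probe; status can then be queried per target
checker:add_target("10.0.1.1", 8080, "example.com")
local ok = checker:get_target_status("10.0.1.1", 8080, "example.com")
```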
4.4 Failure and Retry Handling
```lua
function _M.pick_server(route, ctx)
    local up_conf = ctx.upstream_conf
    local checker = ctx.up_checker
    -- retries comes from the Upstream config; 0 means a single attempt
    local retries = ctx.upstream_retries or 0

    for i = 0, retries do
        -- fetch the currently healthy nodes
        local nodes = fetch_health_nodes(up_conf, checker)
        if not nodes or core.table.nkeys(nodes) == 0 then
            return nil, "no healthy upstream node"
        end

        -- create (or reuse) the server picker
        local picker = lrucache(up_conf, up_conf.ver, create_server_picker, up_conf, checker)

        -- pick a server
        local server, err = picker.get(ctx)
        if server then
            ctx.balancer_server = server
            ctx.balancer_picker = picker
            return server
        end

        if i < retries then
            core.log.warn("failed to pick server, retry: ", err)
            picker.after_balance(ctx, true)
        else
            return nil, "failed to pick server: " .. err
        end
    end
end
```
Retry policy:
- `retries = 0` means a single attempt with no extra retries (the default)
- Every retry re-fetches the healthy node set
- Already-tried nodes are skipped on retry
- Once all nodes have failed, an error is returned
Failure reporting (passive health checks):
```lua
-- runs in the http log phase
function _M.http_log_phase(api_ctx)
    local checker = api_ctx.up_checker
    if not checker then
        return
    end

    local status = ngx.status
    local unhealthy_status = api_ctx.upstream_conf.checks.passive.unhealthy.http_statuses
    if unhealthy_status and core.table.array_find(unhealthy_status, status) then
        -- report a failure
        checker:report_http_status(api_ctx.balancer_ip,
                                   api_ctx.balancer_port,
                                   status)
    else
        -- report a success
        checker:report_http_status(api_ctx.balancer_ip,
                                   api_ctx.balancer_port,
                                   200)
    end
end
```
5. Service Discovery Integration
5.1 Service Discovery Overview
Supported service discovery backends:
- Consul
- Consul KV
- Nacos
- Eureka
- Kubernetes
- DNS
- Tars
How it works:
```mermaid
flowchart LR
    subgraph CFG[Configuration]
        A[Upstream config<br/>discovery_type=nacos<br/>service_name=userservice]
    end
    subgraph DISC[Discovery]
        B[discovery.nacos] --> C[poll Nacos periodically]
        C --> D[parse instance list]
        D --> E[update nodes cache]
    end
    subgraph USE[Usage]
        F[upstream.set_by_route] --> G[call discovery.nodes]
        G --> H[return live node list]
        H --> I[load balancer picks a node]
    end
    A --> B
    E --> G
    style C fill:#e1f5ff
    style H fill:#fff4e1
```
5.2 Nacos Service Discovery Implementation
5.2.1 Initialization and Synchronization
```lua
function _M.init_worker()
    local local_conf = core.config.local_conf()
    local nacos_conf = local_conf.discovery.nacos

    -- Nacos server address
    local host = nacos_conf.host
    local port = nacos_conf.port or 8848
    local namespace_id = nacos_conf.namespace_id or "public"

    -- periodically pull the service list
    local fetch_interval = nacos_conf.fetch_interval or 30
    local timer_err

    local function fetch_all_services()
        -- http_get is a simplified HTTP helper used for illustration
        local services_list = http_get("/nacos/v1/ns/service/list", {
            pageNo = 1,
            pageSize = 10000,
            namespaceId = namespace_id,
        })
        for _, service in ipairs(services_list.doms) do
            local instances = http_get("/nacos/v1/ns/instance/list", {
                serviceName = service,
                namespaceId = namespace_id,
            })
            -- refresh the local cache
            update_service_nodes(service, instances.hosts)
        end
    end

    timer_err = timer.every(fetch_interval, fetch_all_services)
    if timer_err then
        core.log.error("failed to create timer: ", timer_err)
    end
end
```
5.2.2 Node Lookup
```lua
function _M.nodes(service_name)
    -- read the node list from the local cache
    local nodes = service_cache[service_name]
    if not nodes then
        core.log.warn("service not found in discovery: ", service_name)
        return nil
    end

    -- convert into the APISIX nodes format
    local apisix_nodes = {}
    for _, instance in ipairs(nodes) do
        if instance.enabled and instance.healthy then
            local addr = instance.ip .. ":" .. instance.port
            apisix_nodes[addr] = instance.weight or 1
        end
    end
    return apisix_nodes
end
```
5.2.3 Upstream Configuration Using Service Discovery
```json
{
    "id": "1",
    "type": "roundrobin",
    "discovery_type": "nacos",
    "service_name": "userservice",
    "discovery_args": {
        "namespace_id": "test",
        "group_name": "DEFAULT_GROUP"
    }
}
```
Sequence diagram:
```mermaid
sequenceDiagram
    autonumber
    participant T as Timer
    participant D as Discovery(Nacos)
    participant N as Nacos Server
    participant U as Upstream Handler
    participant B as Balancer
    Note over T,N: === Initialization ===
    T->>D: fires every 30s
    D->>N: GET /nacos/v1/ns/service/list
    N-->>D: {"doms": ["userservice", "orderservice"]}
    loop for each service
        D->>N: GET /nacos/v1/ns/instance/list?serviceName=userservice
        N-->>D: {"hosts": [{"ip": "10.0.1.1", "port": 8080, "weight": 100, "healthy": true}]}
        D->>D: update service_cache["userservice"]
    end
    Note over U,B: === Request handling ===
    U->>D: nodes("userservice")
    D-->>U: {"10.0.1.1:8080": 100, "10.0.1.2:8080": 100}
    U->>B: pass the node list
    B->>B: pick a node via load balancing
```
Benefits of service discovery:
- No manual node configuration; instance changes are picked up automatically
- Multi-environment support (isolation via namespace/group)
- Reuses the registry's own health check results
6. Data Flow and Call Chains
6.1 Full Plugin Call Chain
```mermaid
flowchart TD
    Start[request arrives] --> Init[init.http_access_phase]
    Init --> Match[router.match]
    Match --> Filter[plugin.filter<br/>select plugins]
    Filter --> MergeService[merge_service_route<br/>merge Service config]
    MergeService --> MergeConsumer[merge_consumer_route<br/>merge Consumer config]
    MergeConsumer --> GlobalRules[run_global_rules<br/>global rules]
    GlobalRules --> Rewrite[run_plugin<br/>phase=rewrite]
    Rewrite --> Access[run_plugin<br/>phase=access]
    Access --> Upstream[handle_upstream]
    Upstream --> Balancer[balancer.pick_server]
    Balancer --> Proxy[ngx.balancer.set_current_peer]
    Proxy --> HeaderFilter[run_plugin<br/>phase=header_filter]
    HeaderFilter --> BodyFilter[run_plugin<br/>phase=body_filter]
    BodyFilter --> Log[run_plugin<br/>phase=log]
    Log --> End[respond to client]
    style Filter fill:#e1f5ff
    style Access fill:#fff4e1
    style Balancer fill:#ffe1e1
```
6.2 Full Load-Balancing Call Chain
```mermaid
flowchart TD
    Start[init.handle_upstream] --> GetUpstream[upstream.set_by_route]
    GetUpstream --> CheckDiscovery{service discovery configured?}
    CheckDiscovery -->|yes| Discovery[discovery.nodes<br/>fetch live nodes]
    CheckDiscovery -->|no| StaticNodes[use upstream.nodes]
    Discovery --> FillNodes[upstream.fill_node_info]
    StaticNodes --> FillNodes
    FillNodes --> HealthCheck[create health_checker]
    HealthCheck --> SetContext[set ctx.upstream_conf]
    SetContext --> BalancerPhase[balancer_by_lua phase]
    BalancerPhase --> PickServer[balancer.pick_server]
    PickServer --> FetchHealthy[fetch_health_nodes<br/>drop unhealthy nodes]
    FetchHealthy --> CreatePicker[create_server_picker<br/>create the balancer]
    CreatePicker --> Algorithm{algorithm}
    Algorithm -->|roundrobin| RR[roundrobin.get]
    Algorithm -->|chash| CH[chash.get]
    Algorithm -->|ewma| EW[ewma.get]
    RR --> SetPeer[balancer.set_current_peer]
    CH --> SetPeer
    EW --> SetPeer
    SetPeer --> Proxy[proxy_pass to upstream]
    Proxy --> Result{request outcome}
    Result -->|success| ReportSuccess[passive check reports success]
    Result -->|failure| Retry{retry?}
    Retry -->|yes| PickServer
    Retry -->|no| ReportFail[passive check reports failure]
    style CreatePicker fill:#e1f5ff
    style Algorithm fill:#fff4e1
```
7. Key Design Decisions and Trade-offs
7.1 Plugin Priority Design
Priority ranges (approximate):
- 10000+: core infrastructure plugins (e.g. real-ip, request-id)
- 5000-10000: security plugins (e.g. ip-restriction, cors)
- 2000-5000: authentication plugins (e.g. key-auth, jwt-auth)
- 1000-2000: traffic control (e.g. limit-count, limit-req)
- 500-1000: transformation plugins (e.g. proxy-rewrite, response-rewrite)
- 0-500: logging plugins (e.g. http-logger, prometheus)
Priorities of selected core plugins:
```
real-ip:              priority = 23000
request-id:           priority = 12015
ip-restriction:       priority = 3000
consumer-restriction: priority = 2990
jwt-auth:             priority = 2510
key-auth:             priority = 2500
proxy-rewrite:        priority = 1008
limit-count:          priority = 1002
limit-req:            priority = 1001
response-rewrite:     priority = 899
prometheus:           priority = 500
http-logger:          priority = 410
```
Priority trade-offs:
- Pro: plugin execution order is predictable, avoiding misconfiguration
- Con: new plugins must choose a sensible priority to avoid conflicts
- Best practice: keep plugins of the same category in a nearby priority band
7.2 Choosing a Load-Balancing Algorithm
| Algorithm | Use case | Pros | Cons |
|---|---|---|---|
| roundrobin | general purpose | simple and fast, balanced weighted distribution | no session affinity |
| chash | caching/sessions | session affinity, minimal disruption on node changes | weight changes have limited effect |
| ewma | latency-sensitive | adapts to observed response times | extra response-time bookkeeping |
| least_conn | long-lived connections | balances connection counts | must track connection counts |
| priority | primary/backup | automatic failover | the primary node can become a bottleneck |
Recommendations:
- Default: `roundrobin` (best performance)
- Caching: `chash` hashed on the URI
- User sessions: `chash` hashed on the Consumer
- gRPC/long-lived connections: `least_conn` or `ewma`
- Primary/backup setups: `priority`
7.3 Health Check Strategy
Active vs Passive:
| Type | Probe method | Resource cost | Detection latency | Use case |
|---|---|---|---|---|
| Active | periodic health-probe requests | high (each node probed separately) | low (bounded by interval) | latency-sensitive services |
| Passive | judged from real request outcomes | low (no extra requests) | high (failures must accumulate) | high-concurrency scenarios |
Recommended configuration:
```
-- enable both active and passive checks; they complement each other
{
    "checks": {
        "active": {
            "interval": 5,          -- probe every 5 seconds
            "http_path": "/health",
            "timeout": 3,
            "unhealthy": {
                "http_failures": 2  -- mark unhealthy after 2 consecutive probe failures
            }
        },
        "passive": {
            "unhealthy": {
                "http_failures": 5, -- mark unhealthy after 5 real request failures
                "tcp_failures": 3
            }
        }
    }
}
```
7.4 Service Discovery Trade-offs
Static nodes vs service discovery:
| Approach | Pros | Cons | Use case |
|---|---|---|---|
| static nodes | simple config, best performance | node changes require manual updates | small deployments with fixed nodes |
| service discovery | tracks node changes automatically | external dependency, sync delay | cloud-native/microservice architectures |
Service discovery caveats:
- Sync delay: the default pull interval is 30 seconds, so node changes lag
- Failure handling: if the registry is down, the cached node list is used
- Weight mapping: registry weights must be mapped onto APISIX's weight range
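For registries without built-in support, APISIX also allows custom discovery modules. The sketch below follows the pattern of the built-in modules (an optional `init_worker()` hook plus a `nodes()` lookup); the service name and addresses are placeholders, and a real module would serve `nodes()` from a cache kept fresh by timers:
```lua
-- Hedged sketch of the interface a custom discovery module exposes.
local core = require("apisix.core")

local _M = {}

-- optional: start timers that keep a local cache in sync
function _M.init_worker()
    -- e.g. ngx.timer.every(30, fetch_all_services)
end

-- required: return nodes for a service name; the built-in modules
-- return an array of {host, port, weight} tables
function _M.nodes(service_name)
    -- a static stub; a real module would read its synced cache
    if service_name == "userservice" then
        return {
            {host = "10.0.1.1", port = 8080, weight = 100},
            {host = "10.0.1.2", port = 8080, weight = 100},
        }
    end
    core.log.warn("unknown service: ", service_name)
    return nil
end

return _M
```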
8. Error Handling and Performance Notes
8.1 Plugin Error Handling
Schema validation failure:
- When: on create/update through the Admin API
- Handling: a 400 response carrying the detailed validation error
- Recovery: none needed; the invalid configuration is simply rejected
Plugin execution errors:
```lua
-- guard against errors inside plugin code
function _M.access(conf, ctx)
    local ok, err = pcall(function()
        -- plugin logic
    end)
    if not ok then
        core.log.error("plugin error: ", err)
        -- do not abort the request; later plugins still run
    end
end
```
`_meta.filter` evaluation failure:
```lua
local match_result, err = expr_match(meta.filter, ctx.var)
if err then
    core.log.error("failed to evaluate filter: ", err)
    -- default to running the plugin
    match_result = true
end
```
8.2 Load-Balancer Error Handling
All nodes unhealthy:
```lua
if core.table.nkeys(healthy_nodes) == 0 then
    core.log.warn("all upstream nodes are unhealthy, fallback to full nodes")
    return nodes   -- degrade: use the full node set
end
```
Service discovery outage:
```lua
local nodes, err = discovery.nodes(service_name)
if not nodes then
    core.log.error("failed to fetch nodes from discovery: ", err)
    -- fall back to the last known node list
    nodes = last_known_nodes[service_name]
end
```
Retries exhausted:
```lua
for i = 0, retries do
    local server, err = picker.get(ctx)
    if server then
        return server
    end
    if i < retries then
        core.log.warn("failed to pick server, retry: ", err)
    else
        return 503, {error_msg = "no available upstream server"}
    end
end
```
8.3 Performance Optimizations
Plugin loading:
- Load on demand: only plugins declared in `config.yaml` are loaded
- Cache plugin objects to avoid repeated `require`s
- Pre-sort by priority at startup instead of per request
Plugin filtering:
```lua
-- use tablepool to reduce GC pressure
local plugins = core.tablepool.fetch("plugins", 32, 0)
-- ... use plugins ...
core.tablepool.release("plugins", plugins)
```
Load balancing:
```lua
-- cache the server picker to avoid rebuilding it per request
local picker = lrucache(up_conf, up_conf.ver, create_server_picker, up_conf, checker)

-- precompute a safety bound to avoid unbounded pick loops
local safe_limit = 0
for _, weight in pairs(up_nodes) do
    safe_limit = safe_limit + weight + 1
end
```
Health checking:
- Share health check results: routes pointing at the same upstream share one checker
- Prefer passive checks to cut down active probing overhead
- Choose a sensible interval to avoid probing too frequently
9. Configuration Examples and Best Practices
9.1 Plugin Configuration Best Practices
9.1.1 Conditional Execution with _meta.filter
```json
{
    "uri": "/api/*",
    "plugins": {
        "limit-count": {
            "count": 100,
            "time_window": 60,
            "_meta": {
                "filter": [
                    ["arg_debug", "!=", "true"]
                ]
            }
        }
    }
}
```
Note: rate limiting applies only to non-debug requests; requests with `debug=true` bypass it.
9.1.2 Plugin Configuration Inheritance
```
// Service config
{
    "id": "1",
    "plugins": {
        "key-auth": {},
        "limit-count": {"count": 1000, "time_window": 60}
    }
}

// Route config
{
    "id": "1",
    "service_id": "1",
    "uri": "/vip/*",
    "plugins": {
        "limit-count": {"count": 5000}  // overrides the Service's count
    }
}
```
Note: the VIP route raises the rate-limit threshold while inheriting `time_window` from the Service.
9.2 Load-Balancing Configuration Best Practices
9.2.1 Upstream with Health Checks
```json
{
    "type": "roundrobin",
    "nodes": {
        "10.0.1.1:8080": 100,
        "10.0.1.2:8080": 100,
        "10.0.1.3:8080": 50
    },
    "checks": {
        "active": {
            "type": "http",
            "http_path": "/health",
            "interval": 5,
            "timeout": 3,
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 3
            },
            "healthy": {
                "successes": 2
            }
        },
        "passive": {
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 5
            }
        }
    },
    "retries": 2,
    "timeout": {
        "connect": 5,
        "send": 5,
        "read": 10
    }
}
```
9.2.2 Cache Affinity via Consistent Hashing
```json
{
    "type": "chash",
    "hash_on": "vars",
    "key": "request_uri",
    "nodes": {
        "cache1:8080": 100,
        "cache2:8080": 100,
        "cache3:8080": 100
    }
}
```
Note: hashing on the URI routes the same URI to the same cache node, improving the cache hit rate.
9.2.3 Service Discovery Integration
```json
{
    "type": "roundrobin",
    "discovery_type": "nacos",
    "service_name": "userservice",
    "discovery_args": {
        "namespace_id": "production",
        "group_name": "DEFAULT_GROUP"
    }
}
```
9.3 Combining Multiple Plugins
9.3.1 Authentication + Rate Limiting + Transformation
```json
{
    "uri": "/api/users",
    "plugins": {
        "key-auth": {
            "header": "X-API-Key"
        },
        "limit-count": {
            "count": 1000,
            "time_window": 60,
            "key": "consumer_name",
            "key_type": "var"
        },
        "proxy-rewrite": {
            "uri": "/v2/users",
            "headers": {
                "X-Consumer": "$consumer_name"
            }
        }
    }
}
```
Notes:
- `key-auth` authenticates first (priority 2500)
- `limit-count` then counts per Consumer identity (priority 1002); since `count` must be an integer, differentiated quotas (e.g. a larger VIP quota) are configured per Consumer or Consumer Group rather than via an expression
- `proxy-rewrite` finally rewrites the URI and headers (priority 1008)
10. Summary
This document has dissected APISIX's plugin system and load-balancing mechanism:
Plugin system highlights:
- Unified lifecycle management (load, filter, run)
- Flexible configuration merging and conditional filtering
- A rich plugin taxonomy (authentication, traffic control, transformation, observability, security)
- Hot reload and dynamic loading
Load balancing highlights:
- Multiple algorithms (roundrobin, chash, ewma, least_conn, priority)
- Integrated health checking (active/passive)
- Retries on failure and priority-based failover
- Seamless service discovery integration
Key trade-offs:
- Priority-based ordering keeps plugin execution predictable
- The load-balancing algorithm is chosen per scenario
- The health check strategy balances accuracy against cost
- Service discovery enables dynamic node management
Performance essentials:
- Load and cache plugins on demand
- Use tablepool to reduce GC pressure
- Cache balancer pickers and precompute limits
- Share health check state
With these mechanisms understood, APISIX can be configured and extended efficiently to meet complex business requirements.