APISIX-04: Plugins and Load Balancing

This document dissects APISIX's plugin system and load balancing mechanisms, covering the plugin lifecycle, core plugin implementations, the available load balancing algorithms, and service discovery integration.


1. Plugin System Overview

1.1 Responsibilities and Boundaries

Core responsibilities:

  • provide unified plugin loading, unloading, and version management
  • orchestrate plugin execution by priority across the Nginx phases
  • support plugins for both the HTTP and Stream subsystems
  • support dynamically loaded WASM plugins
  • validate plugin configs against their schemas and encrypt sensitive data
  • manage plugin metadata and support hot reloads
  • support per-plugin conditional filtering (_meta.filter)

Boundaries:

  • plugin configs can be bound to Routes, Services, Consumers, Consumer Groups, and Global Rules
  • plugins are sorted by priority; a larger value means the plugin runs earlier
  • supported Nginx phases: rewrite, access, header_filter, body_filter, log, delayed_body_filter
  • supported Stream phases: preread, log

Upstream/downstream dependencies:

  • relies on apisix.core for base utilities (log, schema, config)
  • relies on apisix.admin.plugins for managing the plugin list
  • invoked by apisix.init in each phase

1.2 Plugin System Architecture

flowchart TB
    subgraph "Configuration layer"
        A[config.yaml<br/>plugin list] --> B[Admin API<br/>plugin config]
        B --> C[Etcd/YAML<br/>config store]
    end

    subgraph "Loading layer"
        D[plugin.load<br/>load plugins] --> E[sort by priority]
        E --> F[inject plugin schema]
        F --> G[call plugin init<br/>functions]
    end

    subgraph "Execution layer"
        H[plugin.filter<br/>filter plugins] --> I[check _meta.filter]
        I --> J[merge Route/Service/Consumer]
        J --> K[plugin.run_plugin<br/>run per phase]
    end

    subgraph "Core plugin categories"
        L1[authentication<br/>key-auth, jwt-auth]
        L2[traffic control<br/>limit-count, limit-req]
        L3[transformation<br/>proxy-rewrite, response-rewrite]
        L4[observability<br/>prometheus, skywalking]
        L5[security<br/>cors, csrf, ip-restriction]
    end

    C --> D
    G --> H
    K --> L1
    K --> L2
    K --> L3
    K --> L4
    K --> L5

    style H fill:#e1f5ff
    style K fill:#fff4e1
    style D fill:#f0f0f0

Architecture notes:

  1. Configuration layer: the plugin list is declared in config.yaml; plugin configs are pushed to Etcd via the Admin API
  2. Loading layer: plugins are loaded in the init_worker phase, sorted by priority, with their schema metadata injected
  3. Execution layer: per request, the applicable plugins are filtered and then executed phase by phase
  4. Plugin categories: authentication, traffic control, transformation, observability, and security

1.3 Plugin Lifecycle Sequence

sequenceDiagram
    autonumber
    participant C as config.yaml
    participant A as Admin API
    participant P as plugin.lua
    participant I as init.lua
    participant R as Route Match
    participant E as Plugin Execution

    Note over C,P: === Startup ===
    C->>P: read the plugins list
    P->>P: load() all plugin modules
    P->>P: validate plugin priority/version/schema
    P->>P: call each plugin's init()
    P->>P: sort by priority into local_plugins

    Note over A,E: === Configuration ===
    A->>P: check_schema() validates plugin config
    A->>P: encrypt_conf() encrypts sensitive fields
    A->>Etcd: store the encrypted config

    Note over I,E: === Request processing ===
    I->>R: match the Route
    R->>P: filter() selects applicable plugins
    P->>P: evaluate _meta.filter expressions
    P->>P: merge Route/Service/Consumer plugins
    P->>E: run_plugin(phase='rewrite')
    E->>E: run each plugin's rewrite()
    P->>E: run_plugin(phase='access')
    E->>E: run each plugin's access()

    Note over E: === Response ===
    I->>P: run_plugin(phase='header_filter')
    I->>P: run_plugin(phase='body_filter')
    I->>P: run_plugin(phase='log')

    Note over P: === Hot reload ===
    A->>P: POST /apisix/admin/plugins/reload
    P->>P: load() reloads the plugins
    P->>P: sync the plugin list via Etcd

Sequence notes:

  1. Startup (1-5): the plugin list is read from config.yaml and the plugin modules are initialized
  2. Configuration (6-8): the Admin API validates and encrypts plugin configs before storing them
  3. Request processing (9-14): once the Route matches, plugins are filtered and executed
  4. Response (15-17): plugins run again in the response filter and log phases
  5. Hot reload (18-20): plugins can be reloaded dynamically without a restart

2. Plugin Core Modules in Detail

2.1 Plugin Loader (plugin.lua)

2.1.1 Core Data Structures

-- Plugin definition structure
local plugin_definition = {
    name = "example-plugin",     -- plugin name
    version = 0.1,               -- plugin version
    priority = 1000,             -- priority (higher runs earlier)
    type = 'auth',               -- plugin type: auth/transform/log
    schema = {...},              -- config schema for the plugin
    consumer_schema = {...},     -- config schema on the Consumer side
    init = function() end,       -- initialization hook
    rewrite = function(conf, ctx) end,   -- rewrite-phase handler
    access = function(conf, ctx) end,    -- access-phase handler
    header_filter = function(conf, ctx) end,
    body_filter = function(conf, ctx) end,
    log = function(conf, ctx) end,
}

-- Global plugin storage
local local_plugins = {}         -- loaded plugins (sorted by priority)
local local_plugins_hash = {}    -- plugin name -> plugin object
local stream_local_plugins = {}  -- Stream plugins
local stream_local_plugins_hash = {}

2.1.2 Plugin Loading (load_plugin)

local function load_plugin(name, plugins_list, plugin_type)
    local pkg_name = "apisix.plugins." .. name
    local pkg = require(pkg_name)

    -- validate the basic attributes
    if not pkg.priority then
        return "invalid plugin priority"
    end
    if not pkg.version then
        return "invalid plugin version"
    end

    -- inject the _meta field into the schema (enables filter expressions)
    if pkg.schema then
        inject_conf_with_meta_schema(pkg)
    end

    -- call the plugin's initialization hook
    if pkg.init then
        pkg.init()
    end

    -- plugin workflow handler (advanced feature)
    if pkg.workflow_handler then
        pkg.workflow_handler()
    end

    core.table.insert(plugins_list, pkg)
    return nil
end

What it does:

  • dynamically require()s the plugin module
  • validates the required attributes (priority, version)
  • injects the _meta field into the schema to enable conditional filtering (sketched below)
  • calls the plugin's init() and workflow_handler()

Error handling:

  • plugin module missing: the require error is returned
  • required fields missing: a validation error message is returned
  • init failure: the error is logged but loading is not aborted
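
The _meta injection can be pictured as grafting a shared sub-schema onto every plugin's own schema. A minimal sketch of the idea follows; the property list only covers the _meta fields this document discusses, and the real inject_conf_with_meta_schema in plugin.lua handles more cases:

-- Minimal sketch of _meta schema injection; the field list is illustrative.
local meta_schema = {
    type = "object",
    properties = {
        filter = {type = "array"},      -- lua-resty-expr rule list
        priority = {type = "integer"},  -- per-route priority override
        disable = {type = "boolean"},
    },
}

local function inject_conf_with_meta_schema(pkg)
    local schema = pkg.schema
    schema.properties = schema.properties or {}
    -- every plugin config gains an optional _meta object
    schema.properties._meta = meta_schema
end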

2.1.3 Loading the Plugin List (load)

function _M.load(plugin_names, wasm_plugin_names)
    local processed = {}
    local plugins_list = core.table.new(#plugin_names, 0)

    -- load the HTTP plugins
    for _, name in ipairs(plugin_names) do
        if processed[name] == nil then
            local err = load_plugin(name, plugins_list, "http")
            if err then
                core.log.error("failed to load plugin ", name, ": ", err)
            end
            processed[name] = true
        end
    end

    -- unload plugins that are no longer in use
    local old_plugins = local_plugins
    for _, plugin in ipairs(old_plugins) do
        if processed[plugin.name] == nil and plugin.destroy then
            plugin.destroy()
        end
    end

    -- sort by priority, descending
    core.table.sort(plugins_list, function(a, b)
        return a.priority > b.priority
    end)

    -- swap in the new global plugin list
    local_plugins = plugins_list
    local_plugins_hash = {}
    for _, plugin in ipairs(plugins_list) do
        local_plugins_hash[plugin.name] = plugin
    end

    -- load the WASM plugins (omitted here)

    return true
end

What it does:

  • iterates over plugin_names and loads each plugin
  • unloads stale plugins (calling destroy() to release resources)
  • sorts the plugin list by priority
  • updates the global local_plugins and local_plugins_hash

Performance notes:

  • the processed table avoids loading a plugin twice
  • plugins_list is pre-sized (core.table.new)
  • sorting happens once at load time, not on every request

2.2 Plugin Filter (plugin.filter)

2.2.1 Filtering Logic

function _M.filter(ctx, conf, plugins, route_conf, phase)
    local user_plugin_conf = conf
    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    local route_plugins = core.tablepool.fetch("plugins", 32, 0)

    -- walk all loaded plugins (already sorted by priority)
    for i = 1, #local_plugins do
        local plugin = local_plugins[i]
        local plugin_conf = user_plugin_conf[plugin.name]

        if plugin_conf then
            -- evaluate the _meta.filter condition
            local meta = plugin_conf._meta
            if meta and meta.filter then
                local match_result = expr_match(meta.filter, ctx.var)
                if not match_result then
                    -- filter condition not met: skip this plugin
                    goto CONTINUE
                end
            end

            -- append to the execution list as a (plugin, conf) pair
            core.table.insert(route_plugins, plugin)
            core.table.insert(route_plugins, plugin_conf)
        end

        ::CONTINUE::
    end

    -- merge into the output plugins list
    for i = 1, #route_plugins do
        core.table.insert(plugins, route_plugins[i])
    end

    core.tablepool.release("plugins", route_plugins)
    return plugins
end

What it does:

  • walks the global local_plugins list, checking whether each plugin is enabled in the Route config
  • evaluates _meta.filter expressions (against Nginx variables)
  • returns the matching plugins as (plugin, conf) pairs, preserving priority order

Filter expression examples:

-- run the plugin only for specific requests
_meta.filter = {
    {"arg_debug", "==", "true"}  -- only when the query argument debug=true
}

-- combined conditions
_meta.filter = {
    {"uri", "!", "~~", "^/static/"},            -- URI does NOT match ^/static/
    {"remote_addr", "ipmatch", {"10.0.0.0/8"}}  -- client IP within 10.0.0.0/8
}
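
These rule lists follow the syntax of lua-resty-expr, which is the library APISIX uses to evaluate them. A minimal sketch of the evaluation (the real code compiles each rule once and caches it via lrucache, omitted here):

local expr = require("resty.expr.v1")

local function expr_match(rule, vars)
    local ex, err = expr.new(rule)  -- compile the rule list
    if not ex then
        return nil, err
    end
    return ex:eval(vars)            -- evaluate against e.g. ctx.var
end

-- true only when the query argument debug is not "true"
local ok = expr_match({{"arg_debug", "!=", "true"}}, ngx.var)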

2.2.2 Configuration Merging (merge_service_route)

function _M.merge_service_route(service_conf, route_conf)
    local new_conf = core.table.clone(route_conf)

    for plugin_name, plugin_conf in pairs(service_conf) do
        if new_conf[plugin_name] then
            -- the Route config takes precedence over the Service config
            if type(plugin_conf) ~= "table" or type(new_conf[plugin_name]) ~= "table" then
                goto CONTINUE
            end

            -- deep-merge the two config objects
            local merged_conf = core.table.merge(plugin_conf, new_conf[plugin_name])
            new_conf[plugin_name] = merged_conf
        else
            new_conf[plugin_name] = plugin_conf
        end

        ::CONTINUE::
    end

    return new_conf
end

Merge rules:

  1. the Route config takes precedence over the Service config
  2. object-typed configs are deep-merged
  3. non-object values are simply overwritten

Merge example:

-- Service config
{
    ["limit-count"] = {count = 100, time_window = 60}
}

-- Route config
{
    ["limit-count"] = {count = 50}  -- overrides count, keeps time_window
}

-- merged result
{
    ["limit-count"] = {count = 50, time_window = 60}
}
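
The override-and-keep behaviour can be reproduced with a small recursive merge. This is a sketch of the semantics only; core.table.merge is the actual helper used above:

-- Recursive merge: values from `override` win, nested tables merge key by key.
local function deep_merge(base, override)
    for k, v in pairs(override) do
        if type(v) == "table" and type(base[k]) == "table" then
            deep_merge(base[k], v)
        else
            base[k] = v
        end
    end
    return base
end

local merged = deep_merge({count = 100, time_window = 60}, {count = 50})
-- merged => {count = 50, time_window = 60}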

2.3 Plugin Runner (plugin.run_plugin)

2.3.1 Execution Logic

function _M.run_plugin(phase, plugins, api_ctx)
    local plugin_run = false

    -- run the plugins in priority order
    for i = 1, #plugins, 2 do
        local plugin = plugins[i]
        local plugin_conf = plugins[i + 1]

        -- check whether the plugin implements this phase
        local phase_func = plugin[phase]
        if not phase_func then
            goto CONTINUE
        end

        plugin_run = true

        -- run _meta.pre_function (the pre-hook) first
        local meta = plugin_conf._meta
        if meta and meta.pre_function then
            local code, body = meta.pre_function(api_ctx)
            if code or body then
                core.response.exit(code, body)
            end
        end

        -- run the plugin's phase handler
        local code, body = phase_func(plugin_conf, api_ctx)
        if code or body then
            -- is_http is a module-level flag: ngx.config.subsystem == "http"
            if is_http then
                core.response.exit(code, body)
            else
                return code, body
            end
        end

        ::CONTINUE::
    end

    return plugin_run
end

Execution characteristics:

  1. paired iteration: plugins[i] is the plugin object, plugins[i+1] is its config
  2. a plugin is skipped if it does not implement the current phase
  3. _meta.pre_function runs before the plugin's own handler
  4. if any plugin returns a non-nil code or body, execution stops and the response is sent immediately

Phase function signature:

function _M.rewrite(conf, ctx)
    -- conf: the plugin's config object
    -- ctx: the api_ctx request context
    -- returns: code, body (optional, to short-circuit with an early response)
end
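
Putting the pieces together, a minimal custom plugin looks like the sketch below. The name and behaviour are illustrative; to be loaded, the file would live under apisix/plugins/ and be declared in config.yaml:

local core = require("apisix.core")

local schema = {
    type = "object",
    properties = {
        message = {type = "string", default = "forbidden"},
    },
}

local _M = {
    version = 0.1,
    priority = 1000,          -- relative order among enabled plugins
    name = "example-skeleton",
    schema = schema,
}

function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

function _M.access(conf, ctx)
    -- returning code/body short-circuits the request, as described above
    if ctx.var.arg_reject == "true" then
        return 403, {message = conf.message}
    end
end

return _M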

2.3.2 Running Global Rules

function _M.run_global_rules(api_ctx, global_rules, phase_name)
    if not global_rules.values then
        return
    end

    for _, global_rule in config_util.iterate_values(global_rules.values) do
        local plugins = core.tablepool.fetch("plugins", 32, 0)

        -- filter the plugins enabled in this global rule
        plugins = _M.filter(api_ctx, global_rule, plugins)

        -- run them
        _M.run_plugin(phase_name, plugins, api_ctx)

        core.tablepool.release("plugins", plugins)
    end
end

Global Rules characteristics:

  • plugins in global rules run before the Route's plugins
  • each global_rule filters and runs its plugins independently
  • multiple global_rules can be in effect at the same time

3. Core Plugin Implementations

3.1 Authentication Plugin: key-auth

3.1.1 Plugin Schema

local schema = {
    type = "object",
    properties = {
        header = {type = "string", default = "apikey"},
        query = {type = "string", default = "apikey"},
        hide_credentials = {type = "boolean", default = false},
        anonymous_consumer = {...},  -- anonymous-consumer settings
    },
}

local consumer_schema = {
    type = "object",
    properties = {
        key = {type = "string"},
    },
    encrypt_fields = {"key"},  -- sensitive fields stored encrypted
    required = {"key"},
}

Field reference:

  • header (string, default "apikey"): HTTP header to read the API key from
  • query (string, default "apikey"): query parameter to read the API key from
  • hide_credentials (boolean, default false): whether to strip the credential (removes the header/query parameter)
  • anonymous_consumer (string, optional): anonymous Consumer to fall back to when authentication fails

Consumer config:

  • key: the Consumer's unique API key (stored encrypted)

3.1.2 Authentication Flow

function _M.rewrite(conf, ctx)
    -- 1. extract the API key
    local from_header = true
    local key = core.request.header(ctx, conf.header)
    if not key then
        local uri_args = core.request.get_uri_args(ctx) or {}
        key = uri_args[conf.query]
        from_header = false
    end

    if not key then
        return 401, {message = "Missing API key in request"}
    end

    -- 2. look up the Consumer
    local consumer, consumer_conf, err = consumer_mod.find_consumer("key-auth", "key", key)
    if not consumer then
        -- fall back to the anonymous Consumer, if configured
        if not conf.anonymous_consumer then
            return 401, {message = "Invalid API key in request"}
        end
        consumer, consumer_conf = consumer_mod.get_anonymous_consumer(conf.anonymous_consumer)
    end

    -- 3. hide the credential
    if conf.hide_credentials then
        if from_header then
            core.request.set_header(ctx, conf.header, nil)
        else
            local args = core.request.get_uri_args(ctx)
            args[conf.query] = nil
            core.request.set_uri_args(ctx, args)
        end
    end

    -- 4. attach the Consumer to the request context
    consumer_mod.attach_consumer(ctx, consumer, consumer_conf)
end

Authentication sequence diagram:

sequenceDiagram
    autonumber
    participant C as Client
    participant K as key-auth plugin
    participant CM as Consumer Manager
    participant U as Upstream

    C->>K: GET /api?apikey=abc123
    K->>K: extract the API key (header or query)
    K->>CM: find_consumer("key-auth", "key", "abc123")
    CM->>CM: look up the consumer table (decrypt key field)
    alt consumer exists
        CM-->>K: return the Consumer object
        K->>K: attach_consumer(ctx, consumer)
        K->>U: forward request (with consumer info)
        U-->>C: 200 OK
    else consumer not found
        CM-->>K: return nil
        alt anonymous_consumer configured
            K->>CM: get_anonymous_consumer()
            CM-->>K: return the anonymous Consumer
            K->>U: forward request
        else no anonymous_consumer
            K-->>C: 401 Unauthorized
        end
    end

Performance notes:

  • Consumer lookups are cached (LRU cache, sketched below)
  • encrypted keys are decrypted lazily (only on a match)
  • the Consumer rides along in ngx.ctx, avoiding repeated lookups
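
The LRU caching pattern mentioned above follows apisix.core.lrucache. A sketch, where the TTL/size values and the loader function are illustrative assumptions:

local core = require("apisix.core")

-- cache up to 512 consumer lookups for 5 minutes
local lrucache = core.lrucache.new({ttl = 300, count = 512})

local function find_consumer_cached(key, consumers_ver)
    -- the version argument invalidates entries when consumers change in etcd
    return lrucache(key, consumers_ver, function()
        return load_consumer_by_key(key)  -- hypothetical loader function
    end)
end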

3.2 Traffic Control Plugin: limit-count

3.2.1 Plugin Schema

local schema = {
    type = "object",
    properties = {
        count = {type = "integer", exclusiveMinimum = 0},
        time_window = {type = "integer", exclusiveMinimum = 0},
        key = {type = "string", default = "remote_addr"},
        key_type = {
            type = "string",
            enum = {"var", "var_combination", "constant"},
            default = "var",
        },
        rejected_code = {type = "integer", minimum = 200, maximum = 599, default = 503},
        rejected_msg = {type = "string"},
        policy = {
            type = "string",
            enum = {"local", "redis", "redis-cluster"},
            default = "local",
        },
        allow_degradation = {type = "boolean", default = false},
        show_limit_quota_header = {type = "boolean", default = true},
    },
    required = {"count", "time_window"},
}

Field reference:

  • count (integer, required): number of requests allowed within the time window
  • time_window (integer, required): window size in seconds
  • key (string, default remote_addr): rate-limiting key (an Nginx variable name)
  • key_type (enum, default var): key type: var/var_combination/constant
  • policy (enum, default local): rate-limiting policy: local/redis/redis-cluster
  • allow_degradation (boolean, default false): whether to degrade (let requests through) when Redis fails
  • show_limit_quota_header (boolean, default true): whether to return remaining-quota response headers

3.2.2 Core Rate-Limiting Logic

function _M.access(conf, ctx)
    -- 1. build the rate-limiting key
    local key = gen_limit_key(conf, ctx, conf.key)

    -- 2. fetch the limiter object (local/redis/redis-cluster)
    local limit_obj = lrucache(conf, nil, create_limit_obj, conf, plugin_name)

    -- 3. run the rate-limit check
    local delay, remaining = limit_obj:incoming(key, true)

    if not delay then
        local err = remaining
        if err == "rejected" then
            -- over the limit
            if conf.rejected_msg then
                return conf.rejected_code, {error_msg = conf.rejected_msg}
            else
                return conf.rejected_code
            end
        end

        -- Redis failure
        if conf.allow_degradation then
            core.log.warn("limit-count failed: ", err, ", allow degradation")
            return  -- let the request through
        end

        return 500, {error_msg = "failed to limit count: " .. err}
    end

    -- 4. set the quota response headers
    if conf.show_limit_quota_header then
        core.response.set_header("X-RateLimit-Limit", conf.count)
        core.response.set_header("X-RateLimit-Remaining", remaining)
        core.response.set_header("X-RateLimit-Reset", conf.time_window)
    end
end

Policy comparison:

  • local: built on lua_shared_dict; for single-node deployments; extremely fast with no external dependency, but not distributed
  • redis: built on standalone Redis; for small clusters; gives distributed consistency, but the Redis node is a single point of failure
  • redis-cluster: built on Redis Cluster; for large clusters; highly available and horizontally scalable, but more complex to configure

3.2.3 Local Rate Limiting (limit-count-local)

function _M.new(plugin_name, count, time_window)
    local limit_obj = {
        count = count,
        time_window = time_window,
    }

    function limit_obj:incoming(key, commit)
        local dict = ngx.shared["plugin-limit-count"]
        local remaining

        -- read the current counter
        local current, err = dict:get(key)
        if not current then
            current = 0
        end

        if current + 1 > count then
            return nil, "rejected"
        end

        if commit then
            -- atomic increment (init=0, TTL=time_window on first access)
            current, err = dict:incr(key, 1, 0, time_window)
            if not current then
                return nil, err
            end
            remaining = count - current
        else
            -- dry run: current has not been incremented yet
            remaining = count - current - 1
        end

        return 0, remaining
    end

    return limit_obj
end

Key techniques:

  • counters live in an ngx.shared.DICT zone
  • incr(key, value, init, init_ttl) is atomic, keeping concurrent updates safe
  • the first access initializes the counter and sets its TTL (demonstrated in the sketch below)
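
A self-contained demonstration of that counter pattern, runnable in any OpenResty location handler. The shared dict name is an assumption for this sketch and needs `lua_shared_dict my_limit 10m;` in nginx.conf:

local dict = ngx.shared.my_limit
local count, time_window = 3, 60

-- incr(key, value, init, init_ttl): creates the key with value `init` and a
-- TTL of `init_ttl` seconds when absent, then increments atomically
local current, err = dict:incr("10.0.0.1", 1, 0, time_window)
if not current then
    ngx.log(ngx.ERR, "incr failed: ", err)
elseif current > count then
    ngx.exit(503)  -- over the limit for this window
end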

Sequence diagram:

sequenceDiagram
    autonumber
    participant C1 as Client1
    participant C2 as Client2
    participant L as limit-count plugin
    participant D as shared_dict

    Note over L,D: count=3, time_window=60s

    C1->>L: request 1
    L->>D: incr("10.0.0.1", 1, 0, 60)
    D-->>L: current=1, remaining=2
    L->>L: set X-RateLimit-Remaining: 2
    L-->>C1: 200 OK

    C2->>L: request 2 (same IP)
    L->>D: incr("10.0.0.1", 1)
    D-->>L: current=2, remaining=1
    L-->>C2: 200 OK

    C1->>L: request 3
    L->>D: incr("10.0.0.1", 1)
    D-->>L: current=3, remaining=0
    L-->>C1: 200 OK

    C2->>L: request 4 (over the limit)
    L->>D: get("10.0.0.1")
    D-->>L: current=3
    L->>L: current + 1 > count
    L-->>C2: 503 Service Temporarily Unavailable

    Note over D: key expires automatically after 60s

3.3 Transformation Plugin: proxy-rewrite

3.3.1 Plugin Schema

local schema = {
    type = "object",
    properties = {
        uri = {type = "string", minLength = 1, maxLength = 4096},
        method = {
            type = "string",
            enum = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"},
        },
        regex_uri = {
            description = "regex to match URI, [pattern, replacement, options]",
            type = "array",
            minItems = 2,
            items = {type = "string"},
        },
        host = {type = "string"},
        scheme = {type = "string", enum = {"http", "https", "grpc", "grpcs"}},
        headers = {
            description = "new headers for request",
            type = "object",
        },
        use_real_request_uri_unsafe = {type = "boolean", default = false},
    },
}

Field reference:

  • uri (string): replacement request URI
  • method (enum): replacement request method
  • regex_uri (array): regex URI rewrite as [pattern, replacement, options]
  • host (string): replacement Host header
  • scheme (enum): replacement scheme (http/https/grpc/grpcs)
  • headers (object): set/add/remove request headers

3.3.2 URI Rewrite Logic

function _M.rewrite(conf, ctx)
    -- 1. rewrite the URI
    if conf.uri then
        ctx.var.upstream_uri = core.utils.resolve_var(conf.uri, ctx.var)
    elseif conf.regex_uri then
        local uri = ctx.var.uri
        local regex_uri = conf.regex_uri
        local newuri, _, err = ngx.re.sub(uri, regex_uri[1], regex_uri[2], regex_uri[3] or "jo")
        if not newuri then
            core.log.error("failed to substitute regex_uri: ", err)
        else
            ctx.var.upstream_uri = newuri
        end
    end

    -- 2. rewrite the method
    if conf.method then
        ngx.req.set_method(ngx["HTTP_" .. conf.method])
    end

    -- 3. rewrite the Host
    if conf.host then
        ctx.var.upstream_host = core.utils.resolve_var(conf.host, ctx.var)
    end

    -- 4. rewrite the scheme
    if conf.scheme then
        ctx.var.upstream_scheme = conf.scheme
    end

    -- 5. rewrite the headers
    if conf.headers then
        for key, value in pairs(conf.headers) do
            if value == "" then
                core.request.set_header(ctx, key, nil)  -- remove the header
            else
                core.request.set_header(ctx, key, core.utils.resolve_var(value, ctx.var))
            end
        end
    end
end

URI rewrite examples:

-- example 1: static prefix
{
    "uri": "/api/v2$request_uri"
}
-- /users → /api/v2/users

-- example 2: regex rewrite
{
    "regex_uri": ["^/api/(.*)", "/$1"]
}
-- /api/users → /users

-- example 3: variable substitution
{
    "uri": "/service/$host/$request_uri",
    "host": "backend-$arg_version.example.com"
}
-- request:  GET /users?version=v2 Host: api.example.com
-- proxied:  GET /service/api.example.com/users Host: backend-v2.example.com
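
The $variable substitution used in these examples is performed by core.utils.resolve_var. A sketch of the idea using ngx.re.gsub (the real helper handles more forms and caches compiled results):

local function resolve_var(tpl, vars)
    -- replace every $name with the corresponding variable value
    return (ngx.re.gsub(tpl, [[\$(\w+)]], function(m)
        return vars[m[1]] or ""
    end, "jo"))
end

-- resolve_var("/service/$host$request_uri", ngx.var)
-- with Host api.example.com and URI /users yields
-- "/service/api.example.com/users"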

4. Load Balancing

4.1 Load Balancing Overview

Responsibilities:

  • pick an upstream server during the balancer_by_lua phase
  • support multiple algorithms: roundrobin, chash, ewma, least_conn, priority
  • integrate with health checking to drop unhealthy nodes automatically
  • support failure retries and priority-based failover

Architecture diagram:

flowchart TB
    subgraph "Upstream configuration"
        U1[Upstream definition<br/>nodes + algorithm] --> U2[health check config<br/>active/passive]
        U2 --> U3[service discovery<br/>consul/nacos/...]
    end

    subgraph "Load balancer"
        B1[balancer.pick_server] --> B2[fetch_health_nodes<br/>get healthy nodes]
        B2 --> B3[create_server_picker<br/>create the picker]
        B3 --> B4{algorithm}

        B4 -->|roundrobin| RR[round robin picker]
        B4 -->|chash| CH[consistent hashing]
        B4 -->|ewma| EW[EWMA picker]
        B4 -->|least_conn| LC[least connections]
        B4 -->|priority| PR[priority-based]
    end

    subgraph "Health checks"
        H1[healthcheck_manager] --> H2[active probing]
        H1 --> H3[passive checking]
        H3 --> H4[failure reporting]
    end

    subgraph "Nginx balancer"
        N1[set_current_peer] --> N2[set upstream address]
        N2 --> N3[configure keepalive]
        N3 --> N4[set timeouts/retries]
    end

    U3 --> B1
    B4 --> H1
    B4 --> N1

    style B3 fill:#e1f5ff
    style H1 fill:#fff4e1

4.2 Load Balancing Algorithms in Detail

4.2.1 Round Robin (roundrobin)

Data structure:

local picker = roundrobin:new(up_nodes)
-- up_nodes format: {["127.0.0.1:8080"] = 10, ["127.0.0.1:8081"] = 20}

Core logic:

function _M.new(up_nodes, upstream)
    local picker = roundrobin:new(up_nodes)
    local nodes_count = nkeys(up_nodes)

    return {
        upstream = upstream,

        -- pick a server
        get = function (ctx)
            -- stop once every node has been tried
            if ctx.balancer_tried_servers_count == nodes_count then
                return nil, "all upstream servers tried"
            end

            -- loop, skipping nodes that were already tried
            -- (safe_limit is a precomputed bound; see the performance section)
            local server, err
            for i = 1, safe_limit do
                server, err = picker:find()
                if not server then
                    return nil, err
                end

                if not ctx.balancer_tried_servers or
                   not ctx.balancer_tried_servers[server] then
                    break
                end
            end

            return server
        end,

        -- callback after balancing / before a retry
        after_balance = function (ctx, before_retry)
            if not before_retry then
                -- request finished: release resources
                core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                ctx.balancer_tried_servers = nil
                return
            end

            -- record the server that was just tried
            if not ctx.balancer_tried_servers then
                ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
            end
            ctx.balancer_tried_servers[ctx.balancer_server] = true
            ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
        end,
    }
end

Algorithm characteristics:

  • weighted round robin (WRR)
  • the higher the weight, the more often a node is picked (see the standalone picker sketch below)
  • supports retries (already-tried nodes are skipped)
  • a weight of 0 is allowed (standby node)
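
The underlying picker comes from lua-resty-balancer. A standalone usage sketch with illustrative node addresses and weights:

local roundrobin = require("resty.roundrobin")

local rr = roundrobin:new({
    ["127.0.0.1:8080"] = 10,
    ["127.0.0.1:8081"] = 20,  -- picked roughly twice as often
})

for _ = 1, 6 do
    ngx.say(rr:find())        -- smooth weighted round-robin selection
end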

Sequence diagram:

sequenceDiagram
    autonumber
    participant B as Balancer
    participant RR as RoundRobin Picker
    participant N1 as Node1(weight=10)
    participant N2 as Node2(weight=20)
    participant N3 as Node3(weight=10)

    B->>RR: find()
    RR->>RR: weight-proportional selection
    RR-->>B: Node2 (highest weight)

    B->>N2: forward request
    N2-->>B: failure

    B->>B: after_balance(before_retry=true)
    B->>B: record tried servers: {Node2: true}

    B->>RR: find()
    RR-->>B: Node2
    B->>B: Node2 already tried, skip

    B->>RR: find()
    RR-->>B: Node1
    B->>N1: forward request
    N1-->>B: success

4.2.2 Consistent Hashing (chash)

Core logic:

function _M.new(up_nodes, upstream)
    local nodes = {}
    local servers = {}

    -- build the consistent-hash ring
    -- (str_null = string.char(0); gcd = greatest common divisor of all
    --  weights, computed beforehand and omitted here)
    for serv, weight in pairs(up_nodes) do
        local id = str_gsub(serv, ":", str_null)
        servers[id] = serv
        nodes[id] = weight / gcd  -- normalize the weight
    end

    local picker = resty_chash:new(nodes)

    return {
        upstream = upstream,

        get = function (ctx)
            local id
            if ctx.balancer_tried_servers then
                -- retry: next() walks to the following node on the ring
                id, ctx.chash_last_server_index = picker:next(ctx.chash_last_server_index)
            else
                -- first attempt: look up by the hash key
                local chash_key = fetch_chash_hash_key(ctx, upstream)
                id, ctx.chash_last_server_index = picker:find(chash_key)
            end

            return servers[id]
        end,
    }
end

Hash key generation:

local function fetch_chash_hash_key(ctx, upstream)
    local hash_on = upstream.hash_on or "vars"
    local key = upstream.key
    local chash_key

    if hash_on == "consumer" then
        chash_key = ctx.consumer_name
    elseif hash_on == "vars" then
        chash_key = ctx.var[key]  -- e.g. remote_addr
    elseif hash_on == "header" then
        chash_key = ctx.var["http_" .. key]
    elseif hash_on == "cookie" then
        chash_key = ctx.var["cookie_" .. key]
    elseif hash_on == "vars_combinations" then
        chash_key = core.utils.resolve_var(key, ctx.var)  -- e.g. "$host$uri"
    end

    if not chash_key then
        chash_key = ctx.var["remote_addr"]  -- fall back to the client IP
    end

    return chash_key
end

hash_on configuration examples:

-- hash by client IP
{
    "type": "chash",
    "hash_on": "vars",
    "key": "remote_addr"
}

-- hash by URL (caching scenarios)
{
    "type": "chash",
    "hash_on": "vars",
    "key": "uri"
}

-- hash by Consumer (user session affinity)
{
    "type": "chash",
    "hash_on": "consumer"
}

-- hash by a combination of variables
{
    "type": "chash",
    "hash_on": "vars_combinations",
    "key": "$host$request_uri"
}

Advantages of consistent hashing (standalone sketch below):

  • adding or removing a node only shifts traffic on the neighboring ring segment
  • the same hash key always routes to the same node (session affinity)
  • virtual nodes (160 by default) spread the load more evenly
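
The ring itself comes from lua-resty-balancer's resty.chash. A standalone sketch (keys and node names are illustrative; the ':'-stripping and gcd normalization shown earlier are skipped):

local resty_chash = require("resty.chash")

local chash = resty_chash:new({
    server1 = 1,
    server2 = 1,
    server3 = 2,  -- gets about twice the keys
})

local id, index = chash:find("some-client-ip")  -- stable mapping for a key
local next_id = chash:next(index)               -- neighbouring node, for retries
ngx.say(id, " -> retry: ", next_id)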

4.3 Health Check Integration

4.3.1 Filtering Healthy Nodes

local function fetch_health_nodes(upstream, checker)
    local nodes = upstream.nodes

    if not checker then
        return nodes
    end

    local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
    local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
    local healthy_nodes = core.table.new(0, nkeys(nodes))

    -- parse_domain_from_addr/parse_port_from_addr: simplified address helpers
    for addr, weight in pairs(nodes) do
        local ok, err = checker:get_target_status(host or parse_domain_from_addr(addr),
                                                   port or parse_port_from_addr(addr))
        if ok then
            healthy_nodes[addr] = weight
        else
            core.log.warn("exclude unhealthy node: ", addr, " ", err)
        end
    end

    if core.table.nkeys(healthy_nodes) == 0 then
        core.log.warn("all upstream nodes are unhealthy, fallback to full nodes")
        return nodes  -- when every node is unhealthy, fall back to all of them
    end

    return healthy_nodes
end

Health check strategies:

  • active: periodic proactive probes, configured via checks.active.http_path and checks.active.interval; suited to latency-sensitive services
  • passive: judged from real request outcomes, configured via checks.passive.unhealthy.http_failures and checks.passive.unhealthy.tcp_failures; suited to high-concurrency scenarios

Health check configuration example:

{
    "checks": {
        "active": {
            "type": "http",
            "http_path": "/health",
            "interval": 5,
            "timeout": 3,
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 3
            },
            "healthy": {
                "http_statuses": [200, 302],
                "successes": 2
            }
        },
        "passive": {
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 5,
                "tcp_failures": 3
            }
        }
    }
}
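
Under the hood this configuration drives lua-resty-healthcheck. A sketch of creating a checker directly with that library (the shared dict name is an assumption here; APISIX wires this up internally):

local healthcheck = require("resty.healthcheck")

local checker = healthcheck.new({
    name = "upstream#1",
    shm_name = "upstream-healthcheck",  -- lua_shared_dict holding check state
    checks = {
        active = {
            http_path = "/health",
            healthy   = {interval = 5, successes = 2},
            unhealthy = {interval = 5, http_failures = 3},
        },
        passive = {
            unhealthy = {http_failures = 5, tcp_failures = 3},
        },
    },
})

checker:add_target("10.0.1.1", 8080, nil, true)  -- ip, port, hostname, healthy
local ok = checker:get_target_status("10.0.1.1", 8080)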

4.4 Failure Retry Mechanism

function _M.pick_server(route, ctx)
    local up_conf = ctx.upstream_conf
    local checker = ctx.up_checker

    -- at most `retries` extra attempts (from the Route/Upstream config)
    local retries = ctx.upstream_retries or 0

    for i = 0, retries do
        -- fetch the healthy nodes
        local nodes = fetch_health_nodes(up_conf, checker)
        if not nodes or core.table.nkeys(nodes) == 0 then
            return nil, "no healthy upstream node"
        end

        -- create (or reuse) the server picker
        local picker = lrucache(up_conf, up_conf.ver, create_server_picker, up_conf, checker)

        -- pick a server
        local server, err = picker.get(ctx)
        if server then
            ctx.balancer_server = server
            ctx.balancer_picker = picker
            return server
        end

        if i < retries then
            core.log.warn("failed to pick server, retry: ", err)
            picker.after_balance(ctx, true)
        else
            return nil, "failed to pick server: " .. err
        end
    end
end

Retry policy:

  1. retries = 0 (the default) means a single attempt with no extra retries
  2. every retry re-fetches the healthy node list
  3. retries skip nodes that were already tried
  4. once every node has failed, an error is returned

Failure reporting (passive health checks):

-- runs in the http_log phase
function _M.http_log_phase(api_ctx)
    local checker = api_ctx.up_checker
    if not checker then
        return
    end

    local status = ngx.status
    local unhealthy_status = api_ctx.upstream_conf.checks.passive.unhealthy.http_statuses

    if unhealthy_status and core.table.array_find(unhealthy_status, status) then
        -- report a failure
        checker:report_http_status(api_ctx.balancer_ip,
                                    api_ctx.balancer_port,
                                    status)
    else
        -- report a success
        checker:report_http_status(api_ctx.balancer_ip,
                                    api_ctx.balancer_port,
                                    200)
    end
end

5. Service Discovery Integration

5.1 Service Discovery Overview

Supported discovery backends:

  • Consul
  • Consul KV
  • Nacos
  • Eureka
  • Kubernetes
  • DNS
  • Tars

How it works:

flowchart LR
    subgraph "Configuration layer"
        A[Upstream config<br/>discovery_type=nacos<br/>service_name=userservice]
    end

    subgraph "Discovery layer"
        B[discovery.nacos] --> C[poll Nacos periodically]
        C --> D[parse service instance list]
        D --> E[update the nodes cache]
    end

    subgraph "Consumption layer"
        F[upstream.set_by_route] --> G["call discovery.nodes()"]
        G --> H[return live node list]
        H --> I[load balancer picks a node]
    end

    A --> B
    E --> G

    style C fill:#e1f5ff
    style H fill:#fff4e1

5.2 Nacos Service Discovery Implementation

5.2.1 Initialization and Synchronization

function _M.init_worker()
    local local_conf = core.config.local_conf()
    local nacos_conf = local_conf.discovery.nacos

    -- Nacos server address
    local host = nacos_conf.host
    local port = nacos_conf.port or 8848
    local namespace_id = nacos_conf.namespace_id or "public"

    -- set up a timer that periodically pulls the service list
    -- (http_get/update_service_nodes/timer.every are simplified helpers)
    local fetch_interval = nacos_conf.fetch_interval or 30
    local timer_err
    local function fetch_all_services()
        local services_list = http_get("/nacos/v1/ns/service/list", {
            pageNo = 1,
            pageSize = 10000,
            namespaceId = namespace_id,
        })

        for _, service in ipairs(services_list.doms) do
            local instances = http_get("/nacos/v1/ns/instance/list", {
                serviceName = service,
                namespaceId = namespace_id,
            })

            -- refresh the local cache
            update_service_nodes(service, instances.hosts)
        end
    end

    timer_err = timer.every(fetch_interval, fetch_all_services)
    if timer_err then
        core.log.error("failed to create timer: ", timer_err)
    end
end

5.2.2 Node Lookup

function _M.nodes(service_name)
    -- read the node list from the cache
    local nodes = service_cache[service_name]
    if not nodes then
        core.log.warn("service not found in discovery: ", service_name)
        return nil
    end

    -- convert to the APISIX nodes format
    local apisix_nodes = {}
    for _, instance in ipairs(nodes) do
        if instance.enabled and instance.healthy then
            local addr = instance.ip .. ":" .. instance.port
            apisix_nodes[addr] = instance.weight or 1
        end
    end

    return apisix_nodes
end

5.2.3 Upstream Configuration Using Service Discovery

{
    "id": "1",
    "type": "roundrobin",
    "discovery_type": "nacos",
    "service_name": "userservice",
    "discovery_args": {
        "namespace_id": "test",
        "group_name": "DEFAULT_GROUP"
    }
}

Sequence diagram:

sequenceDiagram
    autonumber
    participant T as Timer
    participant D as Discovery(Nacos)
    participant N as Nacos Server
    participant U as Upstream Handler
    participant B as Balancer

    Note over T,N: === Initialization ===
    T->>D: fires every 30 seconds
    D->>N: GET /nacos/v1/ns/service/list
    N-->>D: {"doms": ["userservice", "orderservice"]}

    loop for each service
        D->>N: GET /nacos/v1/ns/instance/list?serviceName=userservice
        N-->>D: {"hosts": [{"ip": "10.0.1.1", "port": 8080, "weight": 100, "healthy": true}]}
        D->>D: update service_cache["userservice"]
    end

    Note over U,B: === Request processing ===
    U->>D: nodes("userservice")
    D-->>U: {"10.0.1.1:8080": 100, "10.0.1.2:8080": 100}
    U->>B: pass the node list
    B->>B: load balancer picks a node

Benefits of service discovery:

  • no manual node configuration; service instance changes are picked up automatically
  • multiple environments are supported (isolated via namespace/group)
  • health results from the registry are taken into account

6. Data Flow and Call Chains

6.1 Full Plugin Call Chain

flowchart TD
    Start[request arrives] --> Init[init.http_access_phase]
    Init --> Match[router.match]
    Match --> Filter[plugin.filter<br/>filter plugins]

    Filter --> MergeService[merge_service_route<br/>merge Service config]
    MergeService --> MergeConsumer[merge_consumer_route<br/>merge Consumer config]
    MergeConsumer --> GlobalRules[run_global_rules<br/>global rules]

    GlobalRules --> Rewrite[run_plugin<br/>phase=rewrite]
    Rewrite --> Access[run_plugin<br/>phase=access]

    Access --> Upstream[handle_upstream]
    Upstream --> Balancer[balancer.pick_server]
    Balancer --> Proxy[ngx.balancer.set_current_peer]

    Proxy --> HeaderFilter[run_plugin<br/>phase=header_filter]
    HeaderFilter --> BodyFilter[run_plugin<br/>phase=body_filter]
    BodyFilter --> Log[run_plugin<br/>phase=log]

    Log --> End[respond to client]

    style Filter fill:#e1f5ff
    style Access fill:#fff4e1
    style Balancer fill:#ffe1e1

6.2 Full Load Balancing Call Chain

flowchart TD
    Start[init.handle_upstream] --> GetUpstream[upstream.set_by_route]
    GetUpstream --> CheckDiscovery{service discovery configured?}

    CheckDiscovery -->|yes| Discovery[discovery.nodes<br/>fetch live nodes]
    CheckDiscovery -->|no| StaticNodes[use upstream.nodes]

    Discovery --> FillNodes[upstream.fill_node_info]
    StaticNodes --> FillNodes

    FillNodes --> HealthCheck[create health_checker]
    HealthCheck --> SetContext[set ctx.upstream_conf]

    SetContext --> BalancerPhase[balancer_by_lua_phase]
    BalancerPhase --> PickServer[balancer.pick_server]

    PickServer --> FetchHealthy[fetch_health_nodes<br/>drop unhealthy nodes]
    FetchHealthy --> CreatePicker[create_server_picker<br/>create the picker]

    CreatePicker --> Algorithm{algorithm}
    Algorithm -->|roundrobin| RR[roundrobin.get]
    Algorithm -->|chash| CH[chash.get]
    Algorithm -->|ewma| EW[ewma.get]

    RR --> SetPeer[balancer.set_current_peer]
    CH --> SetPeer
    EW --> SetPeer

    SetPeer --> Proxy[proxy_pass to upstream]
    Proxy --> Result{request outcome}

    Result -->|success| ReportSuccess[passive check reports success]
    Result -->|failure| Retry{retry?}

    Retry -->|yes| PickServer
    Retry -->|no| ReportFail[passive check reports failure]

    style CreatePicker fill:#e1f5ff
    style Algorithm fill:#fff4e1

7. Key Design Decisions and Trade-offs

7.1 Plugin Priority Design

Priority ranges:

  • 10000+: core-functionality plugins (e.g. real-ip, request-id)
  • 5000-10000: security plugins (e.g. ip-restriction, cors)
  • 2000-5000: authentication plugins (e.g. key-auth, jwt-auth)
  • 1000-2000: traffic control (e.g. limit-count, limit-req)
  • 500-1000: transformation plugins (e.g. proxy-rewrite, response-rewrite)
  • 0-500: logging plugins (e.g. http-logger, prometheus)

Priorities of selected core plugins:

real-ip:              priority = 23000
request-id:           priority = 12015
ip-restriction:       priority = 3000
consumer-restriction: priority = 2990
jwt-auth:             priority = 2510
key-auth:             priority = 2500
proxy-rewrite:        priority = 1008
limit-count:          priority = 1002
limit-req:            priority = 1001
response-rewrite:     priority = 899
prometheus:           priority = 500
http-logger:          priority = 410

Priority trade-offs:

  • pro: plugin execution order is predictable, avoiding configuration mistakes
  • con: new plugins must pick a sensible priority to avoid conflicts
  • best practice: keep priorities of same-category plugins close together for easier classification and management

7.2 Choosing a Load Balancing Algorithm

  • roundrobin: general purpose; simple and efficient with balanced weighted distribution, but no session affinity
  • chash: caching/session workloads; session affinity and minimal disruption on node changes, but weight adjustments have limited effect
  • ewma: low-latency requirements; adapts to measured response times, at the cost of latency bookkeeping overhead
  • least_conn: long-lived connections; balances connection counts, but must maintain them
  • priority: primary/backup setups; automatic failover, though the primary node can become a bottleneck

Recommendations:

  • default: roundrobin (best performance)
  • caching: chash (hash by URI)
  • user sessions: chash (hash by Consumer)
  • gRPC/long-lived connections: least_conn or ewma
  • primary/backup: priority

7.3 Health Check Strategy

Active vs Passive:

  • Active: sends periodic health probes; higher resource cost (each node probed separately) but low detection latency (set by interval); suited to latency-sensitive services
  • Passive: judges health from real request outcomes; low cost (no extra requests) but higher detection latency (failures must accumulate); suited to high-concurrency scenarios

Recommended configuration:

// enable Active and Passive together: their strengths are complementary
{
    "checks": {
        "active": {
            "interval": 5,          // probe every 5 seconds
            "http_path": "/health",
            "timeout": 3,
            "unhealthy": {
                "http_failures": 2  // 2 consecutive probe failures mark the node unhealthy
            }
        },
        "passive": {
            "unhealthy": {
                "http_failures": 5, // 5 real request failures mark the node unhealthy
                "tcp_failures": 3
            }
        }
    }
}

7.4 Service Discovery Trade-offs

Static nodes vs service discovery:

  • static nodes: simple configuration and best performance, but node changes require manual updates; for small deployments with a fixed node set
  • service discovery: tracks node changes automatically, but depends on an external component and has sync delay; for cloud-native/microservice architectures

Service discovery caveats:

  1. sync delay: the list is pulled every 30 seconds by default, so node changes lag behind
  2. failure handling: when the registry is down, the cached node list keeps being used
  3. weight mapping: registry weights must be mapped into APISIX's weight range

8. Exception Handling and Performance Notes

8.1 Plugin Exception Handling

Schema validation failure:

  • when: while creating/updating a config via the Admin API
  • handling: a 400 error is returned with detailed validation messages
  • recovery: none needed; the invalid config is simply rejected

Plugin execution errors:

-- guarding against plugin code errors
function _M.access(conf, ctx)
    local ok, err = pcall(function()
        -- plugin logic
    end)

    if not ok then
        core.log.error("plugin error: ", err)
        -- do not abort the request; later plugins still run
    end
end

_meta.filter evaluation failure:

local match_result, err = expr_match(meta.filter, ctx.var)
if err then
    core.log.error("failed to evaluate filter: ", err)
    -- default to running the plugin
    match_result = true
end

8.2 Load Balancing Exception Handling

All nodes unhealthy:

if core.table.nkeys(healthy_nodes) == 0 then
    core.log.warn("all upstream nodes are unhealthy, fallback to full nodes")
    return nodes  -- degrade: use all nodes
end

Service discovery failure:

local nodes, err = discovery.nodes(service_name)
if not nodes then
    core.log.error("failed to fetch nodes from discovery: ", err)
    -- fall back to the last cached node list
    nodes = last_known_nodes[service_name]
end

Retries exhausted:

for i = 0, retries do
    local server, err = picker.get(ctx)
    if not server then
        if i < retries then
            core.log.warn("failed to pick server, retry: ", err)
        else
            return 503, {error_msg = "no available upstream server"}
        end
    end
end

8.3 Performance Optimization Notes

Plugin loading:

  • load on demand: only plugins declared in config.yaml are loaded
  • cache plugin objects: avoids repeated require calls
  • pre-sort by priority: sorted once at startup, not per request

Plugin filtering:

-- use the tablepool to reduce GC pressure
local plugins = core.tablepool.fetch("plugins", 32, 0)
-- ... use plugins
core.tablepool.release("plugins", plugins)

Load balancing:

-- cache the server picker to avoid recreating it
local picker = lrucache(up_conf, up_conf.ver, create_server_picker, up_conf, checker)

-- precompute a safety limit to avoid unbounded loops
local safe_limit = 0
for _, weight in pairs(up_nodes) do
    safe_limit = safe_limit + weight + 1
end

Health checking:

  • share check results: multiple routes on the same upstream share one checker
  • prefer passive checks: reduces active probing overhead
  • set interval sensibly: avoid probing too frequently

9. Configuration Examples and Best Practices

9.1 Plugin Configuration Best Practices

9.1.1 Conditional Execution with _meta.filter

{
    "uri": "/api/*",
    "plugins": {
        "limit-count": {
            "count": 100,
            "time_window": 60,
            "_meta": {
                "filter": [
                    ["arg_debug", "!=", "true"]
                ]
            }
        }
    }
}

Note: rate limiting applies only to non-debug requests; requests with debug=true are not limited.

9.1.2 Plugin Configuration Inheritance

// Service config
{
    "id": "1",
    "plugins": {
        "key-auth": {},
        "limit-count": {"count": 1000, "time_window": 60}
    }
}

// Route config
{
    "id": "1",
    "service_id": "1",
    "uri": "/vip/*",
    "plugins": {
        "limit-count": {"count": 5000}  // overrides the Service's count
    }
}

Note: the VIP route raises the rate-limit threshold while inheriting time_window from the Service.

9.2 Load Balancing Best Practices

9.2.1 Upstream with Health Checks

{
    "type": "roundrobin",
    "nodes": {
        "10.0.1.1:8080": 100,
        "10.0.1.2:8080": 100,
        "10.0.1.3:8080": 50
    },
    "checks": {
        "active": {
            "type": "http",
            "http_path": "/health",
            "interval": 5,
            "timeout": 3,
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 3
            },
            "healthy": {
                "successes": 2
            }
        },
        "passive": {
            "unhealthy": {
                "http_statuses": [500, 502, 503, 504],
                "http_failures": 5
            }
        }
    },
    "retries": 2,
    "timeout": {
        "connect": 5,
        "send": 5,
        "read": 10
    }
}

9.2.2 Cache Affinity with Consistent Hashing

{
    "type": "chash",
    "hash_on": "vars",
    "key": "request_uri",
    "nodes": {
        "cache1:8080": 100,
        "cache2:8080": 100,
        "cache3:8080": 100
    }
}

Note: hashing by URI routes the same URI to the same cache node every time, improving the cache hit rate.

9.2.3 Service Discovery Integration

{
    "type": "roundrobin",
    "discovery_type": "nacos",
    "service_name": "userservice",
    "discovery_args": {
        "namespace_id": "production",
        "group_name": "DEFAULT_GROUP"
    }
}

9.3 Combining Multiple Plugins

9.3.1 Authentication + Rate Limiting + Transformation

{
    "uri": "/api/users",
    "plugins": {
        "key-auth": {
            "header": "X-API-Key"
        },
        "limit-count": {
            "count": 1000,
            "time_window": 60,
            "key": "consumer_name",
            "key_type": "var"
        },
        "proxy-rewrite": {
            "uri": "/v2/users",
            "headers": {
                "X-Consumer": "$consumer_name"
            }
        }
    }
}

Notes:

  1. key-auth authenticates first (priority 2500)
  2. proxy-rewrite then rewrites the URI and headers (priority 1008)
  3. limit-count rate-limits keyed by the Consumer identity (priority 1002); per-Consumer quota overrides can be attached to the Consumer's own plugin config, which merge_consumer_route applies

10. Summary

This document has dissected APISIX's plugin system and load balancing mechanisms.

Plugin system highlights:

  1. unified lifecycle management (loading, filtering, execution)
  2. flexible config merging and conditional filtering
  3. a rich set of plugin categories (authentication, traffic control, transformation, observability, security)
  4. hot reload and dynamic loading

Load balancing highlights:

  1. multiple algorithms (roundrobin, chash, ewma, least_conn, priority)
  2. integrated health checking (active/passive)
  3. failure retries and priority failover
  4. seamless service discovery integration

Key design trade-offs:

  • plugin priorities make execution order predictable
  • the load balancing algorithm is chosen per scenario
  • the health check strategy balances accuracy against cost
  • service discovery provides dynamic node management

Performance notes:

  • plugins are loaded on demand and cached
  • tablepool reduces GC pressure
  • pickers are cached and safety limits precomputed
  • health check results are shared

With these mechanisms understood, APISIX can be configured and extended efficiently to meet complex business requirements.