Nginx-02-Event
模块概览
模块职责
Event 模块是 Nginx 的事件驱动核心,负责 I/O 多路复用和定时器管理,提供以下核心能力:
- I/O 多路复用: 封装 epoll、kqueue、select、poll 等事件通知机制
- 定时器管理: 基于红黑树的高效定时器实现
- 连接事件管理: 管理连接的读写事件
- Accept 互斥锁: 避免惊群问题的负载均衡机制
- Posted 事件队列: 延迟事件处理,优化事件处理顺序
输入/输出
输入:
- 文件描述符(套接字)
- 事件类型(读/写/定时器)
- 超时时间
- 事件回调函数
输出:
- 就绪事件通知
- 定时器触发通知
- 事件处理结果
上下游依赖
上游(调用 Event):
- Core 模块(主循环)
- HTTP 模块(注册连接事件)
- Stream 模块
- Mail 模块
下游(Event 调用):
- 操作系统(epoll_wait、kevent、select)
- Core 模块(连接管理、定时器红黑树)
模块级架构图
flowchart TB
subgraph Core["Core 层"]
MainLoop[主循环<br/>ngx_process_events_and_timers]
end
subgraph EventCore["Event 核心"]
EventModule[事件模块<br/>ngx_events_module]
EventConf[事件配置<br/>worker_connections等]
Actions[事件操作接口<br/>ngx_event_actions_t]
end
subgraph EventImpl["事件实现"]
Epoll[epoll模块<br/>Linux]
Kqueue[kqueue模块<br/>BSD]
Select[select模块<br/>通用]
Poll[poll模块<br/>通用]
end
subgraph EventTypes["事件类型"]
ReadEvent[读事件<br/>ngx_event_t]
WriteEvent[写事件<br/>ngx_event_t]
Timer[定时器事件<br/>红黑树]
end
subgraph EventQueue["事件队列"]
PostedAccept[Accept队列<br/>ngx_posted_accept_events]
PostedNext[延迟队列<br/>ngx_posted_next_events]
Posted[普通队列<br/>ngx_posted_events]
end
subgraph Accept["Accept 处理"]
AcceptMutex[Accept互斥锁<br/>ngx_accept_mutex]
AcceptHandler[接受连接<br/>ngx_event_accept]
end
subgraph Protocol["协议层"]
HTTP[HTTP处理器]
Stream[Stream处理器]
Mail[Mail处理器]
end
MainLoop -->|调用| EventCore
EventCore -->|配置| Actions
Actions -.->|实现| EventImpl
Epoll -->|通知就绪| EventTypes
Kqueue -->|通知就绪| EventTypes
EventTypes -->|延迟处理| EventQueue
ReadEvent -->|监听套接字可读| AcceptHandler
AcceptHandler -->|竞争| AcceptMutex
AcceptHandler -->|接受连接| Protocol
MainLoop -->|处理| EventQueue
EventQueue -->|回调| Protocol
Timer -->|超时触发| Protocol
图解说明
1. 事件驱动流程:
- 主循环调用
ngx_process_events_and_timers() - 根据配置选择事件实现(epoll/kqueue/select)
- 等待 I/O 事件或定时器超时
- 处理就绪事件和过期定时器
2. 事件模型抽象:
ngx_event_actions_t定义统一接口- 不同平台实现各自的 add/del/process 函数
- 编译时或运行时选择最优实现
3. 事件处理优化:
- Accept 事件优先处理(posted_accept_events)
- 延迟事件允许在当前事件循环中多次触发(posted_next_events)
- 普通事件在下一轮循环处理(posted_events)
4. Accept 负载均衡:
accept_mutex互斥锁控制 Worker 竞争- 只有获得锁的 Worker 才能 accept 新连接
- 避免惊群问题,实现负载均衡
核心算法与流程
算法 1:事件循环主函数
目的: 统一的事件处理循环,支持多种事件模型
核心代码 (src/event/ngx_event.c):
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
// 获取最近的定时器超时时间
timer = ngx_event_find_timer();
// 如果启用了accept_mutex
if (ngx_use_accept_mutex) {
// 检查是否需要延迟accept(负载均衡)
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
// 尝试获取accept互斥锁
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS; // 延迟处理accept事件
} else {
// 未获得锁,延长等待时间
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
// 调用事件模型的process函数(如epoll_wait)
// 这里会阻塞等待事件或超时
(void) ngx_process_events(cycle, timer, flags);
// 处理accept事件队列(优先)
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
// 释放accept互斥锁
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
// 处理定时器事件
if (delta) {
ngx_event_expire_timers();
}
// 处理普通posted事件
ngx_event_process_posted(cycle, &ngx_posted_events);
}
流程说明:
-
计算等待超时时间:
- 查找最近的定时器超时时间
- 如果没有定时器,等待时间为无限大
- Accept mutex 未获得时,限制最大等待时间为
accept_mutex_delay(默认 500ms)
-
Accept 互斥锁处理:
ngx_accept_disabled> 0 表示当前 Worker 连接数过多,主动放弃竞争- 尝试获取锁,成功则设置 POST_EVENTS 标志,延迟处理 accept 事件
- 失败则缩短超时时间,下次循环再试
-
等待事件:
- 调用底层事件模型的
process函数(epoll_wait/kevent) - 阻塞等待 I/O 事件或超时
- 返回就绪事件数量
- 调用底层事件模型的
-
处理事件队列:
- 优先处理 accept 事件(建立新连接)
- 释放 accept 互斥锁
- 处理过期定时器
- 处理普通 posted 事件(读写事件)
算法 2:定时器管理(红黑树)
目的: 高效管理大量定时器,快速查找最近超时
数据结构:
typedef struct {
ngx_rbtree_t rbtree; // 红黑树根节点
ngx_rbtree_node_t sentinel; // 哨兵节点
} ngx_event_timer_t;
extern ngx_event_timer_t ngx_event_timer_rbtree;
核心操作:
添加定时器:
void
ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer)
{
ngx_msec_t key;
ngx_msec_int_t diff;
// 计算绝对超时时间
key = ngx_current_msec + timer;
if (ev->timer_set) {
// 已经在定时器树中,计算时间差
diff = (ngx_msec_int_t) (key - ev->timer.key);
if (diff == 0) {
return; // 超时时间未变
}
// 先删除旧定时器
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 0;
}
ev->timer.key = key;
// 插入红黑树
ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 1;
}
删除定时器:
static ngx_inline void
ngx_event_del_timer(ngx_event_t *ev)
{
if (ev->timer_set) {
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 0;
}
}
查找最近超时时间:
ngx_msec_t
ngx_event_find_timer(void)
{
ngx_msec_int_t timer;
ngx_rbtree_node_t *node, *root, *sentinel;
// 红黑树为空
if (ngx_event_timer_rbtree.root == &ngx_event_timer_rbtree.sentinel) {
return NGX_TIMER_INFINITE;
}
root = ngx_event_timer_rbtree.root;
sentinel = ngx_event_timer_rbtree.sentinel;
// 找到最左节点(最小key)
node = ngx_rbtree_min(root, sentinel);
timer = (ngx_msec_int_t) (node->key - ngx_current_msec);
return (ngx_msec_t) (timer > 0 ? timer : 0);
}
处理过期定时器:
void
ngx_event_expire_timers(void)
{
ngx_event_t *ev;
ngx_rbtree_node_t *node, *root, *sentinel;
sentinel = ngx_event_timer_rbtree.sentinel;
for ( ;; ) {
root = ngx_event_timer_rbtree.root;
if (root == sentinel) {
return; // 定时器树为空
}
// 获取最小key的节点
node = ngx_rbtree_min(root, sentinel);
if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0) {
return; // 最小的定时器都未超时,退出
}
// 获取事件对象
ev = ngx_rbtree_data(node, ngx_event_t, timer);
// 从树中删除
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 0;
// 标记超时
ev->timedout = 1;
// 调用事件处理函数
ev->handler(ev);
}
}
时间复杂度:
- 插入:O(log n)
- 删除:O(log n)
- 查找最近超时:O(1)(红黑树最左节点)
- 处理过期定时器:O(k log n),k 为过期定时器数量
算法 3:Epoll 事件模型实现
目的: Linux 平台上的高效事件通知机制
初始化 (src/event/modules/ngx_epoll_module.c):
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_event_conf_t *ecf;
ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);
if (ep == -1) {
// 创建epoll实例
ep = epoll_create(cycle->connection_n / 2);
if (ep == -1) {
return NGX_ERROR;
}
}
// 分配事件列表
if (nevents < ecf->events) {
if (event_list) {
ngx_free(event_list);
}
event_list = ngx_alloc(sizeof(struct epoll_event) * ecf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}
nevents = ecf->events;
// 设置I/O操作函数
ngx_io = ngx_os_io;
// 设置事件操作接口
ngx_event_actions = ngx_epoll_module_ctx.actions;
ngx_event_flags = NGX_USE_CLEAR_EVENT // EPOLLET
|NGX_USE_GREEDY_EVENT // 尽可能多地读写
|NGX_USE_EPOLL_EVENT; // epoll特有标志
return NGX_OK;
}
添加事件:
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
c = ev->data;
events = (uint32_t) event;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
events = EPOLLIN|EPOLLRDHUP;
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
events = EPOLLOUT;
}
// 边缘触发
if (flags & NGX_USE_CLEAR_EVENT) {
events |= EPOLLET;
}
// 判断是添加还是修改
if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;
} else {
op = EPOLL_CTL_ADD;
}
ee.events = events;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
return NGX_ERROR;
}
ev->active = 1;
return NGX_OK;
}
处理事件:
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
// 等待事件
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
// 更新时间缓存
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
if (err) {
if (err == NGX_EINTR) {
return NGX_OK; // 被信号中断,正常
}
return NGX_ERROR;
}
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK; // 超时,正常
}
return NGX_ERROR;
}
// 处理就绪事件
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
rev = c->read;
// 检查instance,防止过期事件
if (c->fd == -1 || rev->instance != instance) {
continue;
}
revents = event_list[i].events;
// 读事件就绪
if ((revents & EPOLLIN) && rev->active) {
rev->ready = 1;
rev->available = -1;
if (flags & NGX_POST_EVENTS) {
// 延迟处理
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
// 立即处理
rev->handler(rev);
}
}
wev = c->write;
// 写事件就绪
if ((revents & EPOLLOUT) && wev->active) {
wev->ready = 1;
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);
} else {
wev->handler(wev);
}
}
}
return NGX_OK;
}
配置与可观测
核心配置项
| 配置项 | 上下文 | 默认值 | 说明 |
|---|---|---|---|
| worker_connections | events | 512 | 每个Worker最大连接数 |
| use | events | 自动 | 事件模型选择(epoll/kqueue/select) |
| multi_accept | events | off | 是否一次accept多个连接 |
| accept_mutex | events | off | 是否启用accept互斥锁 |
| accept_mutex_delay | events | 500ms | 获取锁失败后重试延迟 |
性能指标
事件处理性能:
- epoll_wait 平均耗时
- 单次循环处理的事件数
- 定时器触发频率
连接负载:
- 活跃连接数 / worker_connections
- 空闲连接数
- Accept 队列长度
相关文档:
API接口
API 总览
Event 模块提供的 API 主要分为以下几类:
- 事件核心 API - 事件创建、添加、删除
- 定时器 API - 定时器添加、删除、查找
- 事件循环 API - 主循环、事件处理
- Accept API - 连接接受处理
- 事件模型 API - epoll/kqueue/select 实现接口
1. 事件核心 API
1.1 ngx_add_event
功能说明: 将事件添加到事件模型中(如 epoll)
函数签名:
ngx_int_t ngx_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
参数:
| 参数 | 类型 | 方向 | 说明 |
|---|---|---|---|
| ev | ngx_event_t * | 输入 | 事件对象指针 |
| event | ngx_int_t | 输入 | 事件类型(NGX_READ_EVENT/NGX_WRITE_EVENT) |
| flags | ngx_uint_t | 输入 | 标志位(NGX_CLEAR_EVENT 表示边缘触发) |
返回值:
| 类型 | 说明 |
|---|---|
| ngx_int_t | 成功返回 NGX_OK,失败返回 NGX_ERROR |
核心代码 (src/event/modules/ngx_epoll_module.c):
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
c = ev->data; // 获取关联的连接对象
events = (uint32_t) event;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
events = EPOLLIN|EPOLLRDHUP;
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
events = EPOLLOUT;
}
// 边缘触发模式
if (flags & NGX_CLEAR_EVENT) {
events |= EPOLLET;
}
// 判断是添加还是修改
if (e->active) {
op = EPOLL_CTL_MOD; // 另一个事件已存在,修改
events |= prev;
} else {
op = EPOLL_CTL_ADD; // 新添加
}
ee.events = events;
// 保存连接指针和instance标志
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
return NGX_ERROR;
}
ev->active = 1;
return NGX_OK;
}
调用链:
- 用户代码 →
ngx_add_event() ngx_add_event()→ngx_event_actions.add()(函数指针)ngx_event_actions.add()→ngx_epoll_add_event()(epoll 实现)
时序图:
sequenceDiagram
autonumber
participant User as 用户代码
participant Event as ngx_event_t
participant Actions as ngx_event_actions
participant Epoll as epoll模块
participant Kernel as 内核
User->>Event: ngx_add_event(ev, NGX_READ_EVENT, flags)
Event->>Actions: actions.add(ev, event, flags)
Actions->>Epoll: ngx_epoll_add_event()
Epoll->>Epoll: 构造 epoll_event 结构
Epoll->>Epoll: ee.events = EPOLLIN|EPOLLET
Epoll->>Epoll: ee.data.ptr = connection | instance
Epoll->>Kernel: epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ee)
Kernel-->>Epoll: 返回0(成功)
Epoll->>Event: ev->active = 1
Epoll-->>User: 返回 NGX_OK
1.2 ngx_del_event
功能说明: 从事件模型中删除事件
函数签名:
ngx_int_t ngx_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
核心代码:
static ngx_int_t
ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
if (flags & NGX_CLOSE_EVENT) {
ev->active = 0;
return NGX_OK; // 关闭连接时,fd 会自动从 epoll 移除
}
c = ev->data;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
}
if (e->active) {
// 另一个事件仍在,修改而非删除
op = EPOLL_CTL_MOD;
ee.events = prev;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
} else {
// 删除
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;
}
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
return NGX_ERROR;
}
ev->active = 0;
return NGX_OK;
}
1.3 ngx_process_events
功能说明: 处理就绪事件(调用底层事件模型的 wait 函数)
函数签名:
ngx_int_t ngx_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags);
参数:
| 参数 | 类型 | 方向 | 说明 |
|---|---|---|---|
| cycle | ngx_cycle_t * | 输入 | 全局状态对象 |
| timer | ngx_msec_t | 输入 | 超时时间(毫秒),NGX_TIMER_INFINITE 表示无限等待 |
| flags | ngx_uint_t | 输入 | 标志位(NGX_POST_EVENTS 表示延迟处理事件) |
核心代码 (src/event/modules/ngx_epoll_module.c):
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
// 等待事件(阻塞或超时)
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
// 更新时间缓存
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
if (err) {
if (err == NGX_EINTR) {
return NGX_OK; // 被信号中断
}
return NGX_ERROR;
}
if (events == 0) {
return NGX_OK; // 超时
}
// 遍历就绪事件
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
rev = c->read;
// 检查 instance,防止过期事件
if (c->fd == -1 || rev->instance != instance) {
continue;
}
revents = event_list[i].events;
// 处理读事件
if ((revents & EPOLLIN) && rev->active) {
rev->ready = 1;
if (flags & NGX_POST_EVENTS) {
// 延迟处理
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
// 立即调用处理函数
rev->handler(rev);
}
}
wev = c->write;
// 处理写事件
if ((revents & EPOLLOUT) && wev->active) {
wev->ready = 1;
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);
} else {
wev->handler(wev);
}
}
}
return NGX_OK;
}
2. 定时器 API
2.1 ngx_event_add_timer
功能说明: 添加定时器事件到红黑树
函数签名:
void ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer);
参数:
| 参数 | 类型 | 方向 | 说明 |
|---|---|---|---|
| ev | ngx_event_t * | 输入 | 事件对象 |
| timer | ngx_msec_t | 输入 | 相对超时时间(毫秒) |
核心代码 (src/event/ngx_event_timer.c):
void
ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer)
{
ngx_msec_t key;
ngx_msec_int_t diff;
// 计算绝对超时时间
key = ngx_current_msec + timer;
if (ev->timer_set) {
// 已经在定时器树中
diff = (ngx_msec_int_t) (key - ev->timer.key);
if (diff == 0) {
return; // 超时时间未变
}
// 先删除旧定时器
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 0;
}
ev->timer.key = key;
// 插入红黑树
ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 1;
}
使用示例:
// 设置10秒超时
ngx_event_t *rev = c->read;
ngx_add_timer(rev, 10000);
// 事件处理函数中检查超时
void ngx_http_process_request(ngx_event_t *rev)
{
ngx_connection_t *c = rev->data;
ngx_http_request_t *r = c->data;
if (rev->timedout) {
// 超时处理
ngx_http_finalize_request(r, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
// 正常处理
// ...
}
2.2 ngx_event_del_timer
功能说明: 从红黑树删除定时器
函数签名:
static inline void ngx_event_del_timer(ngx_event_t *ev);
核心代码:
static ngx_inline void
ngx_event_del_timer(ngx_event_t *ev)
{
if (ev->timer_set) {
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 0;
}
}
2.3 ngx_event_find_timer
功能说明: 查找最近的超时时间
函数签名:
ngx_msec_t ngx_event_find_timer(void);
返回值:
| 类型 | 说明 |
|---|---|
| ngx_msec_t | 最近的超时时间(毫秒),如果没有定时器返回 NGX_TIMER_INFINITE |
核心代码:
ngx_msec_t
ngx_event_find_timer(void)
{
ngx_msec_int_t timer;
ngx_rbtree_node_t *node, *root, *sentinel;
if (ngx_event_timer_rbtree.root == &ngx_event_timer_rbtree.sentinel) {
return NGX_TIMER_INFINITE; // 红黑树为空
}
root = ngx_event_timer_rbtree.root;
sentinel = ngx_event_timer_rbtree.sentinel;
// 找到最小key(最左节点)
node = ngx_rbtree_min(root, sentinel);
timer = (ngx_msec_int_t) (node->key - ngx_current_msec);
return (ngx_msec_t) (timer > 0 ? timer : 0);
}
2.4 ngx_event_expire_timers
功能说明: 处理所有过期的定时器
函数签名:
void ngx_event_expire_timers(void);
核心代码:
void
ngx_event_expire_timers(void)
{
ngx_event_t *ev;
ngx_rbtree_node_t *node, *root, *sentinel;
sentinel = ngx_event_timer_rbtree.sentinel;
for ( ;; ) {
root = ngx_event_timer_rbtree.root;
if (root == sentinel) {
return;
}
node = ngx_rbtree_min(root, sentinel);
// 检查是否超时
if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0) {
return; // 最小的都未超时,退出
}
ev = ngx_rbtree_data(node, ngx_event_t, timer);
// 从树中删除
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
ev->timer_set = 0;
// 标记超时
ev->timedout = 1;
// 调用事件处理函数
ev->handler(ev);
}
}
3. 事件循环 API
3.1 ngx_process_events_and_timers
功能说明: 事件循环主函数,处理 I/O 事件和定时器
函数签名:
void ngx_process_events_and_timers(ngx_cycle_t *cycle);
核心代码 (src/event/ngx_event.c):
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
timer = ngx_event_find_timer();
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
// 调用底层事件模型的 process 函数
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
// 处理 accept 事件队列
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
// 释放 accept 互斥锁
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
// 处理定时器
if (delta) {
ngx_event_expire_timers();
}
// 处理普通事件队列
ngx_event_process_posted(cycle, &ngx_posted_events);
}
3.2 ngx_event_process_posted
功能说明: 处理 posted 事件队列
函数签名:
void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted);
核心代码:
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
ngx_queue_t *q;
ngx_event_t *ev;
while (!ngx_queue_empty(posted)) {
q = ngx_queue_head(posted);
ev = ngx_queue_data(q, ngx_event_t, queue);
ngx_queue_remove(q);
ev->posted = 0;
// 调用事件处理函数
ev->handler(ev);
}
}
4. Accept API
4.1 ngx_event_accept
功能说明: 接受新连接
函数签名:
void ngx_event_accept(ngx_event_t *ev);
核心代码 (src/event/ngx_event_accept.c):
void
ngx_event_accept(ngx_event_t *ev)
{
socklen_t socklen;
ngx_err_t err;
ngx_log_t *log;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_listening_t *ls;
ngx_connection_t *c, *lc;
struct sockaddr *sa;
u_char sa_buf[NGX_SOCKADDRLEN];
if (ev->timedout) {
// accept 事件不应该超时
return;
}
lc = ev->data;
ls = lc->listening;
ev->ready = 0;
do {
socklen = NGX_SOCKADDRLEN;
// accept 新连接
s = accept4(lc->fd, (struct sockaddr *) sa_buf, &socklen,
SOCK_NONBLOCK);
if (s == (ngx_socket_t) -1) {
err = ngx_socket_errno;
if (err == NGX_EAGAIN) {
return; // 没有新连接了
}
if (err == NGX_ECONNABORTED) {
continue; // 客户端中止,继续 accept
}
return;
}
// 获取连接对象
c = ngx_get_connection(s, ev->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
// 日志记录
}
return;
}
// 创建连接内存池
c->pool = ngx_create_pool(ls->pool_size, ev->log);
if (c->pool == NULL) {
ngx_close_accepted_connection(c);
return;
}
// 保存客户端地址
c->sockaddr = ngx_palloc(c->pool, socklen);
if (c->sockaddr == NULL) {
ngx_close_accepted_connection(c);
return;
}
ngx_memcpy(c->sockaddr, sa_buf, socklen);
// 设置日志对象
log = ngx_palloc(c->pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_close_accepted_connection(c);
return;
}
*log = ls->log;
c->log = log;
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->listening = ls;
c->local_sockaddr = ls->sockaddr;
c->local_socklen = ls->socklen;
// 设置套接字选项
if (ngx_nonblocking(s) == -1) {
ngx_close_accepted_connection(c);
return;
}
if (ls->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
ngx_close_accepted_connection(c);
return;
}
rev = c->read;
wev = c->write;
wev->ready = 1;
// 调用监听对象的 handler
ls->handler(c);
} while (ev->available); // multi_accept
}
5. Posted 事件队列 API
5.1 ngx_post_event
功能说明: 将事件添加到 posted 队列
函数签名:
void ngx_post_event(ngx_event_t *ev, ngx_queue_t *queue);
核心代码:
static ngx_inline void
ngx_post_event(ngx_event_t *ev, ngx_queue_t *queue)
{
if (!ev->posted) {
ev->posted = 1;
ngx_queue_insert_tail(queue, &ev->queue);
}
}
使用场景:
// 延迟处理 accept 事件
if (flags & NGX_POST_EVENTS) {
if (ev->accept) {
ngx_post_event(ev, &ngx_posted_accept_events);
} else {
ngx_post_event(ev, &ngx_posted_events);
}
}
6. Accept 互斥锁 API
6.1 ngx_trylock_accept_mutex
功能说明: 尝试获取 accept 互斥锁
函数签名:
ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle);
核心代码:
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
if (ngx_accept_mutex_held && ngx_accept_events == 0) {
return NGX_OK;
}
// 获得锁后,启用监听套接字的事件
if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
ngx_shmtx_unlock(&ngx_accept_mutex);
return NGX_ERROR;
}
ngx_accept_events = 0;
ngx_accept_mutex_held = 1;
return NGX_OK;
}
// 未获得锁,禁用监听套接字的事件
if (ngx_accept_mutex_held) {
if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_accept_mutex_held = 0;
}
return NGX_OK;
}
相关文档:
数据结构
数据结构总览
Event 模块的核心数据结构包括:
- 事件对象: ngx_event_t
- 事件操作接口: ngx_event_actions_t
- 事件配置: ngx_event_conf_t
- 定时器树: ngx_event_timer_rbtree
- Posted 事件队列: ngx_queue_t
UML 类图
classDiagram
class ngx_event_t {
+void* data
+unsigned write:1
+unsigned accept:1
+unsigned instance:1
+unsigned active:1
+unsigned disabled:1
+unsigned ready:1
+unsigned oneshot:1
+unsigned complete:1
+unsigned eof:1
+unsigned error:1
+unsigned timedout:1
+unsigned timer_set:1
+unsigned posted:1
+int available
+ngx_event_handler_pt handler
+ngx_uint_t index
+ngx_log_t* log
+ngx_rbtree_node_t timer
+ngx_queue_t queue
}
class ngx_event_actions_t {
+ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
+ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
+ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
+ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
+ngx_int_t (*add_conn)(ngx_connection_t *c)
+ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags)
+ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
+ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer)
+void (*done)(ngx_cycle_t *cycle)
}
class ngx_event_conf_t {
+ngx_uint_t connections
+ngx_uint_t use
+ngx_flag_t multi_accept
+ngx_flag_t accept_mutex
+ngx_msec_t accept_mutex_delay
+u_char* name
}
class ngx_epoll_conf_t {
+ngx_uint_t events
+ngx_uint_t aio_requests
}
class ngx_connection_t {
+void* data
+ngx_event_t* read
+ngx_event_t* write
+ngx_socket_t fd
}
class ngx_rbtree_node_t {
+ngx_rbtree_key_t key
+ngx_rbtree_node_t* left
+ngx_rbtree_node_t* right
+ngx_rbtree_node_t* parent
+u_char color
}
class ngx_queue_t {
+ngx_queue_t* prev
+ngx_queue_t* next
}
ngx_event_t "1" --> "1" ngx_connection_t : data
ngx_event_t "1" --> "1" ngx_rbtree_node_t : timer
ngx_event_t "1" --> "1" ngx_queue_t : queue
ngx_connection_t "1" --> "2" ngx_event_t : read/write
ngx_event_actions_t ..> ngx_event_t : 操作
ngx_event_conf_t ..> ngx_event_actions_t : 配置
1. 事件对象 ngx_event_t
1.1 结构体定义
定义位置: src/event/ngx_event.h
struct ngx_event_s {
void *data; // 关联的数据指针(通常是 ngx_connection_t)
unsigned write:1; // 写事件标志
unsigned accept:1; // accept 事件标志
/* 用于检测过期事件(kqueue/epoll 中)*/
unsigned instance:1;
/*
* 事件已传递或将传递给内核;
* 在 aio 模式中 - 操作已提交
*/
unsigned active:1;
unsigned disabled:1; // 禁用标志
/* 就绪事件;aio 模式中 0 表示无法提交操作 */
unsigned ready:1;
unsigned oneshot:1; // 一次性事件
/* aio 操作完成 */
unsigned complete:1;
unsigned eof:1; // 连接关闭
unsigned error:1; // 错误标志
unsigned timedout:1; // 超时标志
unsigned timer_set:1; // 定时器已设置
unsigned delayed:1; // 延迟处理
unsigned deferred_accept:1; // 延迟 accept
/* kqueue/epoll 中的待处理 eof 或 aio 链操作中 */
unsigned pending_eof:1;
unsigned posted:1; // 事件在 posted 队列中
unsigned closed:1; // 已关闭标志
/* 用于 worker 退出时的测试 */
unsigned channel:1;
unsigned resolver:1;
unsigned cancelable:1;
#if (NGX_HAVE_KQUEUE)
unsigned kq_vnode:1;
int kq_errno; // kqueue 报告的待处理 errno
#endif
/*
* kqueue only:
* accept: 等待接受的套接字数量
* read: 当事件就绪时要读取的字节数
* 或使用 NGX_LOWAT_EVENT 设置时的 lowat
* write: 当事件就绪时缓冲区中的可用空间
* 或使用 NGX_LOWAT_EVENT 设置时的 lowat
*
* iocp: TODO
*
* otherwise:
* accept: 1 如果 accept many,否则 0
* read: 当事件就绪时要读取的字节数,-1 如果未知
*/
int available;
ngx_event_handler_pt handler; // 事件处理函数指针
#if (NGX_HAVE_IOCP)
ngx_event_ovlp_t ovlp;
#endif
ngx_uint_t index; // epoll 数组中的索引
ngx_log_t *log; // 日志对象
ngx_rbtree_node_t timer; // 定时器红黑树节点
/* posted 队列节点 */
ngx_queue_t queue;
};
1.2 字段详解
| 字段 | 类型 | 说明 |
|---|---|---|
| data | void * | 通常指向关联的 ngx_connection_t 对象 |
| write | unsigned:1 | 1 表示写事件,0 表示读事件 |
| accept | unsigned:1 | 1 表示这是监听套接字的 accept 事件 |
| instance | unsigned:1 | 用于防止过期事件,每次连接复用时翻转 |
| active | unsigned:1 | 事件是否已添加到事件模型(epoll/kqueue) |
| ready | unsigned:1 | 事件是否就绪(可读/可写) |
| timedout | unsigned:1 | 事件是否超时 |
| timer_set | unsigned:1 | 定时器是否已设置 |
| posted | unsigned:1 | 事件是否在 posted 队列中 |
| handler | ngx_event_handler_pt | 事件处理函数,当事件就绪时调用 |
| timer | ngx_rbtree_node_t | 定时器红黑树节点,key 为绝对超时时间 |
| queue | ngx_queue_t | posted 队列链表节点 |
1.3 Instance 机制
目的: 防止过期事件误触发
工作原理:
// 获取连接时
c = ngx_get_connection(s, log);
rev = c->read;
wev = c->write;
// 翻转 instance
instance = rev->instance;
rev->instance = !instance;
wev->instance = !instance;
// epoll 事件数据中保存 instance
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
// 事件触发时检查 instance
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
rev = c->read;
if (c->fd == -1 || rev->instance != instance) {
continue; // 过期事件,忽略
}
防止的场景:
- 连接 A 关闭,连接对象归还到空闲链表
- epoll 中仍有连接 A 的事件(延迟通知)
- 连接对象被复用为连接 B,instance 翻转
- 连接 A 的过期事件触发时,instance 不匹配,被忽略
2. 事件操作接口 ngx_event_actions_t
2.1 结构体定义
定义位置: src/event/ngx_event.h
typedef struct {
ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*add_conn)(ngx_connection_t *c);
ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
ngx_int_t (*notify)(ngx_event_handler_pt handler);
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags);
ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
void (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;
2.2 函数指针说明
| 函数 | 说明 |
|---|---|
| add | 添加事件到事件模型(epoll_ctl ADD/MOD) |
| del | 从事件模型删除事件(epoll_ctl DEL/MOD) |
| enable | 启用事件(有些模型需要) |
| disable | 禁用事件 |
| add_conn | 添加连接(同时添加读写事件) |
| del_conn | 删除连接 |
| notify | 通知事件(线程间通信) |
| process_events | 处理事件(epoll_wait) |
| init | 初始化事件模型 |
| done | 清理事件模型 |
2.3 不同事件模型的实现
Epoll 实现 (Linux):
ngx_event_actions_t ngx_epoll_module_ctx_actions = {
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
NULL, /* trigger a notify */
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
};
Kqueue 实现 (BSD):
ngx_event_actions_t ngx_kqueue_module_ctx_actions = {
ngx_kqueue_add_event,
ngx_kqueue_del_event,
ngx_kqueue_add_event,
ngx_kqueue_del_event,
ngx_kqueue_add_connection,
ngx_kqueue_del_connection,
NULL,
ngx_kqueue_process_events,
ngx_kqueue_init,
ngx_kqueue_done
};
Select 实现 (通用):
ngx_event_actions_t ngx_select_module_ctx_actions = {
ngx_select_add_event,
ngx_select_del_event,
ngx_select_add_event,
ngx_select_del_event,
NULL,
NULL,
NULL,
ngx_select_process_events,
ngx_select_init,
ngx_select_done
};
3. 事件配置 ngx_event_conf_t
3.1 结构体定义
typedef struct {
ngx_uint_t connections; // worker_connections
ngx_uint_t use; // 事件模型索引
ngx_flag_t multi_accept; // 是否一次 accept 多个连接
ngx_flag_t accept_mutex; // 是否启用 accept 互斥锁
ngx_msec_t accept_mutex_delay; // accept 互斥锁延迟(默认 500ms)
u_char *name; // 事件模型名称
} ngx_event_conf_t;
3.2 Epoll 特定配置
typedef struct {
ngx_uint_t events; // epoll_wait 返回的最大事件数
ngx_uint_t aio_requests; // AIO 请求数(未使用)
} ngx_epoll_conf_t;
4. 定时器红黑树
4.1 数据结构
红黑树节点:
typedef struct ngx_rbtree_node_s ngx_rbtree_node_t;
struct ngx_rbtree_node_s {
ngx_rbtree_key_t key; // 键值(绝对超时时间 ngx_current_msec)
ngx_rbtree_node_t *left; // 左子节点
ngx_rbtree_node_t *right; // 右子节点
ngx_rbtree_node_t *parent; // 父节点
u_char color; // 红/黑
u_char data; // 数据(未使用)
};
红黑树:
typedef struct ngx_rbtree_s ngx_rbtree_t;
struct ngx_rbtree_s {
ngx_rbtree_node_t *root; // 根节点
ngx_rbtree_node_t *sentinel; // 哨兵节点(叶子节点)
ngx_rbtree_insert_pt insert; // 插入函数
};
4.2 定时器树全局变量
// 定时器红黑树
ngx_rbtree_t ngx_event_timer_rbtree;
static ngx_rbtree_node_t ngx_event_timer_sentinel;
4.3 内存布局
ngx_event_t
├─ timer (ngx_rbtree_node_t)
│ ├─ key = ngx_current_msec + timeout // 绝对超时时间
│ ├─ left ────→ 左子树
│ ├─ right ───→ 右子树
│ ├─ parent ──→ 父节点
│ └─ color
ngx_event_timer_rbtree (全局)
├─ root ──────→ 根节点
├─ sentinel ──→ 哨兵节点
└─ insert ────→ ngx_rbtree_insert_timer_value
查找最小超时:
从 root 一直向左走到叶子
O(log n) 复杂度,但实际上是 O(1)
因为最小节点地址被缓存
5. Posted 事件队列
5.1 队列节点
typedef struct ngx_queue_s ngx_queue_t;
struct ngx_queue_s {
ngx_queue_t *prev; // 前一个节点
ngx_queue_t *next; // 后一个节点
};
5.2 全局队列
// Accept 事件队列(优先处理)
ngx_queue_t ngx_posted_accept_events;
// 延迟事件队列(允许在当前循环多次触发)
ngx_queue_t ngx_posted_next_events;
// 普通事件队列(下一轮循环处理)
ngx_queue_t ngx_posted_events;
5.3 队列操作
初始化:
ngx_queue_init(&ngx_posted_accept_events);
ngx_queue_init(&ngx_posted_events);
插入:
static ngx_inline void
ngx_post_event(ngx_event_t *ev, ngx_queue_t *queue)
{
if (!ev->posted) {
ev->posted = 1;
ngx_queue_insert_tail(queue, &ev->queue);
}
}
处理:
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
ngx_queue_t *q;
ngx_event_t *ev;
while (!ngx_queue_empty(posted)) {
q = ngx_queue_head(posted);
ev = ngx_queue_data(q, ngx_event_t, queue);
ngx_queue_remove(q);
ev->posted = 0;
ev->handler(ev);
}
}
6. Accept 互斥锁
6.1 数据结构
// 共享内存互斥锁
typedef struct {
ngx_atomic_t *lock; // 原子锁变量
ngx_atomic_t *wait; // 等待者计数
ngx_uint_t semaphore; // 是否使用信号量
ngx_fd_t fd; // 信号量文件描述符
u_char *name; // 锁名称
} ngx_shmtx_t;
// 全局互斥锁
ngx_shmtx_t ngx_accept_mutex;
ngx_atomic_t *ngx_accept_mutex_ptr; // 指向共享内存中的锁变量
6.2 锁状态
// 是否持有锁
ngx_uint_t ngx_accept_mutex_held;
// accept 事件计数
ngx_uint_t ngx_accept_events;
// 禁用 accept 计数(负载均衡)
ngx_int_t ngx_accept_disabled;
6.3 负载均衡算法
// 在 ngx_event_accept 中计算
ngx_accept_disabled = ngx_cycle->connection_n / 8
- ngx_cycle->free_connection_n;
// 在主循环中检查
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--; // 主动不竞争锁
} else {
// 竞争 accept 互斥锁
}
说明:
- 当空闲连接数 < 总连接数的 7/8 时,
ngx_accept_disabled> 0 - Worker 主动放弃竞争 accept 锁,让其他 Worker 接受新连接
- 实现简单的负载均衡,避免某个 Worker 连接数过多
7. 内存布局总结
Worker 进程内存布局:
全局变量区:
├─ ngx_event_timer_rbtree // 定时器红黑树
├─ ngx_posted_accept_events // Accept 事件队列
├─ ngx_posted_events // 普通事件队列
├─ ngx_accept_mutex // Accept 互斥锁
└─ ngx_event_actions // 事件操作接口
ngx_cycle_t:
├─ connections[worker_connections] // 连接池数组
├─ read_events[worker_connections] // 读事件数组
└─ write_events[worker_connections] // 写事件数组
每个连接:
├─ ngx_connection_t
│ ├─ read ───→ ngx_event_t
│ │ ├─ timer ───→ 红黑树节点
│ │ └─ queue ───→ Posted 队列节点
│ └─ write ──→ ngx_event_t
│ ├─ timer ───→ 红黑树节点
│ └─ queue ───→ Posted 队列节点
事件模型数据(epoll):
├─ ep (epoll fd)
└─ event_list[nevents] // epoll_wait 返回的事件数组
相关文档:
时序图
时序图总览
本文档包含 Event 模块的关键时序图:
- 事件循环主流程
- Epoll 事件处理流程
- 定时器添加与触发流程
- Accept 连接处理流程
- Accept 互斥锁竞争流程
- Posted 事件队列处理流程
1. 事件循环主流程
sequenceDiagram
autonumber
participant Worker as Worker进程
participant MainLoop as 主循环
participant Timer as 定时器管理
participant Mutex as Accept互斥锁
participant Epoll as Epoll模块
participant Posted as Posted队列
Note over Worker: Worker 启动后进入主循环
Worker->>MainLoop: ngx_worker_process_cycle()
loop 无限循环
Note over MainLoop: 开始新一轮事件循环
MainLoop->>MainLoop: ngx_process_events_and_timers()
Note over Timer: 查找最近的定时器超时时间
MainLoop->>Timer: ngx_event_find_timer()
Timer->>Timer: 红黑树查找最小key
Timer-->>MainLoop: 返回超时时间 timer
Note over Mutex: Accept 互斥锁处理
alt 启用 accept_mutex
alt ngx_accept_disabled > 0
MainLoop->>MainLoop: ngx_accept_disabled--
MainLoop->>MainLoop: 不竞争锁,负载均衡
else
MainLoop->>Mutex: ngx_trylock_accept_mutex()
alt 获得锁
Mutex->>Mutex: ngx_accept_mutex_held = 1
Mutex->>Mutex: 启用监听套接字事件
Mutex-->>MainLoop: 返回成功
MainLoop->>MainLoop: flags |= NGX_POST_EVENTS
else 未获得锁
Mutex->>Mutex: 禁用监听套接字事件
Mutex-->>MainLoop: 返回失败
MainLoop->>MainLoop: timer = min(timer, accept_mutex_delay)
end
end
end
Note over Epoll: 等待 I/O 事件
MainLoop->>Epoll: ngx_process_events(cycle, timer, flags)
Epoll->>Epoll: epoll_wait(ep, event_list, nevents, timer)
alt 有事件就绪
Epoll->>Epoll: 更新时间缓存 ngx_time_update()
loop 遍历就绪事件
Epoll->>Epoll: 获取连接和事件对象
Epoll->>Epoll: 检查 instance 防止过期事件
alt 读事件就绪
Epoll->>Epoll: rev->ready = 1
alt flags & NGX_POST_EVENTS
Epoll->>Posted: ngx_post_event(rev, queue)
else
Epoll->>Epoll: rev->handler(rev)
end
end
alt 写事件就绪
Epoll->>Epoll: wev->ready = 1
alt flags & NGX_POST_EVENTS
Epoll->>Posted: ngx_post_event(wev, queue)
else
Epoll->>Epoll: wev->handler(wev)
end
end
end
Epoll-->>MainLoop: 返回处理的事件数
else 超时或中断
Epoll-->>MainLoop: 返回 0 或错误
end
Note over Posted: 处理 Accept 事件队列(优先)
MainLoop->>Posted: ngx_event_process_posted(&ngx_posted_accept_events)
loop 队列不为空
Posted->>Posted: 取出队列头节点
Posted->>Posted: ev->handler(ev) 调用 ngx_event_accept
end
Note over Mutex: 释放 Accept 互斥锁
alt ngx_accept_mutex_held
MainLoop->>Mutex: ngx_shmtx_unlock(&ngx_accept_mutex)
Mutex->>Mutex: ngx_accept_mutex_held = 0
end
Note over Timer: 处理过期定时器
MainLoop->>Timer: ngx_event_expire_timers()
loop 红黑树最小节点超时
Timer->>Timer: 取出最小节点
Timer->>Timer: ngx_rbtree_delete() 删除节点
Timer->>Timer: ev->timedout = 1
Timer->>Timer: ev->handler(ev) 调用超时处理函数
end
Note over Posted: 处理普通事件队列
MainLoop->>Posted: ngx_event_process_posted(&ngx_posted_events)
loop 队列不为空
Posted->>Posted: 取出队列头节点
Posted->>Posted: ev->handler(ev) 调用事件处理函数
end
end
关键步骤说明:
-
定时器查找(步骤 4-6):
- 红黑树最左节点的 key 即为最近超时时间
- O(1) 复杂度,无需遍历
-
Accept 互斥锁(步骤 8-20):
- 负载均衡:连接数过多的 Worker 主动放弃竞争
- 获得锁的 Worker 延迟处理 accept 事件
- 未获得锁的 Worker 缩短等待时间,下次快速重试
-
事件延迟处理(步骤 30-35):
NGX_POST_EVENTS标志表示将事件放入队列- Accept 事件放入
ngx_posted_accept_events - 普通事件放入
ngx_posted_events - 优点:避免在 epoll_wait 回调中长时间处理,快速返回继续 epoll_wait
-
处理顺序(步骤 43-62):
- Accept 事件优先处理(建立新连接)
- 释放 accept 锁
- 处理定时器
- 处理普通事件
2. Epoll 事件处理详细流程
sequenceDiagram
autonumber
participant MainLoop as 主循环
participant Epoll as Epoll模块
participant Kernel as 内核
participant Event as ngx_event_t
participant Handler as 事件处理器
MainLoop->>Epoll: ngx_epoll_process_events(cycle, timer, flags)
Note over Epoll,Kernel: 阻塞等待事件
Epoll->>Kernel: epoll_wait(ep, event_list, nevents, timer)
alt 有事件就绪
Kernel-->>Epoll: 返回就绪事件数 events
else 超时
Kernel-->>Epoll: 返回 0
else 中断
Kernel-->>Epoll: 返回 -1, errno = EINTR
end
Note over Epoll: 更新时间缓存
Epoll->>Epoll: ngx_time_update()
alt events == -1 && errno == EINTR
Epoll-->>MainLoop: 返回 NGX_OK(被信号中断)
else events == 0
Epoll-->>MainLoop: 返回 NGX_OK(超时)
else events > 0
Note over Epoll: 遍历就绪事件
loop i = 0 to events - 1
Epoll->>Epoll: c = event_list[i].data.ptr
Epoll->>Epoll: instance = c & 1
Epoll->>Epoll: c = c & ~1
Epoll->>Event: rev = c->read
Note over Epoll: 检查连接有效性和 instance
alt c->fd == -1 || rev->instance != instance
Epoll->>Epoll: continue(过期事件,跳过)
end
Epoll->>Epoll: revents = event_list[i].events
Note over Epoll: 处理读事件
alt revents & EPOLLIN
Epoll->>Event: rev->ready = 1
Epoll->>Event: rev->available = -1
alt flags & NGX_POST_EVENTS
alt rev->accept
Epoll->>Epoll: ngx_post_event(rev, &ngx_posted_accept_events)
else
Epoll->>Epoll: ngx_post_event(rev, &ngx_posted_events)
end
else
Epoll->>Handler: rev->handler(rev)
end
end
Note over Epoll: 处理写事件
alt revents & EPOLLOUT
Epoll->>Event: wev = c->write
Epoll->>Event: wev->ready = 1
alt flags & NGX_POST_EVENTS
Epoll->>Epoll: ngx_post_event(wev, &ngx_posted_events)
else
Epoll->>Handler: wev->handler(wev)
end
end
Note over Epoll: 处理错误和关闭
alt revents & (EPOLLERR|EPOLLHUP)
Epoll->>Event: rev->ready = 1
Epoll->>Event: wev->ready = 1
end
alt revents & EPOLLRDHUP
Epoll->>Event: rev->pending_eof = 1
end
end
Epoll-->>MainLoop: 返回 NGX_OK
end
3. 定时器添加与触发流程
3.1 添加定时器
sequenceDiagram
autonumber
participant User as 用户代码
participant Timer as ngx_event_add_timer
participant Tree as 红黑树
User->>Timer: ngx_event_add_timer(ev, timeout)
Timer->>Timer: key = ngx_current_msec + timeout
alt ev->timer_set == 1
Note over Timer: 定时器已存在
Timer->>Timer: diff = key - ev->timer.key
alt diff == 0
Timer-->>User: 返回(超时时间未变)
else
Timer->>Tree: ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer)
Timer->>Timer: ev->timer_set = 0
end
end
Timer->>Timer: ev->timer.key = key
Timer->>Tree: ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer)
Note over Tree: 红黑树插入 O(log n)
Tree->>Tree: 找到插入位置
Tree->>Tree: 插入节点
Tree->>Tree: 调整红黑树平衡
Timer->>Timer: ev->timer_set = 1
Timer-->>User: 返回
3.2 触发定时器
sequenceDiagram
autonumber
participant MainLoop as 主循环
participant Expire as ngx_event_expire_timers
participant Tree as 红黑树
participant Event as ngx_event_t
participant Handler as 超时处理器
MainLoop->>Expire: ngx_event_expire_timers()
loop 处理所有过期定时器
Expire->>Tree: root = ngx_event_timer_rbtree.root
alt root == sentinel
Expire-->>MainLoop: 返回(红黑树为空)
end
Expire->>Tree: node = ngx_rbtree_min(root, sentinel)
Expire->>Tree: 获取最小 key 节点
alt node->key - ngx_current_msec > 0
Expire-->>MainLoop: 返回(最小的都未超时)
end
Note over Expire: 定时器已超时
Expire->>Event: ev = ngx_rbtree_data(node, ngx_event_t, timer)
Expire->>Tree: ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer)
Note over Tree: 红黑树删除 O(log n)
Tree->>Tree: 删除节点
Tree->>Tree: 调整红黑树平衡
Expire->>Event: ev->timer_set = 0
Expire->>Event: ev->timedout = 1
Expire->>Handler: ev->handler(ev)
Note over Handler: 超时处理示例
Handler->>Handler: 检查 ev->timedout
Handler->>Handler: 关闭连接或重试
end
4. Accept 连接处理流程
sequenceDiagram
autonumber
participant Epoll as Epoll
participant Accept as ngx_event_accept
participant Kernel as 内核
participant Conn as ngx_connection_t
participant Pool as 内存池
participant Protocol as 协议处理器
Epoll->>Accept: 监听套接字可读事件触发
Accept->>Accept: ev->handler = ngx_event_accept
Note over Accept: 批量 accept 循环
loop multi_accept
Accept->>Kernel: accept4(lc->fd, &sa, &socklen, SOCK_NONBLOCK)
alt 成功接受连接
Kernel-->>Accept: 返回新连接 fd = s
else EAGAIN
Accept-->>Epoll: 返回(没有新连接了)
else ECONNABORTED
Accept->>Accept: continue(客户端中止,继续)
else 其他错误
Accept-->>Epoll: 返回(记录日志)
end
Note over Accept,Conn: 获取连接对象
Accept->>Conn: ngx_get_connection(s, log)
Conn->>Conn: 从 cycle->free_connections 取出
Conn->>Conn: 重置连接状态
Conn->>Conn: 翻转 instance
Conn-->>Accept: 返回 c
alt c == NULL
Accept->>Kernel: close(s)
Accept-->>Epoll: 返回(连接池耗尽)
end
Note over Accept,Pool: 创建连接内存池
Accept->>Pool: ngx_create_pool(ls->pool_size, log)
Pool-->>Accept: 返回 c->pool
alt pool == NULL
Accept->>Conn: ngx_close_accepted_connection(c)
Accept-->>Epoll: 返回
end
Note over Accept: 保存客户端地址
Accept->>Accept: c->sockaddr = ngx_palloc(c->pool, socklen)
Accept->>Accept: ngx_memcpy(c->sockaddr, sa, socklen)
Note over Accept: 设置日志对象
Accept->>Accept: c->log = ngx_palloc(c->pool, sizeof(ngx_log_t))
Accept->>Accept: *c->log = ls->log
Note over Accept: 设置 I/O 函数
Accept->>Conn: c->recv = ngx_recv
Accept->>Conn: c->send = ngx_send
Accept->>Conn: c->recv_chain = ngx_recv_chain
Accept->>Conn: c->send_chain = ngx_send_chain
Note over Accept: 设置连接属性
Accept->>Conn: c->listening = ls
Accept->>Conn: c->local_sockaddr = ls->sockaddr
Note over Accept: 设置套接字选项
Accept->>Kernel: setsockopt(s, TCP_NODELAY)
Note over Accept: 设置读写事件就绪状态
Accept->>Conn: c->write->ready = 1
Note over Accept,Protocol: 调用协议处理器
Accept->>Protocol: ls->handler(c)
alt HTTP 协议
Protocol->>Protocol: ngx_http_init_connection(c)
Protocol->>Protocol: 注册读事件 ngx_http_wait_request_handler
Protocol->>Protocol: 添加读超时定时器
else Stream 协议
Protocol->>Protocol: ngx_stream_init_connection(c)
end
Note over Accept: 检查是否继续 accept
alt ev->available > 0
Accept->>Accept: continue(继续循环)
else
Accept-->>Epoll: 返回
end
end
5. Accept 互斥锁竞争流程
sequenceDiagram
autonumber
participant W1 as Worker 1
participant W2 as Worker 2
participant W3 as Worker 3
participant Shm as 共享内存锁
participant Listen as 监听套接字
Note over W1,W3: 三个 Worker 进程竞争 accept 锁
par Worker 1 竞争
W1->>W1: ngx_trylock_accept_mutex()
W1->>Shm: ngx_shmtx_trylock(&ngx_accept_mutex)
Shm->>Shm: CAS 原子操作尝试获取锁
Shm-->>W1: 成功获得锁
W1->>W1: ngx_accept_mutex_held = 1
W1->>Listen: ngx_enable_accept_events()
W1->>Listen: 注册监听套接字的读事件到 epoll
W1->>W1: flags |= NGX_POST_EVENTS
and Worker 2 竞争
W2->>W2: ngx_trylock_accept_mutex()
W2->>Shm: ngx_shmtx_trylock(&ngx_accept_mutex)
Shm-->>W2: 失败(锁已被占用)
W2->>W2: ngx_accept_mutex_held = 0
W2->>Listen: ngx_disable_accept_events()
W2->>Listen: 从 epoll 删除监听套接字的读事件
W2->>W2: timer = min(timer, accept_mutex_delay)
and Worker 3 竞争
W3->>W3: ngx_trylock_accept_mutex()
W3->>Shm: ngx_shmtx_trylock(&ngx_accept_mutex)
Shm-->>W3: 失败(锁已被占用)
W3->>W3: ngx_accept_mutex_held = 0
W3->>Listen: ngx_disable_accept_events()
W3->>W3: timer = min(timer, accept_mutex_delay)
end
Note over W1,W3: Worker 1 处理 accept 事件
W1->>W1: epoll_wait() 返回监听套接字可读
W1->>W1: ngx_event_accept() 接受新连接
Note over W1,W3: 处理完 accept 事件后释放锁
W1->>Shm: ngx_shmtx_unlock(&ngx_accept_mutex)
Shm->>Shm: 释放锁
W1->>W1: ngx_accept_mutex_held = 0
Note over W2,W3: 下一轮循环,Worker 2/3 再次竞争
par Worker 2 再次竞争
W2->>Shm: ngx_shmtx_trylock(&ngx_accept_mutex)
Shm-->>W2: 成功获得锁
W2->>Listen: ngx_enable_accept_events()
and Worker 3 再次竞争
W3->>Shm: ngx_shmtx_trylock(&ngx_accept_mutex)
Shm-->>W3: 失败
W3->>Listen: ngx_disable_accept_events()
end
负载均衡机制:
sequenceDiagram
autonumber
participant Worker as Worker进程
participant Check as 负载检查
participant Mutex as 互斥锁
Worker->>Check: 每次事件循环开始前检查
Check->>Check: 计算 ngx_accept_disabled
Check->>Check: disabled = conn_n / 8 - free_conn_n
alt disabled > 0
Note over Check: 空闲连接少于 1/8,负载高
Check->>Check: ngx_accept_disabled--
Check->>Worker: 不竞争锁,跳过 accept
else disabled <= 0
Note over Check: 空闲连接充足,可以接受新连接
Check->>Mutex: ngx_trylock_accept_mutex()
alt 获得锁
Mutex-->>Worker: 处理 accept 事件
else 未获得锁
Mutex-->>Worker: 缩短等待时间,快速重试
end
end
6. Posted 事件队列处理流程
sequenceDiagram
autonumber
participant MainLoop as 主循环
participant Epoll as Epoll
participant Queue as 事件队列
participant Event as ngx_event_t
participant Handler as 事件处理器
Note over MainLoop: epoll_wait 返回后
MainLoop->>Epoll: ngx_epoll_process_events()
loop 遍历就绪事件
Epoll->>Event: rev->ready = 1
alt flags & NGX_POST_EVENTS
Note over Epoll,Queue: 延迟处理,加入队列
Epoll->>Queue: ngx_post_event(rev, queue)
alt !ev->posted
Queue->>Queue: ev->posted = 1
Queue->>Queue: ngx_queue_insert_tail(queue, &ev->queue)
end
else
Note over Epoll,Handler: 立即处理
Epoll->>Handler: rev->handler(rev)
end
end
Epoll-->>MainLoop: 返回
Note over MainLoop,Queue: 优先处理 Accept 队列
MainLoop->>Queue: ngx_event_process_posted(&ngx_posted_accept_events)
loop !ngx_queue_empty(posted)
Queue->>Queue: q = ngx_queue_head(posted)
Queue->>Event: ev = ngx_queue_data(q, ngx_event_t, queue)
Queue->>Queue: ngx_queue_remove(q)
Queue->>Event: ev->posted = 0
Queue->>Handler: ev->handler(ev)
Handler->>Handler: ngx_event_accept() 接受新连接
end
Note over MainLoop: 释放 accept 互斥锁
MainLoop->>MainLoop: ngx_shmtx_unlock(&ngx_accept_mutex)
Note over MainLoop: 处理定时器
MainLoop->>MainLoop: ngx_event_expire_timers()
Note over MainLoop,Queue: 处理普通事件队列
MainLoop->>Queue: ngx_event_process_posted(&ngx_posted_events)
loop !ngx_queue_empty(posted)
Queue->>Queue: q = ngx_queue_head(posted)
Queue->>Event: ev = ngx_queue_data(q, ngx_event_t, queue)
Queue->>Queue: ngx_queue_remove(q)
Queue->>Event: ev->posted = 0
Queue->>Handler: ev->handler(ev)
Handler->>Handler: 处理读写事件
end
延迟处理的优点:
- 快速返回 epoll_wait: 不在回调中长时间处理,快速返回继续监听
- 控制处理顺序: Accept 事件优先,保证新连接及时建立
- 避免竞争条件: Accept 处理完后再释放锁,避免其他 Worker 同时 accept
- 批量处理: 多个事件积累后批量处理,提高缓存局部性