Nginx 事件模块深度分析
1. 概述
事件模块是Nginx高性能的核心所在,实现了异步非阻塞的事件驱动架构。本文档深入分析事件模块的设计原理、不同事件机制的实现以及性能优化策略。
2. 事件模块架构图
graph TB
subgraph "Event Module Architecture"
EM[Event Module<br/>ngx_event_module]
subgraph "Event Core"
EC[Event Core<br/>ngx_event_core_module]
EA[Event Actions<br/>ngx_event_actions_t]
ET[Event Timer<br/>Timer Management]
end
subgraph "Event Mechanisms"
EP[Epoll Module<br/>Linux]
KQ[Kqueue Module<br/>BSD/macOS]
SEL[Select Module<br/>Universal]
POLL[Poll Module<br/>POSIX]
DEV[DevPoll Module<br/>Solaris]
PORT[EventPort Module<br/>Solaris]
end
subgraph "Event Processing"
EL[Event Loop<br/>ngx_process_events_and_timers]
EH[Event Handlers<br/>Connection/Request Handlers]
EP_QUEUE[Posted Events Queue<br/>Deferred Processing]
end
EM --> EC
EC --> EA
EC --> ET
EA --> EP
EA --> KQ
EA --> SEL
EA --> POLL
EA --> DEV
EA --> PORT
EC --> EL
EL --> EH
EL --> EP_QUEUE
end
3. 事件结构定义
3.1 核心事件结构
// Core event object: one instance describes a single read or write event.
struct ngx_event_s {
void *data; // event payload (usually the owning connection object)
// Event state flags
unsigned write:1; // this is a write event
unsigned accept:1; // this event accepts new connections
unsigned instance:1; // instance bit, used to detect stale events
unsigned active:1; // event is registered with the event mechanism
unsigned disabled:1; // event is disabled
unsigned ready:1; // event is ready to be handled
unsigned oneshot:1; // one-shot event
unsigned complete:1; // AIO operation completed
unsigned eof:1; // end of connection reached
unsigned error:1; // error flag
unsigned timedout:1; // the event's timer expired
unsigned timer_set:1; // a timer is currently armed for this event
unsigned delayed:1; // processing is delayed
unsigned deferred_accept:1; // deferred accept
unsigned pending_eof:1; // an EOF is pending
unsigned posted:1; // event is already in a posted-events queue
unsigned closed:1; // connection has been closed
int available; // amount of data available
ngx_event_handler_pt handler; // callback invoked to process the event
ngx_uint_t index; // event index
ngx_log_t *log; // log object
ngx_rbtree_node_t timer; // node linking the event into the timer tree
ngx_queue_t queue; // node linking the event into a posted queue
};
3.2 事件动作接口
// Table of operations every event mechanism (epoll, kqueue, select, ...)
// provides; the init functions copy their table into ngx_event_actions.
typedef struct {
// Add / delete a single event
ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
// Enable / disable a single event
ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
// Connection-level registration
ngx_int_t (*add_conn)(ngx_connection_t *c);
ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
// Notification mechanism (may be NULL, e.g. when notify init fails)
ngx_int_t (*notify)(ngx_event_handler_pt handler);
// Wait for and dispatch events
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags);
// Initialization / cleanup
ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
void (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;
4. 事件机制实现
4.1 Epoll 实现 (Linux)
初始化函数
/*
 * Set up the epoll mechanism for a cycle.
 *
 * Creates the epoll descriptor on first use, (re)allocates the array that
 * receives events from epoll_wait() when the configured size grew, and
 * installs the epoll action table and event flags.
 *
 * Returns NGX_OK on success, NGX_ERROR if epoll_create() or the event list
 * allocation fails.
 */
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *conf;

    conf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    /* one-time setup, skipped when the descriptor already exists */
    if (ep == -1) {
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

#if (NGX_HAVE_EVENTFD)
        /* without a working notify channel the action is simply disabled */
        if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {
            ngx_epoll_module_ctx.actions.notify = NULL;
        }
#endif

#if (NGX_HAVE_FILE_AIO)
        ngx_epoll_aio_init(cycle, conf);
#endif
    }

    /* grow the epoll_wait() result buffer when the configured size grew */
    if (nevents < conf->events) {
        if (event_list != NULL) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * conf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = conf->events;

    /* publish the I/O methods and the epoll action table */
    ngx_io = ngx_os_io;
    ngx_event_actions = ngx_epoll_module_ctx.actions;

    /* clear or level notification, plus the greedy and epoll markers */
#if (NGX_HAVE_CLEAR_EVENT)
    ngx_event_flags = NGX_USE_CLEAR_EVENT;
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT;
#endif
    ngx_event_flags |= NGX_USE_GREEDY_EVENT|NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}
添加事件函数
/*
 * Register a read or write event of a connection with epoll.
 *
 * A descriptor has a single epoll registration shared by both directions,
 * so when the opposite event is already active the registration is modified
 * (keeping the opposite direction's mask) instead of added.
 *
 * Returns NGX_OK on success, NGX_ERROR if epoll_ctl() fails.
 */
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, peer_mask;
    ngx_event_t         *peer;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    c = ev->data;

    events = (uint32_t) event;

    if (event == NGX_READ_EVENT) {
        /* adding the read side: the peer is the write event */
        peer = c->write;
        peer_mask = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        /* adding the write side: the peer is the read event */
        peer = c->read;
        peer_mask = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }

    op = EPOLL_CTL_ADD;

    if (peer->active) {
        op = EPOLL_CTL_MOD;
        events |= peer_mask;
    }

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
    if (flags & NGX_EXCLUSIVE_EVENT) {
        events &= ~EPOLLRDHUP;
    }
#endif

    ee.events = events | (uint32_t) flags;

    /* the instance bit travels in the low bit of the connection pointer */
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}
事件处理函数
/*
 * Wait for epoll events and dispatch them.
 *
 * Blocks in epoll_wait() for at most `timer`, optionally refreshes the
 * cached time, then walks the returned events. Each ready read/write event
 * is either queued (when NGX_POST_EVENTS is set in `flags`) or its handler
 * is called directly.
 *
 * Returns NGX_OK when events were processed, when a finite timeout elapsed
 * with no events, or when the wait was interrupted by the timer alarm;
 * returns NGX_ERROR on any other epoll_wait() failure or when an infinite
 * wait returned no events.
 */
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
// Wait for events
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
events = epoll_wait(ep, event_list, (int) nevents, timer);
// Capture the error code right away, before any other call can change it
err = (events == -1) ? ngx_errno : 0;
// Refresh the cached time
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
// Handle epoll_wait() errors
if (err) {
if (err == NGX_EINTR) {
// An interruption by the timer alarm is expected: clear the flag and return
if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}
level = NGX_LOG_INFO;
} else {
level = NGX_LOG_ALERT;
}
ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}
if (events == 0) {
// No events is normal only when a finite timeout was requested
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}
// Process every ready event
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
// The low pointer bit carries the instance bit stored at registration time
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
rev = c->read;
if (c->fd == -1 || rev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
revents = event_list[i].events;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);
// Handle error events
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
/*
* if the error events were returned, add EPOLLIN and EPOLLOUT
* to handle the events at least in one active handler
*/
revents |= EPOLLIN|EPOLLOUT;
}
#if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif
// Handle the read event
if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
#endif
rev->ready = 1;
rev->available = -1;
if (flags & NGX_POST_EVENTS) {
// Accept events go to their own queue
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
rev->handler(rev);
}
}
wev = c->write;
// Handle the write event
if ((revents & EPOLLOUT) && wev->active) {
// Re-check for staleness: the read handler above may have run in between
if (c->fd == -1 || wev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);
} else {
wev->handler(wev);
}
}
}
return NGX_OK;
}
4.2 Kqueue 实现 (BSD/macOS)
初始化函数
/*
 * Set up the kqueue mechanism for a cycle.
 *
 * Creates the kqueue descriptor on first use, (re)allocates the change and
 * event lists when the configured event count grew, and installs the kqueue
 * action table and event flags.
 *
 * Returns NGX_OK on success, NGX_ERROR if kqueue() or an allocation fails.
 */
static ngx_int_t
ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_kqueue_conf_t  *kcf;

    kcf = ngx_event_get_conf(cycle->conf_ctx, ngx_kqueue_module);

    if (kq == -1) {
        kq = kqueue();

        if (kq == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "kqueue() failed");
            return NGX_ERROR;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "kqueue created");
    }

    /*
     * NOTE(review): the change list is sized by kcf->changes but is only
     * reallocated when kcf->events grows; confirm this is intended.
     */
    if (nevents < kcf->events) {

        /*
         * Reset each pointer right after freeing it.  If an allocation
         * below fails we return with nevents unchanged, so a later call
         * enters this branch again; a pointer left dangling here would
         * then be freed a second time.
         */
        if (change_list) {
            ngx_free(change_list);
            change_list = NULL;
        }

        if (event_list) {
            ngx_free(event_list);
            event_list = NULL;
        }

        change_list = ngx_alloc(kcf->changes * sizeof(struct kevent),
                                cycle->log);
        if (change_list == NULL) {
            return NGX_ERROR;
        }

        event_list = ngx_alloc(kcf->events * sizeof(struct kevent),
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nchanges = 0;
    nevents = kcf->events;

    ngx_io = ngx_os_io;
    ngx_event_actions = ngx_kqueue_module_ctx.actions;

    ngx_event_flags = NGX_USE_ONESHOT_EVENT|NGX_USE_KQUEUE_EVENT
                      |NGX_USE_LOWAT_EVENT|NGX_USE_VNODE_EVENT;

    return NGX_OK;
}
4.3 Select 实现 (通用)
事件处理函数
/*
 * Wait for descriptors with select() and dispatch the ready events.
 *
 * Works on copies of the master descriptor sets, waits for at most `timer`,
 * optionally refreshes the cached time, then scans the registered
 * connections.  Each ready read/write event is queued when NGX_POST_EVENTS
 * is set in `flags`, otherwise its handler is called directly.
 *
 * Returns NGX_OK when events were processed, when nothing is registered,
 * when a finite timeout elapsed, or when the wait was interrupted by the
 * timer alarm; returns NGX_ERROR on any other select() failure or when an
 * infinite wait returned no events.
 */
static ngx_int_t
ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
    ngx_uint_t flags)
{
    int                ready, nready;
    ngx_err_t          err;
    ngx_uint_t         i, level;
    ngx_event_t       *ev;
    ngx_queue_t       *queue;
    ngx_connection_t  *c;
    struct timeval     tv, *tp;
    fd_set             work_read_fd_set, work_write_fd_set;

    if (max_fd == -1) {
        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "select timer: %M", timer);

    /* select() overwrites its sets, so work on copies of the master sets */
    work_read_fd_set = master_read_fd_set;
    work_write_fd_set = master_write_fd_set;

    if (timer == NGX_TIMER_INFINITE) {
        tp = NULL;

    } else {
        tv.tv_sec = (long) (timer / 1000);
        tv.tv_usec = (long) ((timer % 1000) * 1000);
        tp = &tv;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "select read fd_set: %08Xd", *(int *) &work_read_fd_set);

    ready = select(max_fd + 1, &work_read_fd_set, &work_write_fd_set, NULL, tp);

    /* capture the error code before any other call can change it */
    err = (ready == -1) ? ngx_errno : 0;

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "select ready %d of %d", ready, nevents);

    if (err) {
        if (err == NGX_EINTR) {

            /* interruption by the timer alarm is expected */
            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "select() failed");

        if (err == NGX_EBADF) {
            ngx_select_repair_fd_sets(cycle);
        }

        return NGX_ERROR;
    }

    if (ready == 0) {
        /* no events is normal only when a finite timeout was requested */
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "select() returned no events without timeout");
        return NGX_ERROR;
    }

    /*
     * select() reports the total number of bits set across all sets, so a
     * descriptor that is both readable and writable counts twice.  nready
     * counts dispatched events the same way, which lets the scan stop as
     * soon as every reported bit is accounted for.
     */
    nready = 0;

    for (i = 0; i < nevents && nready < ready; i++) {
        c = event_index[i];

        if (c->fd == -1) {
            continue;
        }

        ev = c->read;

        if (ev->active && FD_ISSET(c->fd, &work_read_fd_set)) {
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "select read %d", c->fd);

            ev->ready = 1;
            ev->available = -1;

            if (flags & NGX_POST_EVENTS) {
                /* accept events go to their own queue */
                queue = ev->accept ? &ngx_posted_accept_events
                                   : &ngx_posted_events;
                ngx_post_event(ev, queue);

            } else {
                ev->handler(ev);
            }

            nready++;
        }

        ev = c->write;

        if (ev->active && FD_ISSET(c->fd, &work_write_fd_set)) {
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "select write %d", c->fd);

            ev->ready = 1;

            if (flags & NGX_POST_EVENTS) {
                ngx_post_event(ev, &ngx_posted_events);

            } else {
                ev->handler(ev);
            }

            nready++;
        }
    }

    /* some reported bits matched no active event */
    if (ready != nready) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "select ready != events");
    }

    return NGX_OK;
}
5. 事件循环核心
5.1 主事件循环
/*
 * Run one iteration of the worker event loop.
 *
 * Chooses the wait timeout, tries to take the accept mutex when it is in
 * use, waits for and dispatches I/O events, then runs the posted accept
 * events, releases the accept mutex, expires timers if time advanced, and
 * finally runs the remaining posted events.
 */
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (!ngx_timer_resolution) {
        /* wait no longer than the nearest timer; update time afterwards */
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */

        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif

    } else {
        timer = NGX_TIMER_INFINITE;
        flags = 0;
    }

    if (ngx_use_accept_mutex) {

        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
            return;

        } else if (ngx_accept_mutex_held) {
            /* defer handlers so the mutex can be released early */
            flags |= NGX_POST_EVENTS;

        } else if (timer == NGX_TIMER_INFINITE
                   || timer > ngx_accept_mutex_delay)
        {
            /* mutex not obtained: cap the wait so we retry soon */
            timer = ngx_accept_mutex_delay;
        }
    }

    /* events postponed to this iteration must not wait for new I/O */
    if (!ngx_queue_empty(&ngx_posted_next_events)) {
        ngx_event_move_posted_next(cycle);
        timer = 0;
    }

    delta = ngx_current_msec;

    /* call the active mechanism's event processing function */
    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    /* posted accept events run first, while the mutex is still held */
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    /* timers are expired only when the clock moved */
    if (delta) {
        ngx_event_expire_timers();
    }

    /* finally the ordinary posted events */
    ngx_event_process_posted(cycle, &ngx_posted_events);
}
5.2 事件投递机制
/*
 * Append an event to a posted-events queue.
 * An event that is already posted is left where it is; only a debug line
 * is written for it.
 */
void
ngx_post_event(ngx_event_t *ev, ngx_queue_t *queue)
{
    if (ev->posted) {
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                       "update posted event %p", ev);
        return;
    }

    ev->posted = 1;
    ngx_queue_insert_tail(queue, &ev->queue);

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0, "post event %p", ev);
}
/*
 * Drain a posted-events queue: repeatedly take the head event, remove it
 * from the queue and invoke its handler, until the queue is empty.
 */
static void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
    ngx_queue_t  *head;
    ngx_event_t  *ev;

    for ( ;; ) {

        if (ngx_queue_empty(posted)) {
            break;
        }

        head = ngx_queue_head(posted);
        ev = ngx_queue_data(head, ngx_event_t, queue);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "posted event %p", ev);

        /* unlink before calling the handler */
        ngx_delete_posted_event(ev);

        ev->handler(ev);
    }
}
6. 定时器管理
6.1 定时器结构
// Red-black tree that holds all armed timers, and its sentinel node
extern ngx_rbtree_t ngx_event_timer_rbtree;
extern ngx_rbtree_node_t ngx_event_timer_sentinel;
/*
 * Timer helper macros.
 *
 * ngx_event_timer_set(ev, msec): arm the timer of `ev` to fire `msec`
 * milliseconds after ngx_current_msec, insert it into the timer tree and
 * mark the event as having a timer.
 *
 * ngx_event_timer_del(ev): remove the timer of `ev` from the timer tree and
 * clear the mark.
 *
 * The delay parameter must not be called `timer`: the preprocessor would
 * also substitute it for the `timer` member in `ev->timer`, which breaks
 * every expansion.  Arguments are parenthesized so that expressions can be
 * passed; note that `ev` is evaluated more than once.
 */
#define ngx_event_timer_set(ev, msec)                                        \
    do {                                                                     \
        (ev)->timer.key = ngx_current_msec + (msec);                         \
        ngx_rbtree_insert(&ngx_event_timer_rbtree, &(ev)->timer);            \
        (ev)->timer_set = 1;                                                 \
    } while (0)

#define ngx_event_timer_del(ev)                                              \
    do {                                                                     \
        ngx_rbtree_delete(&ngx_event_timer_rbtree, &(ev)->timer);            \
        (ev)->timer_set = 0;                                                 \
    } while (0)
6.2 定时器处理
// 查找最近的定时器
ngx_msec_t
ngx_event_find_timer(void)
{
ngx_msec_int_t timer;
ngx_rbtree_node_t *node, *root, *sentinel;
if (ngx_event_timer_rbtree.root == &ngx_event_timer_sentinel) {
return NGX_TIMER_INFINITE;
}
root = ngx_event_timer_rbtree.root;
sentinel = ngx_event_timer_rbtree.sentinel;
node = ngx_rbtree_min(root, sentinel);
timer = (ngx_msec_int_t) (node->key - ngx_current_msec);
return (ngx_msec_t) (timer > 0 ? timer : 0);
}
/*
 * Fire every timer that is due.
 *
 * Repeatedly takes the earliest node of the timer tree; as long as its key
 * is not later than ngx_current_msec the node is removed, the event is
 * marked as timed out and its handler is called.  Stops at the first timer
 * still in the future or when the tree is empty.
 */
void
ngx_event_expire_timers(void)
{
    ngx_event_t        *ev;
    ngx_rbtree_node_t  *earliest, *sentinel;

    sentinel = ngx_event_timer_rbtree.sentinel;

    /* the root is re-read on each pass because handlers may change the tree */
    while (ngx_event_timer_rbtree.root != sentinel) {

        earliest = ngx_rbtree_min(ngx_event_timer_rbtree.root, sentinel);

        /* earliest->key > ngx_current_msec: nothing else is due */

        if ((ngx_msec_int_t) (earliest->key - ngx_current_msec) > 0) {
            break;
        }

        ev = ngx_rbtree_data(earliest, ngx_event_t, timer);

        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                       "event timer del: %d: %M",
                       ngx_event_ident(ev->data), ev->timer.key);

        ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);

#if (NGX_DEBUG)
        ev->timer.left = NULL;
        ev->timer.right = NULL;
        ev->timer.parent = NULL;
#endif

        ev->timer_set = 0;
        ev->timedout = 1;

        ev->handler(ev);
    }
}
7. 连接管理
7.1 连接池管理
/*
 * Obtain a connection object for socket `s`.
 *
 * Takes a connection from the free list, or, when the list is empty but
 * unused slots remain, from the preallocated arrays.  The connection and
 * both of its events are zeroed and re-linked, and the events' instance bit
 * is flipped so that events still in flight for the previous user of the
 * slot can be recognised as stale.
 *
 * Returns the connection, or NULL when the descriptor number is out of
 * range for the files table or no connection is available.
 */
ngx_connection_t *
ngx_get_connection(ngx_socket_t s, ngx_log_t *log)
{
    ngx_uint_t         instance, slot;
    ngx_event_t       *rev, *wev;
    ngx_connection_t  *c;

    /*
     * Validate the descriptor before touching the free list: rejecting it
     * after a connection has been taken would lose that connection.
     */

    /* disable warning: Win32 SOCKET is u_int while UNIX socket is int */

    if (ngx_cycle->files && (ngx_uint_t) s >= ngx_cycle->files_n) {
        ngx_log_error(NGX_LOG_ALERT, log, 0,
                      "the new socket has number %d, "
                      "but only %ui files are available",
                      s, ngx_cycle->files_n);
        return NULL;
    }

    c = ngx_cycle->free_connections;

    if (c != NULL) {

        if (c->fd != -1) {
            ngx_log_error(NGX_LOG_ALERT, log, 0,
                          ngx_get_connection_n " reused connection");
        }

        ngx_cycle->free_connections = c->data;
        ngx_cycle->free_connection_n--;

        /* a recycled connection keeps the events it was created with */
        rev = c->read;
        wev = c->write;

    } else if (ngx_cycle->free_connection_n == 0) {
        ngx_log_error(NGX_LOG_ALERT, log, 0,
                      "%ui worker_connections are not enough",
                      ngx_cycle->connection_n);
        return NULL;

    } else {
        /*
         * Empty list but unused slots remain: take the next slot and the
         * events at the same index.  These must be the events linked below;
         * this slot's own read/write pointers are not relied upon.
         */
        slot = ngx_cycle->connection_n - ngx_cycle->free_connection_n;

        c = &ngx_cycle->connections[slot];
        rev = &ngx_cycle->read_events[slot];
        wev = &ngx_cycle->write_events[slot];

        ngx_cycle->free_connection_n--;
    }

    ngx_memzero(c, sizeof(ngx_connection_t));

    c->read = rev;
    c->write = wev;
    c->fd = s;
    c->log = log;

    /* remember the old instance bit before the events are wiped */
    instance = rev->instance;

    ngx_memzero(rev, sizeof(ngx_event_t));
    ngx_memzero(wev, sizeof(ngx_event_t));

    rev->instance = !instance;
    wev->instance = !instance;

    rev->index = NGX_INVALID_INDEX;
    wev->index = NGX_INVALID_INDEX;

    rev->data = c;
    wev->data = c;

    wev->write = 1;

    return c;
}
/*
 * Return a connection to the free list.
 *
 * Clears the connection's slot in the files table (when such a table exists
 * and the connection has a descriptor), pushes the connection onto the head
 * of the free list and marks it as having no descriptor.
 */
void
ngx_free_connection(ngx_connection_t *c)
{
    if (c->fd != -1 && ngx_cycle->files) {
        ngx_cycle->files[c->fd] = NULL;
    }

    /* the data field doubles as the free-list link */
    c->data = ngx_cycle->free_connections;
    ngx_cycle->free_connections = c;
    ngx_cycle->free_connection_n++;

    c->fd = (ngx_socket_t) -1;
}
8. 事件模块时序图
8.1 事件循环时序
sequenceDiagram
participant W as Worker Process
participant EL as Event Loop
participant EM as Event Mechanism
participant TM as Timer Manager
participant EH as Event Handler
W->>EL: ngx_process_events_and_timers()
EL->>TM: ngx_event_find_timer()
TM-->>EL: Next timer timeout
EL->>EL: Check accept mutex
alt Accept mutex acquired
EL->>EL: Set POST_EVENTS flag
end
EL->>EM: ngx_process_events(timer, flags)
EM->>EM: epoll_wait() / kqueue() / select()
EM->>EM: Process ready events
loop For each ready event
EM->>EH: ev->handler(ev)
EH->>EH: Process event
EH-->>EM: Event processed
end
EM-->>EL: Events processed
EL->>EL: Process posted accept events
EL->>EL: Release accept mutex
EL->>TM: ngx_event_expire_timers()
TM->>TM: Process expired timers
loop For each expired timer
TM->>EH: ev->handler(ev)
EH->>EH: Handle timeout
EH-->>TM: Timeout handled
end
EL->>EL: Process posted events
EL-->>W: Event loop iteration complete
8.2 连接建立时序
sequenceDiagram
participant C as Client
participant L as Listen Socket
participant W as Worker
participant CP as Connection Pool
participant EV as Event System
C->>L: TCP SYN
L-->>C: TCP SYN-ACK
C->>L: TCP ACK
Note over C,L: Handshake completed by the kernel before accept() returns
L->>EV: Accept event ready
EV->>W: ngx_event_accept()
W->>L: accept()
L-->>W: New socket fd
W->>CP: ngx_get_connection(fd)
CP->>CP: Allocate connection
CP-->>W: Connection object
W->>W: Initialize connection
W->>EV: Add read/write events
EV->>EV: Register events with mechanism
Note over W: Connection established
C->>W: HTTP Request
W->>EV: Read event triggered
EV->>W: ngx_http_process_request_line()
W->>W: Process HTTP request
9. 性能优化策略
9.1 事件机制选择
/*
 * Choose the default event mechanism for this build.
 *
 * Each mechanism compiled in may override the previous choice, so the last
 * matching section wins; select is used only when nothing else was chosen.
 * The chosen module's context index becomes the default of ecf->use.
 *
 * Returns NGX_CONF_OK on success, NGX_CONF_ERROR when no event mechanism is
 * available.
 */
static char *
ngx_event_core_init_conf(ngx_cycle_t *cycle, void *conf)
{
    ngx_event_conf_t  *ecf = conf;

#if (NGX_HAVE_EPOLL) && !(NGX_TEST_BUILD_EPOLL)
    int                fd;
#endif
    ngx_module_t      *module;

    module = NULL;

#if (NGX_HAVE_EPOLL) && !(NGX_TEST_BUILD_EPOLL)

    /* Linux: prefer epoll, but only if the running kernel provides it */

    fd = epoll_create(100);

    if (fd != -1) {
        (void) close(fd);
        module = &ngx_epoll_module;
    }

#endif

#if (NGX_HAVE_KQUEUE)

    /* BSD/macOS: prefer kqueue */

    module = &ngx_kqueue_module;

#endif

#if (NGX_HAVE_DEVPOLL)

    /* Solaris: use /dev/poll */

    module = &ngx_devpoll_module;

#endif

#if (NGX_HAVE_SELECT)

    /* generic fallback: select */

    if (module == NULL) {
        module = &ngx_select_module;
    }

#endif

    /* nothing compiled in or usable: fail instead of dereferencing NULL */

    if (module == NULL) {
        ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "no events module found");
        return NGX_CONF_ERROR;
    }

    /* record the chosen event module */

    ngx_conf_init_uint_value(ecf->use, module->ctx_index);

    return NGX_CONF_OK;
}
9.2 事件处理优化
- 批量事件处理
  - 一次系统调用获取多个事件
  - 减少用户态/内核态切换开销
- 事件投递机制
  - 延迟处理非紧急事件
  - 优先处理accept事件
- 连接复用
  - 连接池避免频繁创建/销毁
  - Keep-alive减少连接建立开销
- 定时器优化
  - 红黑树高效管理定时器
  - 批量处理过期定时器
9.3 内存优化
- 预分配事件结构
  - 启动时预分配事件和连接对象
  - 避免运行时内存分配
- 零拷贝技术
  - sendfile直接从文件到socket
  - splice在管道间传输数据
- 缓存友好设计
  - 事件结构紧凑排列
  - 热数据集中存放
10. 实战应用示例
10.1 自定义事件处理器
/*
 * Example read handler.
 *
 * Closes the connection on timeout, receive error or end of stream.
 * Otherwise any received bytes are passed to process_data() and the read
 * event is re-armed; the connection is closed if re-arming fails.
 */
static void
my_read_event_handler(ngx_event_t *rev)
{
    u_char             chunk[1024];
    ssize_t            received;
    ngx_connection_t  *c;

    c = rev->data;

    if (rev->timedout) {
        ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
        ngx_close_connection(c);
        return;
    }

    received = c->recv(c, chunk, sizeof(chunk));

    if (received != NGX_AGAIN) {

        if (received == NGX_ERROR || received == 0) {
            ngx_close_connection(c);
            return;
        }

        /* hand the received bytes to the application */
        process_data(chunk, received);
    }

    /* keep listening for read events, with or without new data */
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_close_connection(c);
    }
}
/*
 * (Re)arm the read timeout of a connection: an existing timer on the read
 * event is removed first, then a new one is added for `timeout`.
 */
static void
set_connection_timeout(ngx_connection_t *c, ngx_msec_t timeout)
{
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    ngx_add_timer(c->read, timeout);
}
10.2 事件驱动服务器框架
// 简化的事件驱动服务器
static void
my_event_server_init(ngx_cycle_t *cycle)
{
ngx_listening_t *ls;
ngx_connection_t *c;
// 创建监听socket
ls = ngx_create_listening(cycle->pool, &addr, sizeof(struct sockaddr_in));
if (ls == NULL) {
return;
}
ls->handler = my_accept_handler;
ls->log = cycle->log;
// 打开监听socket
if (ngx_open_listening_sockets(cycle) != NGX_OK) {
return;
}
// 配置监听socket
ngx_configure_listening_sockets(cycle);
}
/*
 * Example accept handler for a listening socket.
 *
 * Accepts connections in a loop while ev->available is non-zero.  For each
 * accepted socket it obtains a connection object, creates its memory pool,
 * installs the I/O methods and event handlers, registers the read event and
 * arms a 60 second read timeout.  Stops on the first accept() failure
 * (silently for NGX_EAGAIN) and on any setup failure.
 */
static void
my_accept_handler(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    u_char             sa_buf[NGX_SOCKADDRLEN];

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;

    do {
        socklen = NGX_SOCKADDRLEN;

        s = accept(lc->fd, (struct sockaddr *) sa_buf, &socklen);

        if (s == (ngx_socket_t) -1) {
            err = ngx_socket_errno;

            /* nothing more to accept right now */
            if (err == NGX_EAGAIN) {
                return;
            }

            level = NGX_LOG_ALERT;

            if (err == NGX_ECONNABORTED) {
                level = NGX_LOG_ERR;
            }

            ngx_log_error(level, ev->log, err, "accept() failed");
            return;
        }

        /* obtain a connection object for the new socket */
        c = ngx_get_connection(s, ev->log);

        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }

            return;
        }

        /* per-connection memory pool */
        c->pool = ngx_create_pool(ls->pool_size, ev->log);
        if (c->pool == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        /*
         * ngx_get_connection() returns a zeroed connection, so the I/O
         * methods must be installed here; my_read_event_handler() calls
         * c->recv and would otherwise jump through a NULL pointer.
         */
        c->recv = ngx_recv;
        c->send = ngx_send;

        /* install the event handlers */
        rev = c->read;
        wev = c->write;

        rev->handler = my_read_event_handler;
        wev->handler = my_write_event_handler;

        /* register the read event */
        if (ngx_add_event(rev, NGX_READ_EVENT, 0) != NGX_OK) {
            ngx_close_accepted_connection(c);
            return;
        }

        /* read timeout: 60 seconds */
        ngx_add_timer(rev, 60000);

    } while (ev->available);
}
11. 调试和监控
11.1 事件调试
/*
 * Event debug logging macro.
 *
 * In debug builds, writes a debug message prefixed with the event pointer
 * to the event's log; in other builds it expands to nothing.
 *
 * The message is forwarded to the variadic ngx_log_debug() rather than to
 * ngx_log_debug3(), which takes a fixed number of arguments and would only
 * accept calls with exactly two extra arguments.  `ev` is parenthesized so
 * that an expression can be passed; it is evaluated twice.
 */
#if (NGX_DEBUG)
#define ngx_event_debug(ev, fmt, ...)                                        \
    ngx_log_debug(NGX_LOG_DEBUG_EVENT, (ev)->log, 0,                         \
                  "event %p: " fmt, (ev), ##__VA_ARGS__)
#else
#define ngx_event_debug(ev, fmt, ...)
#endif
// Connection state monitoring: writes one debug line with the connection's
// descriptor, the ready flags of its read and write events, its pool
// pointer and its sent/buffered values.
// NOTE(review): "%O" is used for both c->sent and c->buffered; confirm both
// fields have the type this format specifier expects.
static void
ngx_connection_debug(ngx_connection_t *c)
{
ngx_log_debug6(NGX_LOG_DEBUG_EVENT, c->log, 0,
"connection: fd:%d read:%d write:%d "
"pool:%p sent:%O buffered:%O",
c->fd, c->read->ready, c->write->ready,
c->pool, c->sent, c->buffered);
}
11.2 性能监控
// Event statistics: a set of atomic counters.
// NOTE(review): the meaning of each counter is inferred from its name only;
// confirm against the code that updates them.
typedef struct {
ngx_atomic_t connections;
ngx_atomic_t accepts;
ngx_atomic_t handled;
ngx_atomic_t requests;
ngx_atomic_t reading;
ngx_atomic_t writing;
ngx_atomic_t waiting;
} ngx_event_stats_t;
/*
 * Update the statistics counters for an event: accept events increment the
 * accepts counter, and every event increments either the writing or the
 * reading counter depending on its direction.
 */
static void
ngx_event_update_stats(ngx_event_t *ev)
{
    if (ev->accept) {
        ngx_atomic_fetch_add(ngx_stat_accepts, 1);
    }

    if (!ev->write) {
        ngx_atomic_fetch_add(ngx_stat_reading, 1);
        return;
    }

    ngx_atomic_fetch_add(ngx_stat_writing, 1);
}
12. 总结
Nginx事件模块的设计体现了以下核心特点:
12.1 设计优势
- 高效事件机制: 支持epoll、kqueue等高性能事件机制
- 异步非阻塞: 单线程处理大量并发连接
- 事件驱动: 基于事件的响应式编程模型
- 跨平台抽象: 统一的事件接口,支持多种操作系统
12.2 性能特点
- 高并发: 单个Worker可处理数万并发连接
- 低延迟: 事件就绪即处理,响应迅速
- 资源高效: 内存和CPU使用效率高
- 可扩展: 支持多Worker进程并行处理
12.3 应用价值
- 为HTTP、Mail、Stream等上层模块提供高效的事件基础设施
- 统一的异步编程模型,简化模块开发
- 优秀的性能表现,支撑高负载应用
- 灵活的扩展机制,支持自定义事件处理
通过深入理解事件模块的实现原理,开发者可以更好地利用Nginx的事件驱动架构,开发高性能的网络应用。