概述
CPython 解析器负责将 Python 源代码转换为抽象语法树(AST),是编译过程的第一个关键阶段。本章深入分析解析器的架构、词法分析、语法分析和 AST 构建过程。
1. 解析器架构总览
1.1 解析流程架构图
graph TB
subgraph "输入层"
A[Python 源代码]
B[文件/字符串/交互式输入]
end
subgraph "词法分析层"
C[Token 生成器]
D[词法状态机]
E[Token 队列]
end
subgraph "语法分析层"
F[PEG 解析器]
G[语法规则引擎]
H[错误恢复机制]
end
subgraph "AST 构建层"
I[AST 节点工厂]
J[语法树验证]
K[符号表构建]
end
subgraph "输出层"
L[抽象语法树 AST]
M[编译器接口]
end
A --> C
B --> C
C --> D
D --> E
E --> F
F --> G
G --> H
H --> I
I --> J
J --> K
K --> L
L --> M
1.2 核心组件文件
- Parser/tokenizer/ - 词法分析器实现
- Parser/pegen.c - PEG 解析器核心
- Parser/Python.asdl - AST 节点定义
- Parser/asdl.py - ASDL 解析器(C 代码由 Parser/asdl_c.py 根据 Python.asdl 生成)
- Python/ast.c - AST 验证(节点构造函数位于自动生成的 Python/Python-ast.c)
2. 词法分析器实现
2.1 Token 结构定义
/* Parser/pegen.h - Token structure definition */
/* One token as stored by the parser: its type, text and source span. */
typedef struct {
int type; // token type
PyObject *bytes; // token text as a bytes object
int level; // indentation level
int lineno; // line number
int col_offset; // column offset
int end_lineno; // end line number
int end_col_offset; // end column offset
Memo *memo; // memoization cache
PyObject *metadata; // metadata
} Token;
/*
 * Token type codes.
 *
 * Enumerators are numbered consecutively; the first member of each group
 * carries an explicit value so the numbering is easy to audit.
 */
typedef enum {
    /* Structural tokens */
    ENDMARKER = 0,
    NAME,
    NUMBER,
    STRING,
    NEWLINE,
    INDENT,
    DEDENT,

    /* Single-character punctuation and operators */
    LPAR = 7,       /* ( */
    RPAR,           /* ) */
    LSQB,           /* [ */
    RSQB,           /* ] */
    COLON,          /* : */
    COMMA,          /* , */
    SEMI,           /* ; */
    PLUS,           /* + */
    MINUS,          /* - */
    STAR,           /* * */
    SLASH,          /* / */
    VBAR,           /* | */
    AMPER,          /* & */
    LESS,           /* < */
    GREATER,        /* > */
    EQUAL,          /* = */
    DOT,            /* . */
    PERCENT,        /* % */
    LBRACE,         /* { */
    RBRACE,         /* } */

    /* Comparison, bitwise and power operators */
    EQEQUAL = 27,   /* == */
    NOTEQUAL,       /* != */
    LESSEQUAL,      /* <= */
    GREATEREQUAL,   /* >= */
    TILDE,          /* ~ */
    CIRCUMFLEX,     /* ^ */
    LEFTSHIFT,      /* << */
    RIGHTSHIFT,     /* >> */
    DOUBLESTAR,     /* ** */

    /* Augmented assignment operators */
    PLUSEQUAL = 36, /* += */
    MINEQUAL,       /* -= */
    STAREQUAL,      /* *= */
    SLASHEQUAL,     /* /= */
    PERCENTEQUAL,   /* %= */
    /* ... more token types */
} TokenType;
2.2 词法分析器状态机
/* Parser/tokenizer/tokenizer.c - tokenizer core */
/*
 * Mutable state of one tokenizer instance: the input buffer cursors, the
 * indentation bookkeeping and the error/encoding flags.
 */
struct tok_state {
    /* Input buffer */
    char *buf;                  /* current buffer */
    char *cur;                  /* current read position */
    char *inp;                  /* end of the input read so far */
    char *end;                  /* end of the buffer */
    char *start;                /* start of the token being scanned */

    /* Line information */
    int lineno;                 /* current line number */
    int level;                  /* nesting level. NOTE(review): PyTokenizer_Get
                                   suppresses NEWLINE while this is > 0, which
                                   suggests bracket depth rather than
                                   indentation - confirm */
    int altlevel;               /* alternate nesting level */
    int atbol;                  /* non-zero at the beginning of a line; read
                                   and reset by PyTokenizer_Get */
    int tabsize;                /* tab stop width used by PyTokenizer_Get when
                                   measuring indentation */
    int decoding_erred;         /* decoding error flag */
    int decoding_readline;      /* decoding readline flag */

    /* Indentation stack */
    int indstack[MAXINDENT];    /* columns of the enclosing indentation levels */
    int indent;                 /* index of the current entry in indstack */

    /* Pending tokens */
    int pendin;                 /* pending INDENT (> 0) / DEDENT (< 0) count */

    /* File information */
    PyObject *filename;         /* file name */
    PyObject *decoding_buffer;  /* decoding buffer */

    /* Interactive input */
    int interactive;            /* interactive flag */
    int done;                   /* completion / error code */

    /* Encoding information */
    const char *encoding;       /* source file encoding */
    int decoding_state;         /* decoding state */

    /* Error handling */
    char *error_ret;            /* error return information */
    /* The former second `decoding_erred` member (a PyObject *) was removed:
       two members with the same name do not compile, and the tokenizer only
       tests the integer flag above. */
};
/*
 * Main token scanner: return the type of the next token read from `tok`.
 *
 * On success *p_start and *p_end receive the byte range [start, end) of the
 * token inside the tokenizer buffer (for NEWLINE the '\n' itself is left
 * out). INDENT and DEDENT carry no text and do not touch the range. Error
 * paths return ERRORTOKEN, with tok->done set where the cause is known, and
 * leave *p_start / *p_end untouched.
 *
 * Blank and comment-only lines produce no token and do not affect
 * indentation. tok->interactive is not consulted here.
 */
int
PyTokenizer_Get(struct tok_state *tok, const char **p_start, const char **p_end)
{
    int c;
    int blankline;
    int type = ERRORTOKEN;

nextline:
    tok->start = NULL;
    blankline = 0;

    if (tok->decoding_erred) {
        return ERRORTOKEN;
    }

    /* At the beginning of a line: measure the indentation column. */
    if (tok->atbol) {
        int col = 0;

        tok->atbol = 0;
        for (;;) {
            c = tok_nextc(tok);
            if (c == ' ') {
                col++;
            }
            else if (c == '\t') {
                col = (col / tok->tabsize + 1) * tok->tabsize;
            }
            else if (c == '\014') {     /* Control-L (formfeed) */
                col = 0;                /* For Emacs users */
            }
            else {
                break;
            }
        }
        tok_backup(tok, c);

        /* The line holds only a comment or nothing at all. The character was
           un-read above, so the switch below skips the comment, consumes the
           newline and restarts on the next line because blankline is set. */
        if (c == '#' || c == '\n') {
            blankline = 1;
        }

        if (!blankline && col != tok->indstack[tok->indent]) {
            if (col > tok->indstack[tok->indent]) {
                /* Deeper indentation: push one level. */
                if (tok->indent + 1 >= MAXINDENT) {
                    tok->done = E_TOODEEP;
                    tok->cur = tok->inp;
                    return ERRORTOKEN;
                }
                tok->indstack[++tok->indent] = col;
                return INDENT;
            }
            /* Shallower indentation: pop levels, one pending DEDENT each. */
            while (tok->indent > 0 && col < tok->indstack[tok->indent]) {
                tok->pendin--;
                tok->indent--;
            }
            if (col != tok->indstack[tok->indent]) {
                /* The column matches no enclosing level. */
                tok->done = E_DEDENT;
                tok->cur = tok->inp;
                return ERRORTOKEN;
            }
        }
    }

    /* Hand out pending INDENT / DEDENT tokens one per call. */
    if (tok->pendin != 0) {
        if (tok->pendin < 0) {
            tok->pendin++;
            return DEDENT;
        }
        tok->pendin--;
        return INDENT;
    }

again:
    tok->start = tok->cur;
    c = tok_nextc(tok);

    switch (c) {
    case EOF:
        return ENDMARKER;

    case '\n':
        tok->atbol = 1;
        if (blankline || tok->level > 0) {
            goto nextline;
        }
        *p_start = tok->start;
        *p_end = tok->cur - 1;  /* Leave '\n' out of the string */
        return NEWLINE;

    case '#':
        /* Comment: skip to the end of the line, keep the newline. */
        while (c != EOF && c != '\n') {
            c = tok_nextc(tok);
        }
        tok_backup(tok, c);
        goto again;

    case ' ':
    case '\t':
    case '\014':                /* Form feed */
        goto again;

    case '\\':
        /* Explicit line joining: backslash must be followed by newline. */
        c = tok_nextc(tok);
        if (c != '\n') {
            tok->done = E_LINECONT;
            return ERRORTOKEN;
        }
        c = tok_nextc(tok);
        if (c == EOF) {
            tok->done = E_EOF;
            return ERRORTOKEN;
        }
        /* Un-read the first character of the continued line; without this
           it would be dropped when scanning restarts at `again`. */
        tok_backup(tok, c);
        goto again;

    /* Single-character tokens */
    case '(':
        type = LPAR;
        break;
    case ')':
        type = RPAR;
        break;
    case '[':
        type = LSQB;
        break;
    case ']':
        type = RSQB;
        break;
    case ':':
        type = COLON;
        break;
    case ',':
        type = COMMA;
        break;
    case ';':
        type = SEMI;
        break;

    /* Operators that may be one or two characters long */
    case '+':
        c = tok_nextc(tok);
        if (c == '=') {
            type = PLUSEQUAL;
            break;
        }
        tok_backup(tok, c);
        type = PLUS;
        break;
    case '-':
        c = tok_nextc(tok);
        if (c == '=') {
            type = MINEQUAL;
            break;
        }
        if (c == '>') {
            type = RARROW;
            break;
        }
        tok_backup(tok, c);
        type = MINUS;
        break;
    case '*':
        c = tok_nextc(tok);
        if (c == '*') {
            type = DOUBLESTAR;
            break;
        }
        if (c == '=') {
            type = STAREQUAL;
            break;
        }
        tok_backup(tok, c);
        type = STAR;
        break;

    /* String literals */
    case '\'':
    case '"':
        type = tok_get_str(tok, c);
        break;

    /* Numeric literals */
    case '0': case '1': case '2': case '3': case '4':
    case '5': case '6': case '7': case '8': case '9':
        type = tok_get_number(tok, c);
        break;

    /* Identifiers and keywords */
    default:
        if (Py_UNICODE_ISALPHA(c) || c == '_') {
            type = tok_get_name(tok, c);
        }
        else {
            type = ERRORTOKEN;  /* unknown character */
        }
        break;
    }

    if (type == ERRORTOKEN) {
        return ERRORTOKEN;
    }
    /* Publish the span of the token that was just scanned. */
    *p_start = tok->start;
    *p_end = tok->cur;
    return type;
}
2.3 字符串和数字解析
/*
 * Scan a string literal whose opening quote character `quote` has already
 * been consumed.
 *
 * Returns STRING once the closing quote (or closing triple quote) has been
 * consumed, or ERRORTOKEN with tok->done set to E_EOLS (end of line or input
 * inside a single-quoted string) or E_EOFS (end of input inside a
 * triple-quoted string). The literal spans tok->start .. tok->cur; this
 * helper has no p_start/p_end parameters, so it does not publish the span
 * itself.
 */
static int
tok_get_str(struct tok_state *tok, int quote)
{
    int triple = 0;     /* non-zero for a triple-quoted string */
    int c;

    c = tok_nextc(tok);
    if (c != quote) {
        /* First content character: un-read it so the loop below examines
           it (it may be a backslash or a newline). */
        tok_backup(tok, c);
    }
    else {
        c = tok_nextc(tok);
        if (c != quote) {
            /* Two quotes in a row: the empty string. */
            tok_backup(tok, c);
            return STRING;
        }
        triple = 1;
    }

    for (;;) {
        c = tok_nextc(tok);
        if (c == EOF) {
            tok->done = triple ? E_EOFS : E_EOLS;
            return ERRORTOKEN;
        }
        if (!triple && (c == '\n' || c == '\r')) {
            /* Unescaped line break inside a single-quoted string. */
            tok->done = E_EOLS;
            return ERRORTOKEN;
        }
        if (c == '\\') {
            /* Escape sequence: the next character is content whatever it
               is, including a quote or a newline (line continuation). */
            c = tok_nextc(tok);
            if (c == EOF) {
                tok->done = triple ? E_EOFS : E_EOLS;
                return ERRORTOKEN;
            }
            continue;
        }
        if (c != quote) {
            continue;
        }
        if (!triple) {
            break;      /* closing quote */
        }
        /* Triple-quoted: two more quotes must follow directly. */
        c = tok_nextc(tok);
        if (c != quote) {
            tok_backup(tok, c);
            continue;
        }
        c = tok_nextc(tok);
        if (c != quote) {
            /* Only two quotes: give the second one back so it can start a
               new closing sequence. */
            tok_backup(tok, c);
            tok_backup(tok, quote);
            continue;
        }
        break;          /* closing triple quote */
    }
    return STRING;
}
/*
 * Scan a numeric literal whose first digit `c` has already been consumed.
 *
 * Handles the 0x / 0o / 0b prefixes (delegated to the respective helpers),
 * decimal digits, an optional fraction, an optional exponent and an optional
 * imaginary suffix. Returns NUMBER, or ERRORTOKEN with tok->done = E_TOKEN
 * when an exponent has no digits. The literal spans tok->start .. tok->cur;
 * this helper has no p_start/p_end parameters, so it does not publish the
 * span itself.
 */
static int
tok_get_number(struct tok_state *tok, int c)
{
    if (c == '0') {
        /* Possibly a hexadecimal, octal or binary literal. */
        c = tok_nextc(tok);
        if (c == 'x' || c == 'X') {
            return tok_get_hex_number(tok);
        }
        if (c == 'o' || c == 'O') {
            return tok_get_oct_number(tok);
        }
        if (c == 'b' || c == 'B') {
            return tok_get_bin_number(tok);
        }
        /* `c` is now the lookahead after the leading zero. It must stay
           consumed: the code below always treats `c` as already read and
           un-reads it exactly once at the end. */
    }

    /* Integer part */
    while (Py_UNICODE_ISDIGIT(c)) {
        c = tok_nextc(tok);
    }

    /* Fraction */
    if (c == '.') {
        c = tok_nextc(tok);
        while (Py_UNICODE_ISDIGIT(c)) {
            c = tok_nextc(tok);
        }
    }

    /* Exponent */
    if (c == 'e' || c == 'E') {
        c = tok_nextc(tok);
        if (c == '+' || c == '-') {
            c = tok_nextc(tok);
        }
        if (!Py_UNICODE_ISDIGIT(c)) {
            tok->done = E_TOKEN;
            return ERRORTOKEN;
        }
        while (Py_UNICODE_ISDIGIT(c)) {
            c = tok_nextc(tok);
        }
    }

    /* Imaginary suffix */
    if (c == 'j' || c == 'J') {
        c = tok_nextc(tok);
    }

    /* Give back the first character that is not part of the number. */
    tok_backup(tok, c);
    return NUMBER;
}
3. PEG 解析器实现
3.1 PEG 解析器核心结构
/* Parser/pegen.h - parser state structure */
/* State of one parser run: the tokenizer, the buffered tokens, the current
   position and the error/configuration flags. */
typedef struct {
struct tok_state *tok; // tokenizer state
Token **tokens; // token array
int mark; // current parse position
int fill, size; // token buffer bookkeeping
PyArena *arena; // memory arena
KeywordToken **keywords; // keyword table
char **soft_keywords; // soft keyword table
int n_keyword_lists; // number of keyword lists
int start_rule; // start rule
int *errcode; // error code
int parsing_started; // parsing-started flag
PyObject* normalize; // normalization function
int starting_lineno; // starting line number
int starting_col_offset; // starting column offset
int error_indicator; // error indicator
int flags; // parser flags
int feature_version; // feature version
growable_comment_array type_ignore_comments; // type-ignore comments
Token *known_err_token; // known error token
int level; // nesting level
int call_invalid_rules; // call-invalid-rules flag
int debug; // debug flag
} Parser;
/* Memoization record; records are chained through `next`. */
typedef struct _memo {
int type; // rule type
void *node; // AST node
int mark; // position mark
struct _memo *next; // next memo record
} Memo;
3.2 核心解析函数
/* Parser/pegen.c - main parsing entry point */
/*
 * Parse the source text `str` starting from `start_rule`.
 *
 * A tokenizer is created for the string and given a new reference to
 * `filename`; a parser is then created on top of it and run. Both are freed
 * before returning. Returns the result of _PyPegen_run_parser(), or NULL
 * when the tokenizer or the parser cannot be created.
 */
mod_ty
_PyPegen_run_parser_from_string(const char *str, int start_rule,
                                PyObject *filename, PyCompilerFlags *flags,
                                PyArena *arena)
{
    mod_ty result = NULL;

    struct tok_state *tok = PyTokenizer_FromString(str, 1);
    if (tok == NULL) {
        /* Same outcome whether or not an exception is already set. */
        return NULL;
    }

    tok->filename = filename;
    Py_INCREF(filename);

    Parser *p = _PyPegen_Parser_New(tok, start_rule, flags, NULL, arena);
    if (p != NULL) {
        result = _PyPegen_run_parser(p);
        _PyPegen_Parser_Free(p);
    }

    /* The tokenizer is released on the success and the failure path alike. */
    PyTokenizer_Free(tok);
    return result;
}
/*
 * Run the parser from the rule selected by p->start_rule.
 *
 * Returns the module node produced by the start rule, or NULL. An unknown
 * start rule raises SystemError and returns NULL.
 *
 * p->errcode points at a local variable for the duration of the call and is
 * reset to NULL before returning, so the parser object never keeps a pointer
 * into a dead stack frame.
 */
mod_ty
_PyPegen_run_parser(Parser *p)
{
    mod_ty res = NULL;
    int errcode = 0;

    p->errcode = &errcode;

    switch (p->start_rule) {
    case Py_file_input:
        res = file_rule(p);
        break;
    case Py_single_input:
        res = interactive_rule(p);
        break;
    case Py_eval_input:
        res = eval_rule(p);
        break;
    case Py_func_type_input:
        res = func_type_rule(p);
        break;
    default:
        PyErr_Format(PyExc_SystemError,
                     "invalid start rule %d", p->start_rule);
        break;
    }

    p->errcode = NULL;
    return res;
}
/*
 * Fetch the next token from the tokenizer and append it to p->tokens.
 *
 * `type` is the expected token type, or -1 to accept any token. Returns the
 * new token, or NULL when
 *   - memory allocation or the tokenizer fails (p->error_indicator is set
 *     and nothing is appended), or
 *   - the token was read and appended but its type differs from `type`.
 */
static Token *
_PyPegen_get_token(Parser *p, int type)
{
    /* Grow the token array when it is full. PyMem_Realloc is used directly
       instead of the PyMem_Resize macro: the macro reassigns its first
       argument, so a failed resize would overwrite p->tokens with NULL and
       leak the existing array. */
    if (p->fill == p->size) {
        int newsize = (p->size > 0) ? p->size * 2 : 1;
        Token **new_tokens = PyMem_Realloc(p->tokens,
                                           (size_t)newsize * sizeof(Token *));
        if (new_tokens == NULL) {
            p->error_indicator = 1;
            return NULL;
        }
        p->tokens = new_tokens;
        p->size = newsize;
    }

    /* Zero-initialised so that `memo` and `metadata` start out as NULL. */
    Token *new_token = PyMem_Calloc(1, sizeof(Token));
    if (new_token == NULL) {
        p->error_indicator = 1;
        return NULL;
    }

    /* The tokenizer reports the token type as its return value and the
       token text through the start/end out-parameters. Not every token
       type sets the range, hence the NULL defaults. */
    const char *start = NULL;
    const char *end = NULL;
    int tok_type = PyTokenizer_Get(p->tok, &start, &end);
    if (tok_type < 0 || tok_type == ERRORTOKEN) {
        p->error_indicator = 1;
        PyMem_Free(new_token);
        return NULL;
    }

    Py_ssize_t length = 0;
    if (start != NULL && end != NULL && end > start) {
        length = end - start;
    }
    new_token->bytes = PyBytes_FromStringAndSize(length > 0 ? start : NULL,
                                                 length);
    if (new_token->bytes == NULL) {
        p->error_indicator = 1;
        PyMem_Free(new_token);
        return NULL;
    }

    new_token->type = tok_type;
    new_token->level = p->tok->level;
    new_token->lineno = p->tok->lineno;
    new_token->col_offset = p->tok->col_offset;
    new_token->end_lineno = p->tok->end_lineno;
    new_token->end_col_offset = p->tok->end_col_offset;

    p->tokens[p->fill] = new_token;
    p->fill++;

    /* A token of the wrong type stays in the buffer; the caller only
       learns that it did not match. */
    if (type != -1 && new_token->type != type) {
        return NULL;
    }
    return new_token;
}
/* Backtracking and position management */
/* Read the parser's current token position. */
#define _PyPegen_mark(p) ((p)->mark)
/* Move the parser's token position to `mark`. */
static void
_PyPegen_reset(Parser *p, int mark)
{
p->mark = mark;
}
/*
 * Return the token `n` positions ahead of the current mark without moving
 * the mark. Tokens that are not buffered yet are fetched from the tokenizer
 * first; NULL is returned if one of those fetches fails.
 */
static Token *
_PyPegen_peek_token(Parser *p, int n)
{
    const int target = p->mark + n;
    /* Number of tokens still to be read so that index `target` exists. */
    int missing = target - p->fill + 1;

    while (missing-- > 0) {
        if (_PyPegen_get_token(p, -1) == NULL) {
            return NULL;
        }
    }
    return p->tokens[target];
}
4. 语法规则定义
4.1 Python.asdl - AST 定义
-- ASDL's 4 builtin types are:
-- identifier, int, string, constant
module Python
{
-- Top-level module kinds
mod = Module(stmt* body, type_ignore* type_ignores)
| Interactive(stmt* body)
| Expression(expr body)
| FunctionType(expr* argtypes, expr returns)
-- Statement kinds
stmt = FunctionDef(identifier name, arguments args,
stmt* body, expr* decorator_list, expr? returns,
string? type_comment, type_param* type_params)
| AsyncFunctionDef(identifier name, arguments args,
stmt* body, expr* decorator_list, expr? returns,
string? type_comment, type_param* type_params)
| ClassDef(identifier name,
expr* bases,
keyword* keywords,
stmt* body,
expr* decorator_list,
type_param* type_params)
| Return(expr? value)
| Delete(expr* targets)
| Assign(expr* targets, expr value, string? type_comment)
| AnnAssign(expr target, expr annotation, expr? value, int simple)
| AugAssign(expr target, operator op, expr value)
| For(expr target, expr iter, stmt* body, stmt* orelse,
string? type_comment)
| AsyncFor(expr target, expr iter, stmt* body, stmt* orelse,
string? type_comment)
| While(expr test, stmt* body, stmt* orelse)
| If(expr test, stmt* body, stmt* orelse)
| With(withitem* items, stmt* body, string? type_comment)
| AsyncWith(withitem* items, stmt* body, string? type_comment)
| Raise(expr? exc, expr? cause)
| Try(stmt* body, excepthandler* handlers, stmt* orelse,
stmt* finalbody)
| Assert(expr test, expr? msg)
| Import(alias* names)
| ImportFrom(identifier? module, alias* names, int? level)
| Global(identifier* names)
| Nonlocal(identifier* names)
| Expr(expr value)
| Pass | Break | Continue
-- Expression kinds
expr = BoolOp(boolop op, expr* values)
| NamedExpr(expr target, expr value)
| BinOp(expr left, operator op, expr right)
| UnaryOp(unaryop op, expr operand)
| Lambda(arguments args, expr body)
| IfExp(expr test, expr body, expr orelse)
| Dict(expr* keys, expr* values)
| Set(expr* elts)
| ListComp(expr elt, comprehension* generators)
| SetComp(expr elt, comprehension* generators)
| DictComp(expr key, expr value, comprehension* generators)
| GeneratorExp(expr elt, comprehension* generators)
| Await(expr value)
| Yield(expr? value)
| YieldFrom(expr value)
| Compare(expr left, cmpop* ops, expr* comparators)
| Call(expr func, expr* args, keyword* keywords)
| FormattedValue(expr value, int? conversion, expr? format_spec)
| JoinedStr(expr* values)
| Constant(constant value, string? kind)
| Attribute(expr value, identifier attr, expr_context ctx)
| Subscript(expr value, expr slice, expr_context ctx)
| Starred(expr value, expr_context ctx)
| Name(identifier id, expr_context ctx)
| List(expr* elts, expr_context ctx)
| Tuple(expr* elts, expr_context ctx)
| Slice(expr? lower, expr? upper, expr? step)
-- Further syntax definitions are omitted here...
}
4.2 语法规则生成
/* Parsing rules generated from Grammar/python.gram */
// file: file_input
/*
 * Parse a whole file: `statements ENDMARKER`.
 *
 * Returns the Module node, or NULL when the alternative does not match or
 * an error is pending. p->level is incremented on entry and decremented
 * again on every exit path, so the recursion depth stays balanced.
 */
static mod_ty
file_rule(Parser *p)
{
    D(fprintf(stderr, "Enter file_rule\n"));
    if (p->level++ == MAXSTACK) {
        p->error_indicator = 1;
        PyErr_SetString(PyExc_MemoryError, "too much recursion");
    }
    if (p->error_indicator) {
        D(fprintf(stderr, "Leave file_rule (error indicator set)\n"));
        p->level--;
        return NULL;
    }
    mod_ty res = NULL;
    int mark = p->mark;
    { // statements ENDMARKER
        if (p->error_indicator) {
            D(fprintf(stderr, "Alternative failed\n"));
            p->level--;
            return NULL;
        }
        D(fprintf(stderr, "Trying file: statements ENDMARKER\n"));
        asdl_stmt_seq* a;
        Token* endmarker;
        if (
            (a = statements_rule(p))  // statements
            &&
            (endmarker = _PyPegen_expect_token(p, ENDMARKER))  // token='ENDMARKER'
        )
        {
            D(fprintf(stderr, "Matched file: statements ENDMARKER\n"));
            res = _Py_Module(a, NULL, p->arena);
            if (res == NULL && PyErr_Occurred()) {
                p->error_indicator = 1;
                D(fprintf(stderr, "Leave file_rule (error indicator set)\n"));
                p->level--;
                return NULL;
            }
            goto done;
        }
        /* Backtrack to where this alternative started. */
        p->mark = mark;
        D(fprintf(stderr, "Failed\n"));
    }
    res = NULL;
done:
    p->level--;
    return res;
}
// Expression parsing rule
/*
 * expression:
 *     | disjunction 'if' disjunction 'else' expression
 *     | disjunction
 *
 * Returns the expression node, or NULL when no alternative matches or an
 * error is pending. p->level is incremented on entry and decremented again
 * on every exit path, so the recursion depth stays balanced.
 */
static expr_ty
expression_rule(Parser *p)
{
    D(fprintf(stderr, "Enter expression_rule\n"));
    if (p->level++ == MAXSTACK) {
        p->error_indicator = 1;
        PyErr_SetString(PyExc_MemoryError, "too much recursion");
    }
    if (p->error_indicator) {
        D(fprintf(stderr, "Leave expression_rule (error indicator set)\n"));
        p->level--;
        return NULL;
    }
    expr_ty res = NULL;
    int mark = p->mark;
    { // disjunction 'if' disjunction 'else' expression
        if (p->error_indicator) {
            D(fprintf(stderr, "Alternative failed\n"));
            p->level--;
            return NULL;
        }
        D(fprintf(stderr, "Trying expression: disjunction 'if' disjunction 'else' expression\n"));
        expr_ty a;
        Token* if_token;
        expr_ty b;
        Token* else_token;
        expr_ty c;
        if (
            (a = disjunction_rule(p))  // disjunction
            &&
            (if_token = _PyPegen_expect_token(p, NAME)) && (strcmp(PyBytes_AsString(if_token->bytes), "if") == 0)
            &&
            (b = disjunction_rule(p))  // disjunction
            &&
            (else_token = _PyPegen_expect_token(p, NAME)) && (strcmp(PyBytes_AsString(else_token->bytes), "else") == 0)
            &&
            (c = expression_rule(p))  // expression
        )
        {
            D(fprintf(stderr, "Matched expression: disjunction 'if' disjunction 'else' expression\n"));
            Token *token = _PyPegen_get_last_nonnwhitespace_token(p);
            if (token == NULL) {
                D(fprintf(stderr, "Leave expression_rule (error indicator set)\n"));
                p->level--;
                return NULL;
            }
            int end_lineno = token->end_lineno;
            int end_col_offset = token->end_col_offset;
            /* `a if b else c`: the condition b is passed first, then the
               value a, then the else-value c. */
            res = _Py_IfExp(b, a, c, EXTRA);
            if (res == NULL && PyErr_Occurred()) {
                p->error_indicator = 1;
                D(fprintf(stderr, "Leave expression_rule (error indicator set)\n"));
                p->level--;
                return NULL;
            }
            goto done;
        }
        /* Backtrack to where this alternative started. */
        p->mark = mark;
        D(fprintf(stderr, "Failed\n"));
    }
    { // disjunction
        if (p->error_indicator) {
            D(fprintf(stderr, "Alternative failed\n"));
            p->level--;
            return NULL;
        }
        D(fprintf(stderr, "Trying expression: disjunction\n"));
        expr_ty disjunction_var;
        if (
            (disjunction_var = disjunction_rule(p))  // disjunction
        )
        {
            D(fprintf(stderr, "Matched expression: disjunction\n"));
            res = disjunction_var;
            goto done;
        }
        p->mark = mark;
        D(fprintf(stderr, "Failed\n"));
    }
    res = NULL;
done:
    p->level--;
    return res;
}
5. AST 节点构建
5.1 AST 节点工厂函数
/* AST node constructor functions.
   NOTE(review): the article attributes these to Python/ast.c; in CPython the
   generated constructors are expected in Python/Python-ast.c - confirm. */

/*
 * Build a Module node holding `body` and `type_ignores`.
 * The node is allocated from `arena`; NULL is returned when that allocation
 * fails.
 */
mod_ty
_Py_Module(asdl_stmt_seq *body, asdl_type_ignore_seq *type_ignores, PyArena *arena)
{
    mod_ty node = (mod_ty)PyArena_Malloc(arena, sizeof(*node));
    if (node == NULL) {
        return NULL;
    }

    node->kind = Module_kind;
    node->v.Module.body = body;
    node->v.Module.type_ignores = type_ignores;
    return node;
}
/*
 * Build a FunctionDef statement node.
 *
 * `name` and `args` are mandatory: when either is missing a ValueError is
 * set and NULL is returned. The node is allocated from `arena`; NULL is
 * also returned when that allocation fails. All other arguments are stored
 * as given.
 */
stmt_ty
_Py_FunctionDef(identifier name, arguments_ty args, asdl_stmt_seq *body,
                asdl_expr_seq *decorator_list, expr_ty returns,
                string type_comment, asdl_type_param_seq *type_params,
                int lineno, int col_offset, int end_lineno,
                int end_col_offset, PyArena *arena)
{
    if (!name) {
        PyErr_SetString(PyExc_ValueError,
                        "field name is required for FunctionDef");
        return NULL;
    }
    if (!args) {
        PyErr_SetString(PyExc_ValueError,
                        "field args is required for FunctionDef");
        return NULL;
    }

    stmt_ty node = (stmt_ty)PyArena_Malloc(arena, sizeof(*node));
    if (node == NULL) {
        return NULL;
    }

    /* Source location */
    node->lineno = lineno;
    node->col_offset = col_offset;
    node->end_lineno = end_lineno;
    node->end_col_offset = end_col_offset;

    /* Payload */
    node->kind = FunctionDef_kind;
    node->v.FunctionDef.name = name;
    node->v.FunctionDef.args = args;
    node->v.FunctionDef.body = body;
    node->v.FunctionDef.decorator_list = decorator_list;
    node->v.FunctionDef.returns = returns;
    node->v.FunctionDef.type_comment = type_comment;
    node->v.FunctionDef.type_params = type_params;
    return node;
}
/*
 * Build a BinOp expression node for `left op right`.
 *
 * All three fields are mandatory: when one is missing (NULL operand or a
 * zero operator) a ValueError naming the field is set and NULL is returned.
 * The node is allocated from `arena`; NULL is also returned when that
 * allocation fails.
 */
expr_ty
_Py_BinOp(expr_ty left, operator_ty op, expr_ty right, int lineno,
          int col_offset, int end_lineno, int end_col_offset, PyArena *arena)
{
    /* Validate in declaration order so the first missing field is the one
       reported. */
    const char *missing = NULL;
    if (!left) {
        missing = "field left is required for BinOp";
    }
    else if (!op) {
        missing = "field op is required for BinOp";
    }
    else if (!right) {
        missing = "field right is required for BinOp";
    }
    if (missing != NULL) {
        PyErr_SetString(PyExc_ValueError, missing);
        return NULL;
    }

    expr_ty node = (expr_ty)PyArena_Malloc(arena, sizeof(*node));
    if (node == NULL) {
        return NULL;
    }

    node->kind = BinOp_kind;
    node->v.BinOp.left = left;
    node->v.BinOp.op = op;
    node->v.BinOp.right = right;

    node->lineno = lineno;
    node->col_offset = col_offset;
    node->end_lineno = end_lineno;
    node->end_col_offset = end_col_offset;
    return node;
}
/*
 * Build a Name expression node for identifier `id` used in context `ctx`.
 *
 * Both fields are mandatory: when one is missing a ValueError naming the
 * field is set and NULL is returned. The node is allocated from `arena`;
 * NULL is also returned when that allocation fails.
 */
expr_ty
_Py_Name(identifier id, expr_context_ty ctx, int lineno, int col_offset,
         int end_lineno, int end_col_offset, PyArena *arena)
{
    const char *missing = NULL;
    if (!id) {
        missing = "field id is required for Name";
    }
    else if (!ctx) {
        missing = "field ctx is required for Name";
    }
    if (missing != NULL) {
        PyErr_SetString(PyExc_ValueError, missing);
        return NULL;
    }

    expr_ty node = (expr_ty)PyArena_Malloc(arena, sizeof(*node));
    if (node == NULL) {
        return NULL;
    }

    node->kind = Name_kind;
    node->v.Name.id = id;
    node->v.Name.ctx = ctx;

    node->lineno = lineno;
    node->col_offset = col_offset;
    node->end_lineno = end_lineno;
    node->end_col_offset = end_col_offset;
    return node;
}
5.2 AST 验证
/* Python/ast.c - AST validation */
/*
 * Validate the tree rooted at `mod`.
 *
 * Returns non-zero when the tree is valid and 0 otherwise. 0 is also
 * returned when the scratch dictionary cannot be created; an unknown module
 * kind raises SystemError and returns 0.
 */
int
_PyAST_Validate(mod_ty mod)
{
    int res = 0;
    struct validate_state state;

    PyObject *dict = PyDict_New();
    if (dict == NULL) {
        return 0;
    }
    state.dict = dict;
    state.in_function = 0;
    state.in_async_function = 0;
    state.in_comprehension = 0;
    state.in_class = 0;

    /* Every validator takes a pointer to the shared state. */
    switch (mod->kind) {
    case Module_kind:
        res = validate_stmts(&state, mod->v.Module.body);
        break;
    case Interactive_kind:
        res = validate_stmts(&state, mod->v.Interactive.body);
        break;
    case Expression_kind:
        res = validate_expr(&state, mod->v.Expression.body, Load);
        break;
    case FunctionType_kind:
        res = validate_exprs(&state, mod->v.FunctionType.argtypes, Load, 0) &&
              validate_expr(&state, mod->v.FunctionType.returns, Load);
        break;
    default:
        PyErr_SetString(PyExc_SystemError, "impossible module kind");
        res = 0;
        break;
    }

    Py_DECREF(dict);
    return res;
}
/*
 * Validate every statement of `seq` in order.
 * Returns 1 when all statements pass and 0 as soon as one fails.
 */
static int
validate_stmts(struct validate_state *state, asdl_stmt_seq *seq)
{
    for (int idx = 0; idx < asdl_seq_LEN(seq); idx++) {
        stmt_ty current = (stmt_ty)asdl_seq_GET(seq, idx);
        if (validate_stmt(state, current) == 0) {
            return 0;
        }
    }
    return 1;
}
/*
 * Validate a single statement.
 *
 * Dispatches on stmt->kind and validates the fields of that statement kind.
 * Returns non-zero when the statement is valid, 0 otherwise. A kind without
 * a case raises SystemError ("unexpected statement") and returns 0.
 */
static int
validate_stmt(struct validate_state *state, stmt_ty stmt)
{
// NOTE(review): `i` is not used by the cases shown in this excerpt.
int i;
switch (stmt->kind) {
case FunctionDef_kind:
// `returns` is optional and only validated when present.
return validate_name(stmt->v.FunctionDef.name) &&
validate_arguments(state, stmt->v.FunctionDef.args) &&
validate_stmts(state, stmt->v.FunctionDef.body) &&
validate_exprs(state, stmt->v.FunctionDef.decorator_list, Load, 0) &&
(!stmt->v.FunctionDef.returns ||
validate_expr(state, stmt->v.FunctionDef.returns, Load)) &&
validate_type_params(state, stmt->v.FunctionDef.type_params);
case Assign_kind:
// Targets are validated in Store context, the value in Load context.
return validate_exprs(state, stmt->v.Assign.targets, Store, 0) &&
validate_expr(state, stmt->v.Assign.value, Load);
case Return_kind:
// A bare `return` has no value and is valid as is.
return !stmt->v.Return.value ||
validate_expr(state, stmt->v.Return.value, Load);
// Validation of further statement kinds is omitted here...
default:
PyErr_SetString(PyExc_SystemError, "unexpected statement");
return 0;
}
}
6. 错误处理和恢复
6.1 语法错误报告
/* Parser/pegen_errors.c - syntax error handling */
/*
 * Raise `errtype` with a printf-style message, attaching the location of the
 * offending token when one is known.
 *
 * Only the first error of a parse is reported: once p->error_indicator is
 * set, later calls return immediately. The location comes from
 * p->known_err_token or, failing that, from the last non-whitespace token.
 * With a location the exception is raised with the arguments
 * (message, (filename, lineno, offset, text)); `offset` is the token's
 * 0-based col_offset plus one and `text` is the token's own text. Without a
 * location only the message is used.
 */
void
_PyPegen_raise_error(Parser *p, PyObject *errtype, const char *errmsg, ...)
{
    if (p->error_indicator) {
        return;
    }
    p->error_indicator = 1;

    va_list va;
    va_start(va, errmsg);
    PyObject *error_msg = PyUnicode_FromFormatV(errmsg, va);
    va_end(va);
    if (error_msg == NULL) {
        return;
    }

    /* Find the token that determines the reported position. */
    Token *token = p->known_err_token;
    if (token == NULL) {
        token = _PyPegen_get_last_nonnwhitespace_token(p);
    }
    if (token == NULL) {
        PyErr_SetObject(errtype, error_msg);
        Py_DECREF(error_msg);
        return;
    }

    /* New reference to the token text, or to None when there is none. */
    PyObject *text;
    if (token->bytes != NULL) {
        text = PyUnicode_DecodeUTF8(PyBytes_AS_STRING(token->bytes),
                                    PyBytes_GET_SIZE(token->bytes),
                                    "replace");
        if (text == NULL) {
            Py_DECREF(error_msg);
            return;
        }
    }
    else {
        Py_INCREF(Py_None);
        text = Py_None;
    }

    PyObject *filename = p->tok->filename;
    if (filename == NULL) {
        filename = Py_None;
    }

    /* "O" takes its own reference (error_msg and filename stay owned by
       this function and the tokenizer); "N" hands the `text` reference over
       to the tuple. */
    PyObject *args = Py_BuildValue("(O(OiiN))",
                                   error_msg,
                                   filename,
                                   token->lineno,
                                   token->col_offset + 1,
                                   text);
    Py_DECREF(error_msg);
    if (args == NULL) {
        return;
    }

    /* A tuple value is unpacked into the exception's constructor arguments
       when the exception is instantiated. */
    PyErr_SetObject(errtype, args);
    Py_DECREF(args);
}
/*
 * Read the next token and require it to be of type `type`.
 *
 * Returns the token on a match. On a mismatch a SyntaxError of the form
 * "expected '<type>', got '<type>'" is raised and NULL is returned. NULL is
 * also returned, without raising anything new, when an exception or the
 * parser's error indicator is already set.
 */
Token *
_PyPegen_expect_token(Parser *p, int type)
{
    Token *token = _PyPegen_get_token(p, type);
    if (token != NULL) {
        return token;
    }

    if (PyErr_Occurred() || p->error_indicator || p->fill == 0) {
        return NULL;
    }

    /* _PyPegen_get_token() appends the token to p->tokens before it checks
       the type, so the token that failed to match is the newest entry.
       Fetching another token here would report the one after it. */
    Token *current_token = p->tokens[p->fill - 1];

    const char *expected = _PyParser_TokenNames[type];
    const char *got;
    if (current_token->type == ENDMARKER) {
        got = "end of file";
    }
    else {
        got = _PyParser_TokenNames[current_token->type];
    }
    _PyPegen_raise_error(p, PyExc_SyntaxError,
                         "expected '%s', got '%s'", expected, got);
    return NULL;
}
6.2 错误恢复机制
/*
 * Lookahead: try `rule` at the current position without consuming input.
 *
 * The rule runs with the error indicator cleared; afterwards both the error
 * indicator and the token position are restored to their values on entry.
 * Returns 1 when the outcome agrees with `positive` (rule matched for a
 * positive lookahead, rule failed for a negative one) and 0 otherwise.
 * `str` is not used by this implementation.
 */
static int
_PyPegen_lookahead_with_string(Parser *p, int positive, void *(*rule)(Parser *),
                               const char *str)
{
    const int saved_mark = p->mark;
    const int saved_error = p->error_indicator;

    p->error_indicator = 0;
    const int matched = (rule(p) != NULL);

    p->error_indicator = saved_error;
    p->mark = saved_mark;

    return positive ? matched : !matched;
}
/*
 * Skip ahead until a token of type `token_type` or ENDMARKER has been read.
 *
 * Each token read that is neither of the two advances p->mark by one. The
 * loop also stops when the error indicator is set or no token can be read.
 */
static void
_PyPegen_synchronize(Parser *p, int token_type)
{
    for (;;) {
        if (p->error_indicator) {
            break;
        }
        Token *next = _PyPegen_get_token(p, -1);
        if (next == NULL) {
            break;
        }
        const int kind = next->type;
        if (kind == ENDMARKER || kind == token_type) {
            break;
        }
        p->mark++;
    }
}
7. 解析流程时序图
sequenceDiagram
participant User as 用户代码
participant Parser as 解析器
participant Tokenizer as 词法分析器
participant Grammar as 语法规则
participant AST as AST 构建器
participant Arena as 内存区域
User->>Parser: 解析源代码
Parser->>Arena: 创建内存区域
Parser->>Tokenizer: 初始化词法分析器
loop 解析循环
Parser->>Grammar: 匹配语法规则
Grammar->>Tokenizer: 请求下一个 Token
Tokenizer->>Tokenizer: 词法分析
Tokenizer-->>Grammar: 返回 Token
Grammar->>Grammar: 规则匹配检查
alt 匹配成功
Grammar->>AST: 创建 AST 节点
AST->>Arena: 分配节点内存
Arena-->>AST: 返回内存地址
AST-->>Grammar: 返回节点
else 匹配失败
Grammar->>Parser: 回溯到之前位置
Parser->>Grammar: 尝试其他规则
end
end
Parser->>AST: 验证完整 AST
AST->>AST: 语义检查
Parser-->>User: 返回 AST
8. 实战案例
8.1 自定义语法扩展
# 示例:扩展 Python 语法支持管道操作符
# 源代码:numbers |> map(lambda x: x * 2) |> list
# 期望 AST:Call(func=list, args=[Call(func=map, args=[Lambda(...), Name(numbers)])])
"""
在 Grammar/python.gram 中添加规则:
expression:
| disjunction 'if' disjunction 'else' expression
| pipeline
pipeline:
| pipeline '|>' disjunction { _Py_BinOp($pipeline, PipeOp, $disjunction, EXTRA) }
| disjunction
"""
# 对应的 C 解析代码会自动生成
/*
 * pipeline:
 *     | pipeline '|>' disjunction
 *     | disjunction
 *
 * The rule is left-recursive. Calling pipeline_rule() from its own first
 * alternative would recurse before consuming any token and never terminate,
 * so the rule is evaluated iteratively: parse one disjunction, then fold
 * every following "'|>' disjunction" into a left-associative BinOp chain.
 *
 * Returns the expression node, or NULL when no disjunction can be parsed or
 * an error is pending. p->level is balanced on every exit path.
 */
static expr_ty
pipeline_rule(Parser *p)
{
    D(fprintf(stderr, "Enter pipeline_rule\n"));
    if (p->level++ == MAXSTACK) {
        p->error_indicator = 1;
        PyErr_SetString(PyExc_MemoryError, "too much recursion");
    }
    if (p->error_indicator) {
        p->level--;
        return NULL;
    }

    const int start_mark = p->mark;

    /* Base case: a single disjunction. */
    expr_ty res = disjunction_rule(p);
    if (res == NULL) {
        p->mark = start_mark;
        p->level--;
        return NULL;
    }

    /* Extend the chain for as long as "'|>' disjunction" follows. */
    while (!p->error_indicator) {
        const int mark = p->mark;
        Token *pipe_token = _PyPegen_expect_token(p, PIPE_OP);
        if (pipe_token == NULL) {
            p->mark = mark;
            break;
        }
        expr_ty rhs = disjunction_rule(p);
        if (rhs == NULL) {
            /* No right operand: keep what has been matched so far. */
            p->mark = mark;
            break;
        }
        expr_ty node = _Py_BinOp(res, PipeOp, rhs, EXTRA);
        if (node == NULL) {
            p->error_indicator = 1;
            res = NULL;
            break;
        }
        res = node;
    }

    p->level--;
    return res;
}
8.2 AST 分析工具
import ast
import types
from typing import List, Dict, Any
class ASTAnalyzer(ast.NodeVisitor):
    """Collect simple statistics about the AST of a piece of Python source.

    After ``analyze()`` the ``stats`` dictionary holds:

    * ``node_counts`` - number of visited nodes per node class name
    * ``complexity_metrics`` - branch count (``cyclomatic_complexity``) and
      the deepest nesting reached (``max_nesting``)
    * ``identifiers`` - set of all ``Name`` identifiers
    * ``function_calls`` - one record per ``Call`` node
    * ``imports`` - one record per imported name
    """

    # Node types that open a new nesting level.
    _NESTING_NODES = (ast.FunctionDef, ast.ClassDef, ast.For,
                      ast.While, ast.If, ast.With, ast.Try)

    def __init__(self):
        self.stats = {
            'node_counts': {},
            'complexity_metrics': {
                'cyclomatic_complexity': 0,
                'nesting_depth': 0,
                'max_nesting': 0
            },
            'identifiers': set(),
            'function_calls': [],
            'imports': [],
        }
        self.current_depth = 0

    def analyze(self, source_code: str) -> Dict[str, Any]:
        """Parse ``source_code`` and return the collected statistics.

        Returns ``{'error': ...}`` instead when the source has a syntax error.
        """
        try:
            tree = ast.parse(source_code)
            self.visit(tree)
            return self.stats
        except SyntaxError as e:
            return {'error': f'语法错误: {e}'}

    def visit(self, node):
        """Count the node, track nesting depth, then dispatch to its handler."""
        node_type = type(node).__name__
        counts = self.stats['node_counts']
        counts[node_type] = counts.get(node_type, 0) + 1

        opens_level = isinstance(node, self._NESTING_NODES)
        if opens_level:
            self.current_depth += 1
            metrics = self.stats['complexity_metrics']
            metrics['max_nesting'] = max(metrics['max_nesting'],
                                         self.current_depth)
        try:
            # Dispatch through NodeVisitor.visit so that the visit_<Type>
            # handlers below run. Calling generic_visit() directly from here
            # would skip all of them.
            return super().visit(node)
        finally:
            if opens_level:
                self.current_depth -= 1

    def visit_Name(self, node):
        """Record the identifier of a Name node."""
        self.stats['identifiers'].add(node.id)
        self.generic_visit(node)

    def visit_Call(self, node):
        """Record the callee name and argument counts of a call."""
        if isinstance(node.func, ast.Name):
            func_name = node.func.id
        elif isinstance(node.func, ast.Attribute):
            func_name = f"{ast.unparse(node.func.value)}.{node.func.attr}"
        else:
            func_name = ast.unparse(node.func)
        self.stats['function_calls'].append({
            'name': func_name,
            'args_count': len(node.args),
            'keywords_count': len(node.keywords),
            'lineno': node.lineno
        })
        self.generic_visit(node)

    def visit_Import(self, node):
        """Record every name of an ``import`` statement."""
        for alias in node.names:
            self.stats['imports'].append({
                'type': 'import',
                'name': alias.name,
                'asname': alias.asname,
                'lineno': node.lineno
            })
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        """Record every name of a ``from ... import`` statement."""
        for alias in node.names:
            self.stats['imports'].append({
                'type': 'from_import',
                'module': node.module,
                'name': alias.name,
                'asname': alias.asname,
                'level': node.level,
                'lineno': node.lineno
            })
        self.generic_visit(node)

    def visit_If(self, node):
        """An ``if`` statement adds one branch to the complexity count."""
        self.stats['complexity_metrics']['cyclomatic_complexity'] += 1
        self.generic_visit(node)

    def visit_For(self, node):
        """A ``for`` loop adds one branch to the complexity count."""
        self.stats['complexity_metrics']['cyclomatic_complexity'] += 1
        self.generic_visit(node)

    def visit_While(self, node):
        """A ``while`` loop adds one branch to the complexity count."""
        self.stats['complexity_metrics']['cyclomatic_complexity'] += 1
        self.generic_visit(node)

    def print_analysis(self):
        """Print a summary of the collected statistics."""
        print("=== AST 分析结果 ===")
        print(f"节点类型统计: {self.stats['node_counts']}")
        print(f"复杂度指标: {self.stats['complexity_metrics']}")
        print(f"标识符数量: {len(self.stats['identifiers'])}")
        print(f"函数调用数量: {len(self.stats['function_calls'])}")
        print(f"导入语句数量: {len(self.stats['imports'])}")
# Usage example. The sample program needs its block indentation to be valid
# Python; without it ast.parse() rejects the text and analyze() returns only
# an error record.
source_code = """
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)
def main():
    for i in range(10):
        print(f"fibonacci({i}) = {fibonacci(i)}")
if __name__ == "__main__":
    main()
"""
analyzer = ASTAnalyzer()
results = analyzer.analyze(source_code)
analyzer.print_analysis()
8.3 性能测试工具
import ast
import time
import memory_profiler
from typing import List, Tuple
class ParserBenchmark:
    """Measure how long ``ast.parse`` takes, and how much memory it
    allocates, for a set of named source snippets."""

    def __init__(self):
        self.test_cases = []

    def add_test_case(self, name: str, code: str):
        """Register the snippet ``code`` under the label ``name``."""
        self.test_cases.append((name, code))

    def benchmark_parsing(self, iterations: int = 100) -> List[Tuple[str, float, float]]:
        """Benchmark every registered snippet.

        Returns one ``(name, mean parse time in ms, peak traced memory in KB)``
        tuple per test case, in registration order.
        """
        import tracemalloc

        results = []
        for name, code in self.test_cases:
            # Warm-up
            for _ in range(10):
                ast.parse(code)

            # Parse time, averaged over `iterations` runs
            start_time = time.perf_counter()
            for _ in range(iterations):
                ast.parse(code)
            end_time = time.perf_counter()
            parse_time = (end_time - start_time) / iterations

            # Peak memory of a single parse, measured with tracemalloc
            tracemalloc.start()
            try:
                ast.parse(code)
                _current, peak = tracemalloc.get_traced_memory()
            finally:
                tracemalloc.stop()

            results.append((name, parse_time * 1000, peak / 1024))  # ms, KB
        return results

    def print_benchmark_results(self, results: List[Tuple[str, float, float]]):
        """Print the benchmark results as a table."""
        print("=== 解析器性能测试结果 ===")
        print(f"{'测试用例':<20} {'解析时间(ms)':<15} {'内存使用(KB)':<15}")
        print("-" * 50)
        for name, parse_time, memory_usage in results:
            print(f"{name:<20} {parse_time:<15.3f} {memory_usage:<15.1f}")
# Build the test cases. The embedded snippets need their block indentation to
# be valid Python; without it ast.parse() raises inside benchmark_parsing().
benchmark = ParserBenchmark()

# Simple expression
benchmark.add_test_case("简单表达式", "a + b * c")

# Complex function
complex_function = """
def complex_function(a, b, c, d=None, *args, **kwargs):
    result = []
    for i in range(len(a)):
        if i % 2 == 0:
            temp = [x for x in b if x > i]
            result.extend(temp)
        else:
            result.append(c.get(str(i), d))
    try:
        final_result = sum(result) if all(isinstance(x, (int, float)) for x in result) else None
    except (TypeError, ValueError) as e:
        final_result = str(e)
    finally:
        return final_result or "default"
"""
benchmark.add_test_case("复杂函数", complex_function)

# Class definition
class_definition = """
class ComplexClass(BaseClass, Mixin):
    \"\"\"这是一个复杂的类定义\"\"\"
    class_var = 100
    def __init__(self, name: str, value: int = 0):
        self.name = name
        self.value = value
        super().__init__()
    @property
    def formatted_name(self) -> str:
        return f"{self.name.upper()}_{self.value}"
    @classmethod
    def from_dict(cls, data: dict) -> 'ComplexClass':
        return cls(data['name'], data.get('value', 0))
    def __str__(self) -> str:
        return f"ComplexClass(name={self.name}, value={self.value})"
"""
benchmark.add_test_case("类定义", class_definition)

# Run the benchmark
results = benchmark.benchmark_parsing(1000)
benchmark.print_benchmark_results(results)
9. 最佳实践
9.1 解析器扩展建议
- 语法规则设计 - 谨慎使用左递归(pegen 依靠记忆化支持左递归规则,但手写的递归下降函数不能直接左递归调用自身),并通过规则分层体现运算符优先级
- 错误恢复 - 提供有用的错误信息和恢复策略
- 性能优化 - 使用记忆化避免重复解析
- AST 设计 - 保持节点结构简洁和一致
9.2 调试技巧
# Parser debug output
import ast
import sys
import os

# CPython's parser debugging output is selected with the `-d` command line
# option or the PYTHONDEBUG environment variable and only has an effect in a
# debug build. Setting the variable here reaches interpreters started
# afterwards as child processes, not the one that is already running.
os.environ['PYTHONDEBUG'] = '1'

# sys.flags is read-only: assigning to sys.flags.debug raises AttributeError.
# The flag can only be inspected at runtime.
parser_debug_enabled = bool(sys.flags.debug)
# Custom AST walker for debugging
class DebugVisitor(ast.NodeVisitor):
    """Print every visited node, indented by its depth in the tree."""

    def __init__(self):
        self.level = 0

    def visit(self, node):
        """Print the node type (and line number, if any), then its children."""
        print(" " * self.level + f"访问: {type(node).__name__}")
        if hasattr(node, 'lineno'):
            print(" " * self.level + f" 行号: {node.lineno}")
        # Children are printed one level deeper.
        self.level += 1
        self.generic_visit(node)
        self.level -= 1
# Use the debug visitor: parse a one-line assignment and print its tree.
code = "x = y + z"
tree = ast.parse(code)
visitor = DebugVisitor()
visitor.visit(tree)
10. 总结
CPython 解析器的设计体现了以下核心思想:
- 分层架构 - 词法分析、语法分析和 AST 构建分离
- PEG 算法 - 强大的解析表达式语法支持
- 错误恢复 - 友好的错误报告和恢复机制
- 内存效率 - 使用 Arena 内存分配器
- 可扩展性 - 支持语法规则的灵活扩展
解析器是编程语言实现的前端,其设计质量直接影响开发者的使用体验。CPython 的解析器通过精心设计的架构和算法,为 Python 语言的灵活性和易用性提供了坚实基础。