CPython-09-生成器与协程-概览

1. 模块职责

生成器和协程是 Python 中实现惰性计算和异步编程的核心机制。本文档深入剖析 CPython 中 yield、生成器对象、协程对象的底层实现。

核心概念

  • 生成器(Generator):使用 yield 的函数,返回迭代器
  • 协程(Coroutine):使用 async def 定义的函数,返回可等待对象
  • 异步生成器(Async Generator):在 async def 中使用 yield 的函数

应用场景

  • 惰性计算、大数据处理
  • 协程式并发、异步I/O
  • 状态机实现
  • 无限序列生成

2. 生成器、协程与异步生成器对比

2.1 三种对象类型

flowchart TD
    A[可调用对象<br/>Function/AsyncFunction] --> B{函数标志}

    B -->|CO_GENERATOR| C[生成器<br/>PyGenObject]
    B -->|CO_COROUTINE| D[协程<br/>PyCoroObject]
    B -->|CO_ASYNC_GENERATOR| E[异步生成器<br/>PyAsyncGenObject]

    C --> F[yield 语句]
    D --> G[await 表达式]
    E --> H[yield + async]

    F --> I[迭代器协议<br/>__iter__, __next__]
    G --> J[等待协议<br/>__await__]
    H --> K[异步迭代器<br/>__aiter__, __anext__]

    style C fill:#e1ffe1
    style D fill:#fff4e1
    style E fill:#ffe1f5

类型对比表

特性 生成器 协程 异步生成器
定义方式 def + yield async def async def + yield
返回类型 generator coroutine async_generator
代码标志 CO_GENERATOR CO_COROUTINE CO_ASYNC_GENERATOR
协议 迭代器 等待 异步迭代器
主要方法 send(), throw(), close() 同生成器 + __await__() asend(), athrow(), aclose()
恢复方式 next() / send() await async for

2.2 代码标志识别

// Python/codegen.c

static int
codegen_function_body(compiler *c, stmt_ty s, int is_async, Py_ssize_t funcflags,
                      int firstlineno)
{
    // 根据 AST 节点类型和是否 async 确定函数类型
    int scope_type;

    if (is_async) {
        // async def 函数
        scope_type = COMPILE_SCOPE_ASYNC_FUNCTION;
    } else {
        // 普通 def 函数
        scope_type = COMPILE_SCOPE_FUNCTION;
    }

    // 进入作用域,编译器会根据函数体中的语句
    // 自动设置 CO_GENERATOR/CO_COROUTINE/CO_ASYNC_GENERATOR 标志
    codegen_enter_scope(c, name, scope_type, (void *)s, firstlineno, NULL, &umd);

    // ... 编译函数体
}

代码对象标志co_flags):

// Include/cpython/code.h

#define CO_GENERATOR        0x0020  // 包含 yield
#define CO_COROUTINE        0x0080  // async def(无 yield)
#define CO_ASYNC_GENERATOR  0x0200  // async def + yield

3. 生成器对象(Generator)

3.1 生成器数据结构

// Include/cpython/genobject.h

typedef struct {
    PyObject_HEAD
    // 生成器的栈帧(执行状态)
    _PyInterpreterFrame gi_iframe;
    // 弱引用列表
    PyObject *gi_weakreflist;
    // 生成器名称和限定名
    PyObject *gi_name;
    PyObject *gi_qualname;
    // 生成器状态代码
    _PyGen_State gi_frame_state;
    // 异常信息
    PyObject *gi_exc_state_value;
} PyGenObject;

生成器状态

typedef enum {
    _Py_GEN_CREATED,   // 刚创建,未开始执行
    _Py_GEN_RUNNING,   // 正在执行
    _Py_GEN_SUSPENDED, // 暂停(在yield处)
    _Py_GEN_COMPLETED  // 已完成
} _PyGen_State;

3.2 生成器创建

// Objects/genobject.c

PyObject *
_Py_MakeCoro(PyFunctionObject *func)
{
    // 获取代码对象的标志
    int coro_flags = ((PyCodeObject *)func->func_code)->co_flags &
        (CO_GENERATOR | CO_COROUTINE | CO_ASYNC_GENERATOR);

    assert(coro_flags);

    // 1. 普通生成器
    if (coro_flags == CO_GENERATOR) {
        return make_gen(&PyGen_Type, func);
    }

    // 2. 异步生成器
    if (coro_flags == CO_ASYNC_GENERATOR) {
        PyAsyncGenObject *ag;
        ag = (PyAsyncGenObject *)make_gen(&PyAsyncGen_Type, func);
        if (ag == NULL) {
            return NULL;
        }
        ag->ag_origin_or_finalizer = NULL;
        ag->ag_closed = 0;
        ag->ag_hooks_inited = 0;
        ag->ag_running_async = 0;
        return (PyObject*)ag;
    }

    // 3. 协程
    assert (coro_flags == CO_COROUTINE);
    PyObject *coro = make_gen(&PyCoro_Type, func);
    if (!coro) {
        return NULL;
    }

    // 设置协程追踪信息
    PyThreadState *tstate = _PyThreadState_GET();
    int origin_depth = tstate->coroutine_origin_tracking_depth;

    if (origin_depth == 0) {
        ((PyCoroObject *)coro)->cr_origin_or_finalizer = NULL;
    } else {
        // 记录协程创建的调用栈
        _PyInterpreterFrame *frame = tstate->current_frame;
        frame = _PyFrame_GetFirstComplete(frame->previous);
        PyObject *cr_origin = compute_cr_origin(origin_depth, frame);
        ((PyCoroObject *)coro)->cr_origin_or_finalizer = cr_origin;
        if (!cr_origin) {
            Py_DECREF(coro);
            return NULL;
        }
    }
    return coro;
}

static PyObject *
make_gen(PyTypeObject *type, PyFunctionObject *func)
{
    PyGenObject *gen;

    // 分配生成器对象
    gen = PyObject_GC_New(PyGenObject, type);
    if (gen == NULL) {
        return NULL;
    }

    // 初始化生成器栈帧
    _PyInterpreterFrame *frame = &gen->gi_iframe;
    _PyFrame_Initialize(frame, func, NULL, func->func_code, 0);

    gen->gi_weakreflist = NULL;
    gen->gi_frame_state = _Py_GEN_CREATED;
    gen->gi_exc_state_value = NULL;

    Py_INCREF(func->func_name);
    gen->gi_name = func->func_name;
    Py_INCREF(func->func_qualname);
    gen->gi_qualname = func->func_qualname;

    _PyObject_GC_TRACK(gen);

    return (PyObject *)gen;
}

3.3 生成器执行流程

完整执行时序图

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Gen as 生成器对象
    participant Frame as 栈帧
    participant Eval as 求值器

    User->>Gen: gen = func()<br/>创建生成器
    Gen->>Frame: _PyFrame_Initialize()<br/>初始化栈帧
    Gen-->>User: 返回生成器对象

    rect rgb(230, 255, 230)
    note right of User: 第一次迭代
    User->>Gen: next(gen)
    Gen->>Gen: 检查状态 = CREATED
    Gen->>Eval: _PyEval_EvalFrameDefault(frame)
    Eval->>Eval: 执行字节码...
    Eval->>Eval: 遇到 YIELD_VALUE
    Eval->>Gen: 保存栈帧状态
    Gen->>Gen: 设置状态 = SUSPENDED
    Gen-->>User: 返回 yield 的值
    end

    rect rgb(255, 244, 230)
    note right of User: 后续迭代
    User->>Gen: next(gen) / send(value)
    Gen->>Gen: 检查状态 = SUSPENDED
    Gen->>Frame: 恢复栈帧
    Gen->>Eval: _PyEval_EvalFrameDefault(frame)
    Eval->>Eval: 从上次yield处继续
    Eval->>Eval: 遇到 YIELD_VALUE
    Eval->>Gen: 保存栈帧状态
    Gen-->>User: 返回值
    end

    rect rgb(255, 230, 230)
    note right of User: 生成器结束
    User->>Gen: next(gen)
    Gen->>Eval: _PyEval_EvalFrameDefault(frame)
    Eval->>Eval: 函数返回(RETURN_VALUE)
    Eval->>Gen: 设置状态 = COMPLETED
    Gen->>Gen: PyErr_SetObject(StopIteration)
    Gen-->>User: 抛出 StopIteration
    end

生成器send方法

// Objects/genobject.c

static PyObject *
gen_send(PyObject *self, PyObject *arg)
{
    PyGenObject *gen = (PyGenObject *)self;

    // 检查生成器状态
    if (gen->gi_frame_state == _Py_GEN_COMPLETED) {
        PyErr_SetNone(PyExc_StopIteration);
        return NULL;
    }

    if (gen->gi_frame_state == _Py_GEN_CREATED && arg != Py_None) {
        // 第一次send必须是None
        PyErr_SetString(PyExc_TypeError,
                       "can't send non-None value to a just-started generator");
        return NULL;
    }

    // 执行生成器
    return gen_send_ex(gen, arg, 0, 0);
}

static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc, int closing)
{
    PyThreadState *tstate = _PyThreadState_GET();
    _PyInterpreterFrame *frame = &gen->gi_iframe;
    PyObject *result;

    // 设置生成器为运行状态
    gen->gi_frame_state = _Py_GEN_RUNNING;

    // 将send的值压入栈(作为上次yield的返回值)
    if (arg != NULL) {
        // 栈顶已经有一个值(上次yield的),需要替换
        _PyFrame_StackPush(frame, arg);
    }

    // 执行栈帧
    result = _PyEval_EvalFrame(tstate, frame, exc);

    // 如果返回了值,说明遇到了yield
    if (result != NULL) {
        // 生成器暂停
        gen->gi_frame_state = _Py_GEN_SUSPENDED;
        return result;
    }

    // 如果没有返回值且没有异常,说明生成器结束
    if (PyErr_Occurred() == NULL) {
        PyErr_SetNone(PyExc_StopIteration);
    }

    gen->gi_frame_state = _Py_GEN_COMPLETED;
    return NULL;
}

3.4 yield 字节码

yield 语句编译

// Python/codegen.c

case Yield_kind:
    // 编译 yield value 表达式
    VISIT(c, expr, e->v.Yield.value);
    // 生成 YIELD_VALUE 指令
    ADDOP_YIELD(c, loc);
    break;

case YieldFrom_kind:
    // 编译 yield from iterable
    if (SCOPE_TYPE(c) == COMPILE_SCOPE_ASYNC_FUNCTION) {
        return _PyCompile_Error(c, loc, "'yield from' inside async function");
    }

    VISIT(c, expr, e->v.YieldFrom.value);
    // 获取迭代器
    ADDOP(c, loc, GET_YIELD_FROM_ITER);
    // 压入None(第一次send)
    ADDOP_LOAD_CONST(c, loc, Py_None);
    // 委托给子迭代器
    ADD_YIELD_FROM(c, loc, 0);
    break;

YIELD_VALUE 指令执行

// Python/bytecodes.c

inst(YIELD_VALUE, (value -- value)) {
    // 将值保存到栈中
    _PyFrame_SetStackPointer(frame, stack_pointer);

    // 记录yield位置(用于恢复)
    frame->prev_instr = next_instr - 1;

    // 返回yielded的值
    DISPATCH_INLINED(value);
}

4. yield from 委托

yield from 允许一个生成器委托给另一个生成器。

4.1 yield from 执行流程

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Outer as 外层生成器
    participant Inner as 内层生成器
    participant Eval as 求值器

    User->>Outer: next(outer_gen)
    Outer->>Outer: 执行到 yield from inner_gen
    Outer->>Eval: GET_YIELD_FROM_ITER
    Eval->>Inner: 获取内层生成器
    Outer->>Inner: send(None)

    loop 委托循环
        Inner->>Inner: 执行yield
        Inner-->>Outer: 返回值
        Outer-->>User: 透传返回值
        User->>Outer: send(value)
        Outer->>Inner: 透传send(value)
    end

    Inner->>Inner: 抛出StopIteration(result)
    Inner-->>Outer: 捕获异常
    Outer->>Outer: 提取result值
    Outer->>Outer: 继续执行

实现细节

// Python/bytecodes.c

inst(GET_YIELD_FROM_ITER, (iterable -- iter)) {
    // 如果是生成器,直接使用
    if (PyGen_CheckExact(iterable) || PyCoro_CheckExact(iterable)) {
        iter = iterable;
    }
    // 否则获取迭代器
    else {
        iter = PyObject_GetIter(iterable);
        if (iter == NULL) {
            GOTO_ERROR(error);
        }
        Py_DECREF(iterable);
    }
}

// 委托执行
macro(YIELD_FROM) = {
    SEND,           // 发送值到子生成器
    JUMP_BACKWARD,  // 继续循环
};

示例

def inner():
    yield 1
    yield 2
    return 'done'

def outer():
    result = yield from inner()  # 委托给inner
    print(f"inner returned: {result}")
    yield 3

gen = outer()
print(next(gen))  # 1(来自inner)
print(next(gen))  # 2(来自inner)
print(next(gen))  # 打印"inner returned: done",然后yield 3

5. 协程对象(Coroutine)

协程是使用 async def 定义的函数,本质上是特殊的生成器。

5.1 协程数据结构

// Include/cpython/genobject.h

typedef struct {
    PyObject_HEAD
    _PyInterpreterFrame cr_iframe;
    PyObject *cr_weakreflist;
    PyObject *cr_name;
    PyObject *cr_qualname;
    _PyGen_State cr_frame_state;
    PyObject *cr_exc_state_value;
    // 协程特有:追踪创建位置或finalizer
    PyObject *cr_origin_or_finalizer;
} PyCoroObject;

5.2 await 表达式

await 是协程中暂停和恢复的关键字。

编译为字节码

// Python/codegen.c

case Await_kind:
    // 编译 await expr
    VISIT(c, expr, e->v.Await.value);
    // 获取awaitable对象
    ADDOP_I(c, loc, GET_AWAITABLE, 0);
    // 压入None
    ADDOP_LOAD_CONST(c, loc, Py_None);
    // 委托执行(类似yield from)
    ADD_YIELD_FROM(c, loc, 1);
    break;

GET_AWAITABLE 指令

// Python/bytecodes.c

inst(GET_AWAITABLE, (iterable -- iter)) {
    // 检查是否实现了__await__方法
    iter = _PyObject_CallMethod(iterable, "__await__", NULL);

    if (iter == NULL) {
        // 如果是协程或生成器,直接使用
        if (PyCoro_CheckExact(iterable) || PyGen_CheckExact(iterable)) {
            iter = iterable;
        }
        else {
            PyErr_Format(PyExc_TypeError,
                        "object %.100s can't be used in 'await' expression",
                        Py_TYPE(iterable)->tp_name);
            GOTO_ERROR(error);
        }
    }

    Py_DECREF(iterable);
}

5.3 协程的等待协议

协程实现了等待协议(awaitable protocol):

// Objects/genobject.c

static PyObject *
coro_await(PyObject *coro)
{
    // 返回self(协程本身就是awaitable)
    Py_INCREF(coro);
    return coro;
}

static PyAsyncMethods coro_as_async = {
    coro_await,    // __await__
    0,             // __aiter__
    0,             // __anext__
};

PyTypeObject PyCoro_Type = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    "coroutine",
    // ...
    &coro_as_async,  // tp_as_async
    // ...
};

示例

async def coro():
    print("Coroutine start")
    await asyncio.sleep(1)  # 暂停,交出控制权
    print("Coroutine resume")
    return 42

# 创建协程对象
c = coro()
print(type(c))  # <class 'coroutine'>

# 协程需要被驱动(通过事件循环)
# asyncio.run(coro())

6. 异步生成器(Async Generator)

异步生成器结合了 async defyield,用于异步迭代。

6.1 异步生成器数据结构

// Include/cpython/genobject.h

typedef struct {
    PyObject_HEAD
    _PyInterpreterFrame ag_iframe;
    PyObject *ag_weakreflist;
    PyObject *ag_name;
    PyObject *ag_qualname;
    _PyGen_State ag_frame_state;
    PyObject *ag_exc_state_value;
    PyObject *ag_origin_or_finalizer;
    // 异步生成器特有
    int ag_running_async;  // 是否在异步运行
    int ag_closed;         // 是否已关闭
    int ag_hooks_inited;   // hooks是否已初始化
} PyAsyncGenObject;

6.2 异步迭代协议

// Objects/genobject.c

static PyObject *
async_gen_anext(PyAsyncGenObject *ag)
{
    // 返回asend包装器
    return async_gen_asend_new(ag, Py_None);
}

static PyObject *
async_gen_aiter(PyAsyncGenObject *ag)
{
    // 返回self
    Py_INCREF(ag);
    return (PyObject *)ag;
}

static PyAsyncMethods async_gen_as_async = {
    0,                       // __await__
    async_gen_aiter,         // __aiter__
    async_gen_anext,         // __anext__
};

示例

async def async_range(n):
    for i in range(n):
        await asyncio.sleep(0.1)  // 模拟异步操作
        yield i

async def main():
    async for value in async_range(5):
        print(value)

# asyncio.run(main())

6.3 async for 循环编译

// Python/codegen.c

static int
codegen_async_for(compiler *c, stmt_ty s)
{
    // async for target in iter:
    //     body

    // 获取异步迭代器
    VISIT(c, expr, s->v.AsyncFor.iter);
    ADDOP(c, LOC(s->v.AsyncFor.iter), GET_AITER);

    USE_LABEL(c, start);

    // 设置异常处理
    ADDOP_JUMP(c, loc, SETUP_FINALLY, except);

    // 获取下一个值
    ADDOP(c, loc, GET_ANEXT);
    ADDOP_LOAD_CONST(c, loc, Py_None);

    // 等待结果(类似await)
    ADD_YIELD_FROM(c, loc, 1);
    ADDOP(c, loc, POP_BLOCK);

    // 赋值给target
    VISIT(c, expr, s->v.AsyncFor.target);

    // 执行循环体
    VISIT_SEQ(c, stmt, s->v.AsyncFor.body);

    // 跳回循环开始
    ADDOP_JUMP(c, NO_LOCATION, JUMP, start);

    // 异常处理(StopAsyncIteration)
    USE_LABEL(c, except);
    ADDOP_JUMP(c, loc, END_ASYNC_FOR, send);

    // else块
    VISIT_SEQ(c, stmt, s->v.AsyncFor.orelse);

    USE_LABEL(c, end);
    return SUCCESS;
}

7. 实战案例

7.1 生成器:斐波那契数列

def fibonacci():
    """无限斐波那契数列生成器"""
    a, b = 0, 1
    while True:
        yield a  # 暂停并返回当前值
        a, b = b, a + b

# 使用
fib = fibonacci()
for _ in range(10):
    print(next(fib), end=' ')
# 输出: 0 1 1 2 3 5 8 13 21 34

对应字节码

import dis

def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

dis.dis(fibonacci)
# 简化后的字节码
LOAD_CONST    (0, 1)
UNPACK_SEQUENCE  2
STORE_FAST    a
STORE_FAST    b

LOAD_FAST     a
YIELD_VALUE       # 暂停,返回a
POP_TOP

LOAD_FAST     b
LOAD_FAST     a
LOAD_FAST     b
BINARY_ADD
BUILD_TUPLE   2
UNPACK_SEQUENCE  2
STORE_FAST    a
STORE_FAST    b
JUMP_BACKWARD

7.2 yield from:树遍历

class Node:
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right

    def traverse(self):
        """中序遍历"""
        if self.left:
            yield from self.left.traverse()  # 委托给左子树
        yield self.value
        if self.right:
            yield from self.right.traverse()  # 委托给右子树

# 构建树
root = Node(2,
            Node(1),
            Node(3))

# 遍历
for value in root.traverse():
    print(value)
# 输出: 1 2 3

7.3 协程:简单任务调度

def simple_coro():
    print('-> Started')
    x = yield      # 暂停,等待send
    print('-> Received:', x)
    y = yield x * 2
    print('-> Received:', y)
    return x + y

# 使用
coro = simple_coro()
next(coro)           # 启动协程,输出: -> Started
print(coro.send(10)) # 发送10,输出: -> Received: 10, 返回20
try:
    coro.send(5)     # 发送5,输出: -> Received: 5
except StopIteration as e:
    print('Result:', e.value)  # 输出: Result: 15

7.4 async/await:HTTP请求

import asyncio
import aiohttp

async def fetch_url(session, url):
    """异步获取URL内容"""
    async with session.get(url) as response:  # await 隐式调用
        return await response.text()

async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net',
    ]

    async with aiohttp.ClientSession() as session:
        # 并发执行多个请求
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for url, content in zip(urls, results):
            print(f"{url}: {len(content)} bytes")

# asyncio.run(main())

8. 生成器与协程的区别

8.1 概念区别

维度 生成器 协程
目的 惰性迭代 异步编程
暂停点 yield await
恢复方式 next() / send() 事件循环驱动
返回值 通过 StopIteration.value 通过 return
可否嵌套 可(yield from 可(await

8.2 底层实现差异

flowchart LR
    A[函数定义] --> B{检查关键字}

    B -->|包含yield| C[设置CO_GENERATOR]
    B -->|async def<br/>无yield| D[设置CO_COROUTINE]
    B -->|async def<br/>+yield| E[设置CO_ASYNC_GENERATOR]

    C --> F[PyGenObject<br/>迭代器协议]
    D --> G[PyCoroObject<br/>等待协议]
    E --> H[PyAsyncGenObject<br/>异步迭代器协议]

    F --> I[next/send驱动]
    G --> J[await驱动]
    H --> K[async for驱动]

9. 性能考量

9.1 生成器优势

  • 内存效率:惰性计算,不需要一次性生成所有值
  • 无限序列:可以表示无限序列
  • 管道式处理:多个生成器可以链式组合

示例:内存对比

# 列表:一次性生成所有值
def list_squares(n):
    return [x**2 for x in range(n)]

# 生成器:按需生成
def gen_squares(n):
    for x in range(n):
        yield x**2

import sys

# 内存占用
lst = list_squares(1000000)
gen = gen_squares(1000000)

print(sys.getsizeof(lst))  # ~8MB
print(sys.getsizeof(gen))  # ~128 bytes

9.2 协程性能

协程相比线程:

  • 更轻量:无需操作系统线程切换开销
  • 更高效:单线程事件循环,无GIL竞争
  • 更可控:显式的暂停点(await)

并发对比

import time
import asyncio

# 同步版本
def sync_task():
    for i in range(5):
        time.sleep(0.1)
    return "done"

def sync_main():
    start = time.time()
    tasks = [sync_task() for _ in range(10)]
    print(f"Sync: {time.time() - start:.2f}s")  # ~5s

# 异步版本
async def async_task():
    for i in range(5):
        await asyncio.sleep(0.1)
    return "done"

async def async_main():
    start = time.time()
    tasks = [async_task() for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Async: {time.time() - start:.2f}s")  # ~0.5s

sync_main()
asyncio.run(async_main())

10. 调试技巧

10.1 查看生成器状态

import inspect

def my_gen():
    yield 1
    yield 2

gen = my_gen()

print(inspect.getgeneratorstate(gen))  # GEN_CREATED
next(gen)
print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
next(gen)
print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
try:
    next(gen)
except StopIteration:
    print(inspect.getgeneratorstate(gen))  # GEN_CLOSED

10.2 协程调试

import sys

async def buggy_coro():
    await asyncio.sleep(1)
    x = 1 / 0  # 错误

async def main():
    try:
        await buggy_coro()
    except ZeroDivisionError:
        import traceback
        traceback.print_exc()

asyncio.run(main())

11. 最佳实践

11.1 生成器

推荐

  • 用于大数据流处理
  • 表示无限序列
  • 管道式数据转换
def read_large_file(filename):
    """按行读取大文件"""
    with open(filename) as f:
        for line in f:
            yield line.strip()

def filter_lines(lines, keyword):
    """过滤包含关键字的行"""
    for line in lines:
        if keyword in line:
            yield line

def process(filename, keyword):
    """管道式处理"""
    lines = read_large_file(filename)
    filtered = filter_lines(lines, keyword)
    for line in filtered:
        print(line)

11.2 协程

推荐

  • I/O密集型任务
  • 网络请求
  • 并发操作

不推荐

  • CPU密集型任务(使用多进程)
  • 简单顺序操作(不需要异步)

12. 总结

生成器和协程是Python中强大的控制流机制:

  1. 生成器

    • 使用 yield 实现惰性迭代
    • 内存高效,适合大数据处理
    • 通过 yield from 支持委托
  2. 协程

    • 使用 async def + await 实现异步编程
    • 轻量级并发,无需多线程
    • 事件循环驱动执行
  3. 底层实现

    • 基于栈帧保存和恢复
    • 字节码级别的暂停和恢复
    • 共享相同的基础结构(PyGenObject系列)

理解生成器和协程的底层实现有助于:

  • 编写高效的异步代码
  • 理解Python并发模型
  • 调试复杂的异步问题
  • 设计更好的API

14. 生成器与协程API源码深度剖析

14.1 生成器send实现

// Objects/genobject.c

static PyObject* gen_send_ex(PyGenObject *gen, PyObject *arg, int exc, int closing)
{
    PyThreadState *tstate = _PyThreadState_GET();
    _PyInterpreterFrame *frame = (_PyInterpreterFrame *)gen->gi_iframe;
    PyObject *result;

    if (gen->gi_frame_state == FRAME_CREATED) {
        // 首次调用
        if (arg && arg != Py_None) {
            PyErr_SetString(PyExc_TypeError,
                           "can't send non-None value to a just-started generator");
            return NULL;
        }
    }

    // 恢复栈帧并执行
    result = _PyEval_EvalFrame(tstate, frame, exc);

    if (result == NULL) {
        if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
            return NULL;
        }
    }

    return result;
}

生成器状态转换图

stateDiagram-v2
    [*] --> Created: 创建生成器
    Created --> Running: send(None)
    Running --> Suspended: yield value
    Suspended --> Running: send(value)
    Running --> Closed: return/exception
    Suspended --> Closed: close()
    Closed --> [*]

深度补充:生成器与协程核心实现

10.1 生成器send实现

// Objects/genobject.c

static PyObject* gen_send_ex(PyGenObject *gen, PyObject *arg, int exc, int closing)
{
    PyThreadState *tstate = _PyThreadState_GET();
    _PyInterpreterFrame *frame = (_PyInterpreterFrame *)gen->gi_iframe;
    PyObject *result;

    // 1. 检查生成器状态
    if (gen->gi_frame_state == FRAME_CREATED) {
        // 首次调用
        if (arg && arg != Py_None) {
            const char *msg = "can't send non-None value to a just-started generator";
            PyErr_SetString(PyExc_TypeError, msg);
            return NULL;
        }
    }
    else if (gen->gi_frame_state == FRAME_COMPLETED) {
        // 已完成
        if (closing) {
            return NULL;
        }
        return gen_send_ex_done(gen);
    }

    // 2. 设置栈帧状态
    frame->previous = tstate->current_frame;
    gen->gi_frame_state = FRAME_EXECUTING;
    tstate->current_frame = frame;

    // 3. 压入发送的值
    if (arg != NULL) {
        _PyStackRef *stack_top = frame->stackpointer;
        *stack_top = PyStackRef_FromPyObjectNew(arg);
        frame->stackpointer++;
    }

    // 4. 恢复执行
    result = _PyEval_EvalFrame(tstate, frame, exc);

    // 5. 处理结果
    tstate->current_frame = frame->previous;

    if (result == NULL) {
        if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
            // 正常结束
            gen->gi_frame_state = FRAME_COMPLETED;
            _PyEval_FrameClearAndPop(tstate, frame);
        }
        else {
            // 异常
            gen->gi_frame_state = FRAME_COMPLETED;
        }
        return NULL;
    }

    // 6. yield返回
    gen->gi_frame_state = FRAME_SUSPENDED;
    return result;
}

10.2 生成器状态机UML

stateDiagram-v2
    [*] --> CREATED: 创建生成器
    CREATED --> EXECUTING: send(None) / next()
    EXECUTING --> SUSPENDED: yield value
    SUSPENDED --> EXECUTING: send(value)
    EXECUTING --> COMPLETED: return / StopIteration
    SUSPENDED --> COMPLETED: close()
    COMPLETED --> [*]

    note right of EXECUTING
        gi_frame_state = 0
        执行字节码
    end note

    note right of SUSPENDED
        gi_frame_state = -2
        保存栈帧状态
    end note

10.3 async/await执行时序

sequenceDiagram
    autonumber
    participant Code as async def
    participant Coro as PyCoroObject
    participant EventLoop as 事件循环
    participant Frame as _PyInterpreterFrame
    participant Awaitable as 被await对象

    Code->>Coro: 创建协程对象
    Note over Coro: cr_frame_state = CREATED

    EventLoop->>Coro: send(None)
    Coro->>Frame: 恢复栈帧
    Frame->>Frame: 执行到await

    Frame->>Awaitable: 获取__await__
    Awaitable-->>Frame: 返回迭代器

    Frame->>EventLoop: yield from iterator
    Note over Coro: cr_frame_state = SUSPENDED

    EventLoop->>Awaitable: 等待完成
    Awaitable-->>EventLoop: 结果

    EventLoop->>Coro: send(result)
    Coro->>Frame: 恢复执行
    Frame->>Frame: 继续执行

    alt 遇到return
        Frame-->>Coro: StopIteration(value)
        Coro-->>EventLoop: 协程完成
    else 遇到另一个await
        Frame-->>EventLoop: yield from
    end

10.4 完整函数调用链

# 生成器执行链
next(generator)
  └─> gen_iternext()
        └─> gen_send_ex()
              ├─> _PyEval_EvalFrame()
              │     └─> [执行到YIELD_VALUE]
              │           └─> YIELD_VALUE指令
              └─> [返回yield的值]

# 协程执行链
await coroutine
  └─> BEFORE_ASYNC_WITH / GET_AWAITABLE
        └─> _PyCoro_GetAwaitableIter()
              └─> coro_wrapper_iternext()
                    └─> _PyGen_Send()
                          └─> [执行协程代码]

10.5 性能优化建议

# 1. 使用生成器表达式代替列表推导
gen = (x*2 for x in range(1000000))  # O(1)内存
# vs
lst = [x*2 for x in range(1000000)]  # O(n)内存

# 2. 使用itertools提高效率
from itertools import islice, chain
result = list(islice(gen, 10))  # 取前10个

# 3. 避免在生成器中使用异常
# 使用if检查代替try-except

# 4. 使用asyncio gather并发
results = await asyncio.gather(coro1(), coro2(), coro3())

# 5. 使用asyncio.create_task创建后台任务
task = asyncio.create_task(background_work())

# 6. 避免阻塞调用
# bad: time.sleep(1)
# good: await asyncio.sleep(1)

深度补充扩展:协程系统完整架构与调用链分析

11.1 协程系统整体架构图

flowchart TB
    subgraph 用户层["用户代码层"]
        AsyncDef["async def 函数"]
        AwaitExpr["await 表达式"]
        AsyncFor["async for 循环"]
        AsyncWith["async with 上下文"]
    end

    subgraph 编译层["编译器层"]
        AST["AST节点<br/>AsyncFunctionDef<br/>Await<br/>AsyncFor"]
        Compiler["编译器<br/>codegen.c"]
        Bytecode["字节码<br/>GET_AWAITABLE<br/>SEND<br/>YIELD_VALUE"]
    end

    subgraph 运行时层["运行时执行层"]
        CoroCreate["协程创建<br/>PyCoroObject"]
        FrameState["栈帧状态<br/>_PyInterpreterFrame"]
        Evaluator["解释器<br/>_PyEval_EvalFrame"]
        SendRecv["send/throw<br/>gen_send_ex"]
    end

    subgraph 事件循环层["事件循环层"]
        EventLoop["asyncio.EventLoop"]
        Task["asyncio.Task"]
        Future["asyncio.Future"]
        Scheduler["任务调度器"]
    end

    subgraph 底层支持["底层支持模块"]
        GIL["GIL管理"]
        ThreadState["线程状态<br/>PyThreadState"]
        Exception["异常处理<br/>StopIteration"]
        Memory["内存管理<br/>PyArena"]
    end

    AsyncDef --> AST
    AwaitExpr --> AST
    AsyncFor --> AST
    AsyncWith --> AST

    AST --> Compiler
    Compiler --> Bytecode

    Bytecode --> CoroCreate
    CoroCreate --> FrameState
    FrameState --> Evaluator
    Evaluator --> SendRecv

    SendRecv --> EventLoop
    EventLoop --> Task
    Task --> Future
    Future --> Scheduler

    Scheduler --> SendRecv

    Evaluator --> GIL
    Evaluator --> ThreadState
    SendRecv --> Exception
    CoroCreate --> Memory

    style 用户层 fill:#e1f5ff
    style 编译层 fill:#ffe1cc
    style 运行时层 fill:#e1ffe1
    style 事件循环层 fill:#fff3e1
    style 底层支持 fill:#f0e1ff

架构层次说明

  1. 用户代码层:Python开发者编写的异步代码

    • async def:定义协程函数
    • await:暂停点,等待异步操作完成
    • async for/with:异步迭代和上下文管理
  2. 编译器层:将Python代码转换为字节码

    • AST构建:识别async/await语法
    • 代码生成:生成协程相关字节码
    • 标志设置:CO_COROUTINE标志
  3. 运行时执行层:协程对象的创建和执行

    • 协程对象:PyCoroObject结构体
    • 栈帧管理:保存和恢复执行状态
    • 解释器:执行字节码直到yield
  4. 事件循环层:协程的调度和管理

    • EventLoop:事件循环驱动
    • Task:协程的包装器
    • Scheduler:任务调度逻辑
  5. 底层支持:系统级支持

    • GIL:保证线程安全
    • 异常:StopIteration传递返回值
    • 内存:对象分配和回收

本文档详细剖析了CPython生成器、协程和异步生成器的底层实现机制,包括完整的架构图、时序图、调用链分析以及关键API源码。掌握这些知识对于编写高效的异步Python程序至关重要。

协程完整执行流程(从代码到运行)

sequenceDiagram
    autonumber
    participant User as 用户代码
    participant Parser as 解析器
    participant Compiler as 编译器
    participant Runtime as 运行时
    participant CoroObj as 协程对象
    participant Frame as 栈帧
    participant Eval as 解释器
    participant Loop as 事件循环

    rect rgb(230, 240, 255)
    note over User,Compiler: 阶段1: 编译阶段
    User->>Parser: async def coro():<br/>    await task()
    Parser->>Parser: 词法分析<br/>识别async/await关键字
    Parser->>Compiler: AST树<br/>AsyncFunctionDef + Await
    Compiler->>Compiler: 设置CO_COROUTINE标志
    Compiler->>Compiler: 生成字节码<br/>GET_AWAITABLE + SEND
    Compiler-->>Runtime: PyCodeObject<br/>(co_flags=CO_COROUTINE)
    end

    rect rgb(255, 245, 230)
    note over Runtime,CoroObj: 阶段2: 协程创建
    Runtime->>CoroObj: 调用协程函数
    CoroObj->>CoroObj: PyCoroObject_New()
    CoroObj->>Frame: 创建_PyInterpreterFrame
    Frame->>Frame: 分配栈空间
    Frame->>Frame: 初始化局部变量
    CoroObj-->>Runtime: 返回协程对象<br/>(未启动)
    end

    rect rgb(240, 255, 240)
    note over Loop,Eval: 阶段3: 协程执行
    Loop->>CoroObj: send(None) 首次启动
    CoroObj->>Frame: 激活栈帧
    Frame->>Frame: frame_state = EXECUTING
    Frame->>Eval: _PyEval_EvalFrame(frame)

    loop 执行字节码直到await
        Eval->>Eval: 执行指令
        Eval->>Eval: LOAD_FAST, CALL等
    end

    Eval->>Eval: GET_AWAITABLE指令
    Eval->>Eval: 获取awaitable对象
    Eval->>Eval: SEND指令<br/>委托给子协程
    Eval->>Frame: 保存当前状态
    Frame->>Frame: frame_state = SUSPENDED
    Eval-->>CoroObj: yield 控制权
    CoroObj-->>Loop: 返回awaitable
    end

    rect rgb(255, 240, 240)
    note over Loop,Eval: 阶段4: 恢复执行
    Loop->>Loop: 等待awaitable完成
    Loop->>CoroObj: send(result) 恢复
    CoroObj->>Frame: 恢复栈帧
    Frame->>Frame: frame_state = EXECUTING
    Frame->>Eval: 继续执行

    loop 继续执行
        Eval->>Eval: 处理await结果
        Eval->>Eval: 执行剩余代码
    end

    alt 正常返回
        Eval->>Eval: RETURN_VALUE指令
        Eval->>Exception: 抛出StopIteration(value)
        Exception-->>CoroObj: 捕获异常
        CoroObj->>Frame: frame_state = COMPLETED
        CoroObj-->>Loop: 返回结果
    else 遇到下一个await
        Eval-->>CoroObj: 再次yield
        CoroObj-->>Loop: 返回awaitable
    end
    end

关键模块交互图

flowchart LR
    subgraph 编译模块["Python/compile.c"]
        C1[compiler_async_function]
        C2[compiler_await]
        C3[codegen_addop<br/>GET_AWAITABLE]
    end

    subgraph 字节码模块["Python/bytecodes.c"]
        B1[GET_AWAITABLE]
        B2[SEND]
        B3[YIELD_VALUE]
        B4[END_SEND]
    end

    subgraph 协程对象["Objects/genobject.c"]
        G1[PyCoroObject]
        G2[coro_send_ex]
        G3[coro_close]
        G4[coro_throw]
    end

    subgraph 解释器["Python/ceval.c"]
        E1[_PyEval_EvalFrame]
        E2[_PyFrame_PushUnchecked]
        E3[_PyFrame_Pop]
    end

    subgraph 栈帧管理["Python/frame.c"]
        F1[_PyInterpreterFrame]
        F2[frame_state]
        F3[frame->previous]
    end

    subgraph 异常系统["Python/errors.c"]
        X1[StopIteration]
        X2[GeneratorExit]
        X3[异常传播]
    end

    C1 --> C2
    C2 --> C3
    C3 --> B1

    B1 --> G1
    B2 --> G2
    B3 --> G2

    G2 --> E1
    E1 --> F1
    F1 --> F2

    E1 --> B1
    E1 --> B2
    E1 --> B3

    G2 --> X1
    G3 --> X2

    style 编译模块 fill:#ffe1cc
    style 字节码模块 fill:#e1ffe1
    style 协程对象 fill:#e1f5ff
    style 解释器 fill:#fff3e1
    style 栈帧管理 fill:#ffe1f3
    style 异常系统 fill:#f0e1ff

协程核心API源码深度剖析

协程对象创建:PyCoroObject_New

// Objects/genobject.c

PyObject *
_PyCoroObject_New(PyThreadState *tstate, PyCodeObject *code,
                  PyObject *closure, PyObject *name, PyObject *qualname)
{
    // 1. 分配协程对象
    PyCoroObject *coro = PyObject_GC_New(PyCoroObject, &PyCoro_Type);
    if (coro == NULL) {
        return NULL;
    }

    // 2. 初始化栈帧
    _PyInterpreterFrame *frame = (_PyInterpreterFrame *)coro->cr_iframe;

    // 设置代码对象
    frame->f_code = Py_NewRef(code);
    frame->f_executable = Py_NewRef(code);

    // 设置闭包和全局变量
    frame->f_builtins = Py_NewRef(tstate->interp->builtins);
    frame->f_globals = Py_NewRef(((PyFunctionObject *)closure)->func_globals);
    frame->f_locals = NULL;

    // 3. 初始化栈指针
    frame->stackpointer = frame->localsplus + code->co_nlocalsplus;

    // 4. 设置协程特定字段
    coro->cr_name = Py_NewRef(name);
    coro->cr_qualname = Py_NewRef(qualname);
    coro->cr_frame_state = FRAME_CREATED;  // 初始状态
    coro->cr_origin_or_finalizer = NULL;
    coro->cr_exc_state_value = NULL;

    // 5. 注册到GC
    PyObject_GC_Track(coro);

    return (PyObject *)coro;
}

函数功能

  • 分配PyCoroObject结构体
  • 初始化_PyInterpreterFrame栈帧
  • 设置代码对象和闭包
  • 配置栈指针和局部变量
  • 设置初始状态为CREATED

调用链

async def函数调用
  └─> _PyFunction_Vectorcall
        └─> coro_wrapper_new
              └─> _PyCoroObject_New
                    ├─> PyObject_GC_New (分配内存)
                    ├─> 初始化frame结构
                    └─> PyObject_GC_Track (注册GC)

协程执行:coro_send_ex

// Objects/genobject.c

static PyObject *
coro_send_ex(PyCoroObject *coro, PyObject *arg, int exc, int closing)
{
    PyThreadState *tstate = _PyThreadState_GET();
    _PyInterpreterFrame *frame = (_PyInterpreterFrame *)coro->cr_iframe;
    PyObject *result;

    // 1. 状态检查
    if (coro->cr_frame_state == FRAME_CREATED) {
        // 首次调用,不能发送非None值
        if (arg != NULL && arg != Py_None) {
            PyErr_SetString(PyExc_TypeError,
                "can't send non-None value to a just-started coroutine");
            return NULL;
        }
        arg = Py_None;
    }
    else if (coro->cr_frame_state == FRAME_COMPLETED) {
        // 协程已完成
        if (closing) {
            PyErr_SetNone(PyExc_StopIteration);
        }
        else {
            PyErr_SetString(PyExc_StopIteration, "coroutine has already finished");
        }
        return NULL;
    }

    // 2. 保存当前栈帧链
    frame->previous = tstate->current_frame;

    // 3. 设置为执行状态
    coro->cr_frame_state = FRAME_EXECUTING;
    tstate->current_frame = frame;

    // 4. 如果有发送值,压入栈
    if (arg != Py_None) {
        // 将arg压入执行栈顶
        // 这个值会被SEND指令的结果接收
        *frame->stackpointer = PyStackRef_FromPyObjectNew(arg);
        frame->stackpointer++;
    }

    // 5. 恢复执行协程代码
    result = _PyEval_EvalFrame(tstate, frame, exc);

    // 6. 恢复栈帧链
    tstate->current_frame = frame->previous;
    frame->previous = NULL;

    // 7. 处理执行结果
    if (result != NULL) {
        // yield了一个值,协程暂停
        coro->cr_frame_state = FRAME_SUSPENDED;
    }
    else {
        // 抛出了异常或return
        if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
            // 正常return,协程完成
            coro->cr_frame_state = FRAME_COMPLETED;
            // 清理栈帧
            _PyEval_FrameClearAndPop(tstate, frame);
        }
        else if (PyErr_ExceptionMatches(PyExc_GeneratorExit)) {
            // close()调用
            coro->cr_frame_state = FRAME_COMPLETED;
            _PyEval_FrameClearAndPop(tstate, frame);
        }
        else {
            // 其他异常,标记为完成
            coro->cr_frame_state = FRAME_COMPLETED;
        }
    }

    return result;
}

执行流程详解

  1. 状态检查:验证协程当前状态是否可以执行send
  2. 栈帧链接:保存当前栈帧,建立调用链
  3. 状态切换:将协程状态设置为EXECUTING
  4. 传递值:将send的值压入栈顶
  5. 执行字节码:调用_PyEval_EvalFrame恢复执行
  6. 恢复栈帧:恢复调用者的栈帧
  7. 处理结果:根据执行结果更新协程状态

调用链

协程.send(value)
  └─> coro_send (Python方法)
        └─> coro_send_ex
              ├─> 状态检查和验证
              ├─> 栈帧链接
              ├─> _PyEval_EvalFrame (执行字节码)
              │     ├─> 执行指令直到YIELD_VALUE
              │     └─> 或执行到RETURN_VALUE
              └─> 状态更新和清理

GET_AWAITABLE字节码实现

// Python/bytecodes.c

inst(GET_AWAITABLE, (iterable -- iter)) {
    // 1. 检查是否已经是awaitable(协程或生成器)
    if (PyCoro_CheckExact(iterable) || PyGen_CheckExact(iterable)) {
        // 协程和生成器本身就是awaitable
        iter = iterable;
    }
    else {
        // 2. 尝试调用__await__方法
        PyObject *await_meth = _PyObject_LookupSpecial(iterable, &_Py_ID(__await__));

        if (await_meth != NULL) {
            // 调用__await__()获取迭代器
            iter = _PyObject_CallNoArgs(await_meth);
            Py_DECREF(await_meth);

            if (iter == NULL) {
                GOTO_ERROR(error);
            }
            Py_DECREF(iterable);

            // 3. 验证返回的是迭代器
            if (!PyIter_Check(iter)) {
                PyErr_Format(PyExc_TypeError,
                    "__await__() returned non-iterator of type '%.100s'",
                    Py_TYPE(iter)->tp_name);
                Py_DECREF(iter);
                GOTO_ERROR(error);
            }
        }
        else if (PyErr_Occurred()) {
            GOTO_ERROR(error);
        }
        else {
            // 4. 不是awaitable对象
            PyErr_Format(PyExc_TypeError,
                "object %.100s can't be used in 'await' expression",
                Py_TYPE(iterable)->tp_name);
            Py_DECREF(iterable);
            GOTO_ERROR(error);
        }
    }
}

指令执行流程图

flowchart TD
    Start[GET_AWAITABLE指令] --> CheckType{检查类型}

    CheckType -->|协程| UseDirect[直接使用]
    CheckType -->|生成器| UseDirect
    CheckType -->|其他| LookupAwait[查找__await__]

    LookupAwait --> HasAwait{有__await__?}

    HasAwait -->|是| CallAwait[调用__await__]
    HasAwait -->|否| RaiseError[抛出TypeError]

    CallAwait --> CheckIter{返回迭代器?}

    CheckIter -->|是| Success[压入栈]
    CheckIter -->|否| RaiseError

    UseDirect --> Success
    Success --> End[继续执行]
    RaiseError --> Error[错误处理]

    style Start fill:#e1f5ff
    style Success fill:#e1ffe1
    style RaiseError fill:#ffe1e1

完整调用链:从用户代码到底层实现

示例代码

import asyncio

async def fetch_data(url):
    # 模拟异步HTTP请求
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    result = await fetch_data("https://example.com")
    print(result)

asyncio.run(main())

完整调用链(13层)

第1层:用户代码
asyncio.run(main())

第2层:事件循环创建 (Lib/asyncio/runners.py)
  └─> run(main())
        ├─> loop = asyncio.new_event_loop()
        └─> loop.run_until_complete(main())

第3层:协程启动 (Lib/asyncio/base_events.py)
  └─> BaseEventLoop.run_until_complete(main())
        ├─> task = ensure_future(main())
        │     └─> Task.__init__(main())
        └─> loop.run_forever()

第4层:Task调度 (Lib/asyncio/tasks.py)
  └─> Task._step()
        └─> result = coro.send(None)

第5层:协程send方法 (Objects/genobject.c)
  └─> coro_send(coro, arg)
        └─> coro_send_ex(coro, arg, 0, 0)
              ├─> frame->previous = tstate->current_frame
              ├─> coro->cr_frame_state = FRAME_EXECUTING
              └─> _PyEval_EvalFrame(tstate, frame, 0)

第6层:字节码解释器 (Python/ceval.c)
  └─> _PyEval_EvalFrame(tstate, frame, throwflag)
        └─> 主解释循环
              ├─> LOAD_GLOBAL "fetch_data"
              ├─> LOAD_CONST "https://example.com"
              ├─> CALL_FUNCTION 1
              │     └─> PyCoroObject_New()
              └─> GET_AWAITABLE

第7层:GET_AWAITABLE指令 (Python/bytecodes.c)
  └─> inst(GET_AWAITABLE, (iterable -- iter))
        ├─> PyCoro_CheckExact(iterable)
        └─> iter = iterable

第8层:SEND指令 (Python/bytecodes.c)
  └─> inst(SEND, (receiver, v -- receiver, retval))
        └─> gen_send_ex2(fetch_data_coro, None, ...)
              └─> _PyEval_EvalFrame(fetch_data_frame)
                    ├─> LOAD_GLOBAL "asyncio"
                    ├─> LOAD_ATTR "sleep"
                    ├─> CALL_FUNCTION 1
                    │     └─> Future对象
                    └─> GET_AWAITABLE
                          └─> Future.__await__()

第9层:返回到事件循环
  └─> Task._step()
        ├─> result = Future对象
        ├─> Future.add_done_callback(Task._wakeup)
        └─> 等待

第10层:Future完成
  └─> Future.set_result(result)
        └─> Task._wakeup()
              └─> loop.call_soon(Task._step)

第11层:恢复执行
  └─> Task._step()
        └─> coro.send(sleep_result)
              └─> coro_send_ex(main_coro, sleep_result, ...)
                    └─> SEND指令
                          └─> fetch_data_coro继续

第12层:完成返回
  └─> RETURN_VALUE
        └─> StopIteration("Data from ...")
              └─> SEND捕获并提取value

第13层:事件循环退出
  └─> loop.run_until_complete完成
        └─> loop.close()

await表达式执行详细时序图

sequenceDiagram
    autonumber
    participant User as 用户代码<br/>await expr
    participant Compiler as 编译器
    participant Bytecode as 字节码
    participant GetAwait as GET_AWAITABLE
    participant SendInst as SEND指令
    participant SubCoro as 子协程
    participant Loop as 事件循环

    rect rgb(230, 240, 255)
    note over User,Compiler: 编译阶段
    User->>Compiler: await fetch_data()
    Compiler->>Compiler: 生成代码:<br/>CALL fetch_data<br/>GET_AWAITABLE<br/>LOAD_CONST None<br/>SEND
    Compiler-->>Bytecode: 字节码序列
    end

    rect rgb(255, 245, 230)
    note over Bytecode,SubCoro: 执行阶段1: 获取awaitable
    Bytecode->>GetAwait: 执行GET_AWAITABLE
    GetAwait->>GetAwait: 检查栈顶对象

    alt 是协程对象
        GetAwait->>GetAwait: 直接使用
    else 有__await__
        GetAwait->>SubCoro: 调用__await__()
        SubCoro-->>GetAwait: 返回迭代器
    else 不支持await
        GetAwait->>GetAwait: 抛出TypeError
    end

    GetAwait-->>Bytecode: awaitable对象
    end

    rect rgb(240, 255, 240)
    note over Bytecode,Loop: 执行阶段2: 委托执行
    Bytecode->>SendInst: 执行SEND<br/>发送None
    SendInst->>SubCoro: coro.send(None)
    SubCoro->>SubCoro: 执行子协程代码

    loop 子协程执行
        SubCoro->>SubCoro: 执行指令

        alt 遇到yield/await
            SubCoro-->>SendInst: yield value
            SendInst-->>Bytecode: value (继续循环)
            Bytecode-->>Loop: 返回value
        else 遇到return
            SubCoro->>SubCoro: StopIteration(result)
            SubCoro-->>SendInst: 异常
            SendInst->>SendInst: 捕获StopIteration
            SendInst->>SendInst: 提取result
            SendInst-->>Bytecode: result
        end
    end
    end

    rect rgb(255, 240, 240)
    note over Loop,Bytecode: 执行阶段3: 恢复执行
    Loop->>SendInst: 事件完成<br/>send(result)
    SendInst->>SubCoro: 恢复执行
    SubCoro->>SubCoro: 继续执行
    SubCoro-->>SendInst: return value
    SendInst-->>Bytecode: value
    Bytecode->>User: await结果
    end

协程对象生命周期状态图

stateDiagram-v2
    [*] --> CREATED: PyCoroObject_New()

    CREATED --> EXECUTING: send(None)<br/>首次启动
    EXECUTING --> SUSPENDED: yield/await<br/>暂停执行
    SUSPENDED --> EXECUTING: send(value)<br/>恢复执行
    EXECUTING --> COMPLETED: return<br/>正常结束
    SUSPENDED --> COMPLETED: close()<br/>强制关闭
    EXECUTING --> COMPLETED: 异常<br/>异常终止
    COMPLETED --> [*]: 对象销毁

    note right of CREATED
        cr_frame_state = FRAME_CREATED
        栈帧已分配但未执行
        awaiting first send
    end note

    note right of EXECUTING
        cr_frame_state = FRAME_EXECUTING
        正在执行字节码
        current_frame指向此栈帧
    end note

    note right of SUSPENDED
        cr_frame_state = FRAME_SUSPENDED
        栈帧状态已保存
        等待send恢复
    end note

    note right of COMPLETED
        cr_frame_state = FRAME_COMPLETED
        栈帧已清理
        无法再恢复
    end note