📅 2026-05-26 👤 张伟 🏷️ Python 3.11+ · asyncio · 并发

Python asyncio 深度剖析:事件循环实现与协程调度

从事件循环的底层实现到 Task 调度机制,深入解析 await 表达式解析原理、嵌套事件循环的挑战,以及 asyncio 中容易被忽视的内存泄漏陷阱。

1. 事件循环原理

1.1 事件循环架构概览

asyncio 的核心是事件循环(Event Loop),它是一个基于 selector 的单线程调度器,通过 I/O 多路复用机制实现高效的并发。以下是事件循环的核心组件架构:

EventLoop
Tasks (TaskScheduler)
Coroutines


Selector (I/O Multiplexing)
Socket I/O (non-blocking)

1.2 事件循环启动流程

当调用 asyncio.run() 时,Python 会创建一个全新的事件循环对象,并在其中执行主协程。以下是关键步骤:

asyncio.run() 核心逻辑
# asyncio/src/asyncio/runners.py
def run(coro, *, debug=False):
    # 1. 创建事件循环
    loop = events.new_event_loop()
    try:
        # 2. 设置调试模式
        loop.set_debug(debug)
        
        # 3. 创建主任务并运行
        loop.run_until_complete(_core_old_loop_run(coro))
    finally:
        # 4. 关闭事件循环
        loop.close()

# 事件循环内部结构
class AbstractEventLoop:
    # 任务队列(FIFO)
    _ready: deque[Callback]
    
    # 定时器堆(按执行时间排序)
    _scheduled: deque[TimerHandle]
    
    # I/O 多路复用选择器
    _selector: selector.BaseSelector
    
    # 所有打开的文件描述符映射
    _ssock, _csock  # 用于信号处理

1.3 I/O 多路复用机制

asyncio 使用 selectors 模块实现平台无关的 I/O 多路复用。在 Linux 上实际使用 epoll,在 macOS 上使用 kqueue,在 Windows 上使用 select(效率较低)。

Selector 注册与回调流程
import asyncio
import selectors

# 模拟事件循环的 selector 机制
async def fetch(reader, nbytes):
    # 内部实际调用 selector.select() 监听就绪事件
    data = await reader.read(nbytes)
    return data

# epoll 的事件类型
# EPOLLIN  - 可读
# EPOLLOUT - 可写
# EPOLLERR - 错误
# EPOLLHUP - 挂起

selector.register(fileobj, events, data)
selector.select(timeout)  # 阻塞直到事件就绪
💡 关键洞察

事件循环的 select() 调用是阻塞式的——当没有任何 I/O 就绪时,它会等待直到超时或收到信号。这解释了为何 asyncio 是单线程却能并发处理大量 I/O 操作。

2. Task 调度机制

2.1 Task 的创建与状态机

asyncio.Task 是协程的包装器,实现了协程的调度与取消。每个 Task 都有一个内部状态机:

Task 状态转换
# Task 的五种状态
# CREATED    - 刚创建,未调度
# WAITING    - 等待某个 Future/Coroutine 完成
# READY      - 已就绪,等待事件循环执行
# FINISHED   - 执行完成
# CANCELLED  - 被取消

async def worker():
    result = await something()
    return result

# 创建 Task
task = asyncio.create_task(worker())
# 内部状态: Task CREATED

# Task 被调度时状态
# _step() 被调用 -> READY -> WAITING

2.2 Task 调度队列

事件循环维护一个 _ready 队列(双端队列),所有已就绪的 Task 都会被放入这个队列。事件循环每轮迭代从队列中取出任务执行。

Task 调度核心逻辑
# 事件循环主循环伪代码
def run_once():
    # 1. 处理所有到期的定时器
    for handle in _timers:
        handle._callback(*handle._args)
    
    # 2. 从就绪队列取任务执行
    while _ready:
        handle = _ready.popleft()
        try:
            handle._callback(*handle._args)
        except Exception as e:
            # 异常处理
            loop.call_exception_handler({'exception': e})
    
    # 3. I/O 就绪事件处理
    events = selector.select(timeout)
    for key, mask in events:
        key.data(key.fileobj, mask)

2.3 create_task vs ensure_future

asyncio.create_task()asyncio.ensure_future() 都用于将协程包装成 Task,但行为略有不同:

维度 create_task() ensure_future()
返回值 Task(直接) Future(可能是Task或协程)
参数校验 必须是协程对象 接受协程、Future 或 awaitable
异常抛出 参数不是协程时立即抛出 延迟到 await 时
推荐场景 大多数场景(首选) 需要兼容非协程 awaitable 时

3. await 表达式解析

3.1 await 的编译时变换

当 Python 解释器遇到 await 表达式时,会将其编译成一系列 YIELD FROM 操作码(opcode)。这使得协程可以暂停和恢复执行。

await 的字节码解析
import dis

async def example():
    result = await asyncio.sleep(1)
    return result

# 查看字节码
dis.dis(example)

# 输出:
# LOAD_GLOBAL     0 (asyncio)

3.2 协程对象的内部状态

协程函数被调用时返回一个协程对象,它包含执行所需的全部状态:

协程对象的 cr_ 属性
async def demo(x):
    y = await other_coro(x)
    return y

coro = demo(10)

# 协程对象的关键属性
# coro.cr_code     - 字节码对象
# coro.cr_frame    - 当前栈帧
# coro.cr_running  - 是否正在运行
# coro.cr_await    - 当前等待的 awaitable(用于调试)

# 协程执行流程
# 1. 创建协程对象(cr_frame = None)
# 2. send(None) 启动协程,开始执行字节码
# 3. 遇到 await expr 时:
#    - 调用 expr.__await__() 获取迭代器
#    - YIELD FROM 该迭代器
# 4. 迭代器完成时,调用 coroutine.send(None) 恢复

3.3 awaitable 协议

任何实现了 __await__ 方法的对象都可以被 await。asyncio 构建了一套 awaitable 层次结构:

Awaitable 协议层次
# awaitable 协议
class Awaitable:
    async def __await__(self): ...

# Future 是 Awaitable 的子类
class Future(Awaitable):
    def result(self): ...
    def set_result(self, value): ...
    def set_exception(self, exc): ...
    def cancel(self): bool

# Task 继承自 Future
class Task(Future):
    def get_name(self): ...
    def get_coro(self): ...

# 典型 awaitable 对象
# - asyncio.Task       (被 await)
# - asyncio.Future     (被 await)
# - GeneratorExit     (yield from 隐式 await)
# - trio.Nursery      (外部库实现)

4. 嵌套事件循环

4.1 嵌套循环的挑战

在同一个线程中嵌套运行事件循环是一个常见陷阱。Python 的事件循环是单例的,在一个线程中同时运行两个事件循环会导致冲突。

嵌套事件循环的陷阱
# 错误示例:嵌套 run()
async def inner():
    return 42

async def outer():
    # 错误!在一个事件循环中启动另一个
    result = asyncio.run(inner())
    return result

# RuntimeError: asyncio.run() cannot be called from a running event loop

# 正确做法:使用 nest_asyncio
import nest_asyncio
nest_asyncio.apply()

# 或者使用 loop.run_until_complete 明确嵌套

4.2 Jupyter/IPython 中的嵌套循环

Jupyter Notebook 和 IPython 在 REPL 中已经运行一个事件循环,这导致用户无法直接调用 asyncio.run()。解决方案是使用 nest_asyncioawait 主协程。

Jupyter 环境下的解决方案
# 方案1:使用 await 直接运行协程
await my_coro()

# 方案2:使用 nest_asyncio
import nest_asyncio
nest_asyncio.apply()

# 方案3:在 async context 中使用 run_until_complete
loop = asyncio.get_event_loop()
result = loop.run_until_complete(my_coro())

# 方案4:asyncio.run() 带明确事件循环
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 在同一循环中创建子协程
    tasks = [asyncio.create_task(coro()) for coro in coros]
    results = await asyncio.gather(*tasks)
    return results

4.3 跨平台嵌套注意事项

⚠️ Windows 平台的特殊限制

在 Windows 上,SelectorEventLoop 不支持通过 add_reader/add_writer 注册文件描述符。如果需要在 Windows 上处理管道/子进程,需要使用 ProactorEventLoop 或切换到 asyncio.SelectorEventLoop 配合进程。

5. 内存泄漏陷阱

5.1 常见内存泄漏模式

asyncio 程序中常见的内存泄漏通常与未正确清理的 Task、Future 或循环引用有关:

❌ 泄漏模式

  • 创建 Task 但永不 await/ancel
  • 在长生命周期对象中注册短生命周期回调
  • 事件循环中的强引用闭包捕获外部变量
  • 未关闭的 StreamReader/StreamWriter

✅ 正确做法

  • 使用 TaskGroup(3.11+)管理生命周期
  • 显式 task.cancel() 后 await 其完成
  • 使用弱引用或显式清理函数
  • 使用 async with 确保资源释放

5.2 未取消的 Task 泄漏

Task 泄漏示例
# 危险:创建大量永不清理的 Task
async def background_task():
    while True:
        await asyncio.sleep(60)
        do_something()

async def handle_request():
    # 每次请求创建后台任务,但从不取消
    asyncio.create_task(background_task())  # 泄漏!
    return "OK"

# 正确做法:使用 TaskGroup 管理
async def handle_request_correct():
    async with TaskGroup() as tg:
        tg.create_task(background_task())
    # TaskGroup 退出时自动取消所有 Task
    return "OK"

5.3 循环引用导致的泄漏

循环引用泄漏
# 危险:闭包捕获导致循环引用
class DataProcessor:
    def __init__(self):
        self.data = []
    
    async def process(self, item):
        # 这里的 lambda 捕获了 self,形成引用循环
        loop = asyncio.get_event_loop()
        loop.call_later(10, lambda: self.data.append(item))
        
# 调试:使用 objgraph 检测泄漏
# pip install objgraph
# objgraph.show_backrefs([obj], file=sys.stdout)

# 正确做法:使用 weakref 或显式清理
import weakref

class DataProcessorFixed:
    def __init__(self):
        self.data = []
        self._callbacks = []
    
    def cleanup(self):
        for cb in self._callbacks:
            cb.cancel()
        self._callbacks.clear()

5.4 调试内存泄漏工具

内存泄漏调试工具
# 1. asyncio.all_tasks() 检测未完成的任务
pending = [t for t in asyncio.all_tasks() if not t.done()]
print(ff"Pending tasks: {len(pending)}")

# 2. 使用 tracemalloc 追踪内存分配
import tracemalloc
tracemalloc.start()

# 业务代码

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
    print(stat)

# 3. 使用 objgraph 可视化引用链
# pip install objgraph
import objgraph
objgraph.show_backrefs(
    objgraph.at今夜(),
    max_depth=3,
    filename='refs.dot'
)
💡 最佳实践

在 Python 3.11+ 中,优先使用 TaskGroup 管理并发任务,它会在上下文退出时自动取消和清理所有子任务,大大减少泄漏风险。