Python asyncio 深度剖析:事件循环实现与协程调度
从事件循环的底层实现到 Task 调度机制,深入解析 await 表达式解析原理、嵌套事件循环的挑战,以及 asyncio 中容易被忽视的内存泄漏陷阱。
1. 事件循环原理
1.1 事件循环架构概览
asyncio 的核心是事件循环(Event Loop),它是一个基于 selector 的单线程调度器,通过 I/O 多路复用机制实现高效的并发。以下是事件循环的核心组件架构:
1.2 事件循环启动流程
当调用 asyncio.run() 时,Python 会创建一个全新的事件循环对象,并在其中执行主协程。以下是关键步骤:
# asyncio/src/asyncio/runners.py
def run(coro, *, debug=False):
# 1. 创建事件循环
loop = events.new_event_loop()
try:
# 2. 设置调试模式
loop.set_debug(debug)
# 3. 创建主任务并运行
loop.run_until_complete(_core_old_loop_run(coro))
finally:
# 4. 关闭事件循环
loop.close()
# 事件循环内部结构
class AbstractEventLoop:
# 任务队列(FIFO)
_ready: deque[Callback]
# 定时器堆(按执行时间排序)
_scheduled: deque[TimerHandle]
# I/O 多路复用选择器
_selector: selector.BaseSelector
# 所有打开的文件描述符映射
_ssock, _csock # 用于信号处理
1.3 I/O 多路复用机制
asyncio 使用 selectors 模块实现平台无关的 I/O 多路复用。在 Linux 上实际使用 epoll,在 macOS 上使用 kqueue,在 Windows 上使用 select(效率较低)。
import asyncio
import selectors
# 模拟事件循环的 selector 机制
async def fetch(reader, nbytes):
# 内部实际调用 selector.select() 监听就绪事件
data = await reader.read(nbytes)
return data
# epoll 的事件类型
# EPOLLIN - 可读
# EPOLLOUT - 可写
# EPOLLERR - 错误
# EPOLLHUP - 挂起
selector.register(fileobj, events, data)
selector.select(timeout) # 阻塞直到事件就绪
事件循环的 select() 调用是阻塞式的——当没有任何 I/O 就绪时,它会等待直到超时或收到信号。这解释了为何 asyncio 是单线程却能并发处理大量 I/O 操作。
2. Task 调度机制
2.1 Task 的创建与状态机
asyncio.Task 是协程的包装器,实现了协程的调度与取消。每个 Task 都有一个内部状态机:
# Task 的五种状态
# CREATED - 刚创建,未调度
# WAITING - 等待某个 Future/Coroutine 完成
# READY - 已就绪,等待事件循环执行
# FINISHED - 执行完成
# CANCELLED - 被取消
async def worker():
result = await something()
return result
# 创建 Task
task = asyncio.create_task(worker())
# 内部状态: Task CREATED
# Task 被调度时状态
# _step() 被调用 -> READY -> WAITING
2.2 Task 调度队列
事件循环维护一个 _ready 队列(双端队列),所有已就绪的 Task 都会被放入这个队列。事件循环每轮迭代从队列中取出任务执行。
# 事件循环主循环伪代码
def run_once():
# 1. 处理所有到期的定时器
for handle in _timers:
handle._callback(*handle._args)
# 2. 从就绪队列取任务执行
while _ready:
handle = _ready.popleft()
try:
handle._callback(*handle._args)
except Exception as e:
# 异常处理
loop.call_exception_handler({'exception': e})
# 3. I/O 就绪事件处理
events = selector.select(timeout)
for key, mask in events:
key.data(key.fileobj, mask)
2.3 create_task vs ensure_future
asyncio.create_task() 和 asyncio.ensure_future() 都用于将协程包装成 Task,但行为略有不同:
| 维度 | create_task() | ensure_future() |
|---|---|---|
| 返回值 | Task(直接) | Future(可能是Task或协程) |
| 参数校验 | 必须是协程对象 | 接受协程、Future 或 awaitable |
| 异常抛出 | 参数不是协程时立即抛出 | 延迟到 await 时 |
| 推荐场景 | 大多数场景(首选) | 需要兼容非协程 awaitable 时 |
3. await 表达式解析
3.1 await 的编译时变换
当 Python 解释器遇到 await 表达式时,会将其编译成一系列 YIELD FROM 操作码(opcode)。这使得协程可以暂停和恢复执行。
import dis
async def example():
result = await asyncio.sleep(1)
return result
# 查看字节码
dis.dis(example)
# 输出:
# LOAD_GLOBAL 0 (asyncio)
3.2 协程对象的内部状态
协程函数被调用时返回一个协程对象,它包含执行所需的全部状态:
async def demo(x):
y = await other_coro(x)
return y
coro = demo(10)
# 协程对象的关键属性
# coro.cr_code - 字节码对象
# coro.cr_frame - 当前栈帧
# coro.cr_running - 是否正在运行
# coro.cr_await - 当前等待的 awaitable(用于调试)
# 协程执行流程
# 1. 创建协程对象(cr_frame = None)
# 2. send(None) 启动协程,开始执行字节码
# 3. 遇到 await expr 时:
# - 调用 expr.__await__() 获取迭代器
# - YIELD FROM 该迭代器
# 4. 迭代器完成时,调用 coroutine.send(None) 恢复
3.3 awaitable 协议
任何实现了 __await__ 方法的对象都可以被 await。asyncio 构建了一套 awaitable 层次结构:
# awaitable 协议
class Awaitable:
async def __await__(self): ...
# Future 是 Awaitable 的子类
class Future(Awaitable):
def result(self): ...
def set_result(self, value): ...
def set_exception(self, exc): ...
def cancel(self): bool
# Task 继承自 Future
class Task(Future):
def get_name(self): ...
def get_coro(self): ...
# 典型 awaitable 对象
# - asyncio.Task (被 await)
# - asyncio.Future (被 await)
# - GeneratorExit (yield from 隐式 await)
# - trio.Nursery (外部库实现)
4. 嵌套事件循环
4.1 嵌套循环的挑战
在同一个线程中嵌套运行事件循环是一个常见陷阱。Python 的事件循环是单例的,在一个线程中同时运行两个事件循环会导致冲突。
# 错误示例:嵌套 run()
async def inner():
return 42
async def outer():
# 错误!在一个事件循环中启动另一个
result = asyncio.run(inner())
return result
# RuntimeError: asyncio.run() cannot be called from a running event loop
# 正确做法:使用 nest_asyncio
import nest_asyncio
nest_asyncio.apply()
# 或者使用 loop.run_until_complete 明确嵌套
4.2 Jupyter/IPython 中的嵌套循环
Jupyter Notebook 和 IPython 在 REPL 中已经运行一个事件循环,这导致用户无法直接调用 asyncio.run()。解决方案是使用 nest_asyncio 或 await 主协程。
# 方案1:使用 await 直接运行协程
await my_coro()
# 方案2:使用 nest_asyncio
import nest_asyncio
nest_asyncio.apply()
# 方案3:在 async context 中使用 run_until_complete
loop = asyncio.get_event_loop()
result = loop.run_until_complete(my_coro())
# 方案4:asyncio.run() 带明确事件循环
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 在同一循环中创建子协程
tasks = [asyncio.create_task(coro()) for coro in coros]
results = await asyncio.gather(*tasks)
return results
4.3 跨平台嵌套注意事项
在 Windows 上,SelectorEventLoop 不支持通过 add_reader/add_writer 注册文件描述符。如果需要在 Windows 上处理管道/子进程,需要使用 ProactorEventLoop 或切换到 asyncio.SelectorEventLoop 配合进程。
5. 内存泄漏陷阱
5.1 常见内存泄漏模式
asyncio 程序中常见的内存泄漏通常与未正确清理的 Task、Future 或循环引用有关:
❌ 泄漏模式
- 创建 Task 但永不 await/ancel
- 在长生命周期对象中注册短生命周期回调
- 事件循环中的强引用闭包捕获外部变量
- 未关闭的 StreamReader/StreamWriter
✅ 正确做法
- 使用
TaskGroup(3.11+)管理生命周期 - 显式
task.cancel()后 await 其完成 - 使用弱引用或显式清理函数
- 使用
async with确保资源释放
5.2 未取消的 Task 泄漏
# 危险:创建大量永不清理的 Task
async def background_task():
while True:
await asyncio.sleep(60)
do_something()
async def handle_request():
# 每次请求创建后台任务,但从不取消
asyncio.create_task(background_task()) # 泄漏!
return "OK"
# 正确做法:使用 TaskGroup 管理
async def handle_request_correct():
async with TaskGroup() as tg:
tg.create_task(background_task())
# TaskGroup 退出时自动取消所有 Task
return "OK"
5.3 循环引用导致的泄漏
# 危险:闭包捕获导致循环引用
class DataProcessor:
def __init__(self):
self.data = []
async def process(self, item):
# 这里的 lambda 捕获了 self,形成引用循环
loop = asyncio.get_event_loop()
loop.call_later(10, lambda: self.data.append(item))
# 调试:使用 objgraph 检测泄漏
# pip install objgraph
# objgraph.show_backrefs([obj], file=sys.stdout)
# 正确做法:使用 weakref 或显式清理
import weakref
class DataProcessorFixed:
def __init__(self):
self.data = []
self._callbacks = []
def cleanup(self):
for cb in self._callbacks:
cb.cancel()
self._callbacks.clear()
5.4 调试内存泄漏工具
# 1. asyncio.all_tasks() 检测未完成的任务
pending = [t for t in asyncio.all_tasks() if not t.done()]
print(ff"Pending tasks: {len(pending)}")
# 2. 使用 tracemalloc 追踪内存分配
import tracemalloc
tracemalloc.start()
# 业务代码
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
print(stat)
# 3. 使用 objgraph 可视化引用链
# pip install objgraph
import objgraph
objgraph.show_backrefs(
objgraph.at今夜(),
max_depth=3,
filename='refs.dot'
)
在 Python 3.11+ 中,优先使用 TaskGroup 管理并发任务,它会在上下文退出时自动取消和清理所有子任务,大大减少泄漏风险。