asyncio 高阶常用方法与事件循环精要
面向库/框架编写者的 asyncio 进阶指南:事件循环获取、运行与停止、回调调度、线程安全投递等高级用法。
asyncio 高阶常用方法与事件循环精要
事件循环是每个 asyncio 应用的核心,运行异步任务和回调,执行网络 IO 操作,以及运行子进程。应用开发者通常应当使用高层级的 asyncio.run(),应尽量少引用循环对象;本文聚焦库和框架开发者需要的底层控制能力。
获取事件循环
asyncio.get_running_loop():返回当前 OS 线程中正在运行的事件循环(只能在协程或回调中调用)。asyncio.get_event_loop():在协程或回调内与get_running_loop()等价;否则遵循策略返回/创建循环。asyncio.set_event_loop(loop):设置当前线程的事件循环。asyncio.new_event_loop():创建并返回一个新的事件循环对象。
由于策略较复杂,推荐在协程和回调中优先使用
get_running_loop()。
运行与停止事件循环
loop.run_until_complete(fut):运行直到Future/Task完成并返回结果;传入协程会被隐式封装成 Task。loop.run_forever():持续运行事件循环,直至loop.stop()被调用;停止后可再次run_forever()或run_until_complete()。
示例:
import asyncio
async def foo():
print("这是一个协程")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
print("开始运行协程")
coro = foo()
print("进入事件循环")
loop.run_until_complete(coro)
finally:
print("关闭事件循环")
loop.close()
带返回值:
import asyncio
async def foo():
print("这是一个协程")
return "返回值"
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(foo())
print(f"run_until_complete可以获取协程的{result},默认输出None")
finally:
loop.close()
安排回调(Callbacks)
loop.call_soon
在下一个事件循环迭代中立即调用回调,按注册顺序执行,每个回调仅调用一次。
import asyncio
def callback_foo(name):
print(f'callback_foo...{name}')
async def foo():
print('async foo...')
async def main(loop):
print('Start main...')
loop.call_soon(callback_foo, 'Jackson')
print('call_soon...')
await foo()
await asyncio.sleep(0.2)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
- 线程安全版本:
loop.call_soon_threadsafe()适用于跨线程投递回调。
loop.call_later
延时调度回调:loop.call_later(delay, callback, *args)。
import asyncio
def callback(n):
print(f"callback {n} invoked")
async def main(loop):
print("注册callbacks")
loop.call_later(0.2, callback, 1)
loop.call_later(0.1, callback, 2)
loop.call_soon(callback, 3)
await asyncio.sleep(0.4)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
线程池/进程池协作(简述)
loop.run_in_executor(executor, func, *args):在线程/进程池执行阻塞函数,返回Future;与asyncio.to_thread类似但更通用。loop.call_soon_threadsafe(cb, *args):在其它线程安全地投递回调。
详见:
最佳实践
- 常规应用优先使用
asyncio.run()管理事件循环的生命周期。 - 库/框架编写者才需要直接操作
loop,并确保在关闭阶段清理任务与句柄。 - 跨线程协作务必使用线程安全 API:
call_soon_threadsafe、run_coroutine_threadsafe。
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。