在 asyncio 中引入多进程(CPU 密集型任务协作)

将 CPU 密集型任务交由多进程处理,并与 asyncio 事件循环协作以提升吞吐与响应性。

分类: basics 难度: 中高级 更新: 2024-01-20
asyncio multiprocessing 并发 并行 进程池

在 asyncio 中引入多进程

当任务为 CPU 密集型时,单线程事件循环难以有效利用多核。此时可通过多进程并行计算,结合 asyncio 调度 IO 密集型部分,实现更好的吞吐与响应。

方式一:concurrent.futures + ProcessPoolExecutor

# -*- coding: utf-8 -*-
import asyncio
from concurrent.futures import ProcessPoolExecutor

# CPU 密集型函数(纯阻塞)
def cpu_bound(n: int) -> int:
    # 简单计算:斐波那契(递归版仅演示)
    if n < 2:
        return n
    return cpu_bound(n - 1) + cpu_bound(n - 2)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as pool:
        tasks = [loop.run_in_executor(pool, cpu_bound, i) for i in range(25, 30)]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

方式二:asyncio.to_thread vs 进程

  • asyncio.to_thread 适合 IO 阻塞或轻量 CPU 任务,受 GIL 限制无法并行多核。
  • CPU 密集建议使用 ProcessPoolExecutormultiprocessing 原生 API。

方式三:原生 multiprocessing 协作(简单示例)

from multiprocessing import Pool
import os

# 在子进程执行的纯函数

def work(x):
    # 这里避免调用协程/事件循环相关代码
    return x * x

if __name__ == '__main__':
    with Pool(processes=os.cpu_count()) as p:
        print(p.map(work, range(10)))

在 asyncio 应用中,可将计算任务封装为进程池提交;事件循环继续处理网络 IO,实现“计算-IO”解耦。

性能与注意事项

  • 进程间通信与序列化成本较高,宜用于粒度较大的任务。
  • 可重入/可序列化:提交到进程池的函数必须可被 pickle。
  • 在 Windows 上使用 if __name__ == "__main__": 保护入口,避免子进程递归创建。
  • 留意关闭时机:在应用退出前显式关闭进程池或使用上下文管理器。

相关阅读

作者: Python 编程指南

版本: 1.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。