进程池 Pool 深入指南
全面介绍 multiprocessing.Pool 的核心方法、参数、最佳实践与性能优化
进程池 Pool 深入指南
📝 概述
multiprocessing.Pool 通过维护一组工作进程来并行执行任务,提供了同步与异步两类接口(apply/map 系列)。本指南系统讲解 Pool 的方法、参数、注意事项与性能优化策略。
🎯 学习目标
- 理解 Pool 的工作原理与常用方法
- 熟练掌握同步/异步任务提交与结果收集
- 掌握
AsyncResult的使用 - 学会在实际项目中优化 Pool 的性能
📋 前置知识
- Python 函数与异常处理
- multiprocessing 基础
🔍 详细内容
1. 核心方法总览
apply(func, args, kwds):同步调用apply_async(func, args, kwds, callback, error_callback):异步调用map(func, iterable, chunksize):同步批量映射map_async(func, iterable, chunksize, callback, error_callback):异步批量映射imap(func, iterable, chunksize):惰性有序迭代imap_unordered(func, iterable, chunksize):惰性无序迭代close():关闭池,不再接收新任务terminate():立即终止工作进程join():等待子进程退出(需 close/terminate 后)
2. 基本使用
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
results = pool.map(square, range(10))
print(results)
3. 异步提交与回调
from multiprocessing import Pool
def work(x):
return x + 100
def on_ok(result):
print("回调成功:", result)
def on_err(e):
print("回调异常:", e)
if __name__ == '__main__':
with Pool(4) as pool:
async_res = pool.apply_async(work, (1,), callback=on_ok, error_callback=on_err)
print("ready?", async_res.ready())
print("result:", async_res.get(timeout=3))
4. AsyncResult 对象
res = pool.apply_async(work, (5,))
print(res.ready())
print(res.successful())
print(res.get(timeout=2))
5. 关键参数
processes:工作进程数量,默认os.cpu_count()maxtasksperchild:每个进程在处理一定数量任务后重启(缓解内存泄漏)initializer/initargs:子进程启动时执行的初始化函数及其参数chunksize:批处理切块大小
from multiprocessing import Pool
def init_worker():
print("初始化子进程")
if __name__ == '__main__':
with Pool(processes=4, maxtasksperchild=200, initializer=init_worker) as pool:
for r in pool.imap_unordered(square, range(100), chunksize=5):
print(r)
6. 注意事项与陷阱
- 避免共享可变全局状态,使用参数传递或
Manager - 传参与返回值必须可被
pickle序列化 - Windows 平台必须使用
if __name__ == '__main__':保护 - 及时释放资源:
with或close()/join()
7. 性能优化建议
- 合理设置
processes(一般与 CPU 核心数相当) - 对大量小任务使用合适的
chunksize - 使用
maxtasksperchild限制长期运行造成的内存膨胀 - 避免在回调里执行耗时操作
💡 实际应用
大文件并行处理
from multiprocessing import Pool
from pathlib import Path
def line_count(path: Path) -> int:
return sum(1 for _ in path.open('r', encoding='utf-8', errors='ignore'))
if __name__ == '__main__':
files = list(Path('.').rglob('*.log'))
with Pool() as pool:
counts = pool.map(line_count, files, chunksize=10)
print('总行数:', sum(counts))
⚠️ 注意事项
- 进程池任务异常会在
get()时重新抛出 - 回调函数在主进程中执行,注意线程安全
- 关闭顺序:先
close()再join()
🔗 相关内容
📚 扩展阅读
🏷️ 标签
Pool 进程池 multiprocessing AsyncResult
最后更新: 2024-01-15
作者: Python 编程指南
版本: 1.0
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。