Python 进程间通信(IPC)
掌握 Python 中进程间通信的各种机制,包括管道、队列和同步原语
Python 进程间通信(IPC)
📝 概述
由于进程间不共享内存空间,需要特殊的通信机制进行数据交换。Python multiprocessing 模块提供了多种进程间通信(IPC)方式,包括管道(Pipe)、队列(Queue)和同步原语(Lock、RLock)等。
🎯 学习目标
- 掌握管道(Pipe)的使用和注意事项
- 理解各种队列(Queue、SimpleQueue、JoinableQueue)的特点
- 学会使用多进程同步原语
- 了解并发处理中的安全问题和解决方案
- 掌握进程间数据传递的最佳实践
📋 前置知识
- 多进程编程基础
- Python 异常处理
- 并发编程概念
- 序列化与反序列化(pickle)
🔍 详细内容
1. 管道(Pipe)
管道提供了两个进程间的双向通信通道。
基本使用
from multiprocessing import Process, Pipe
def sender(conn):
"""发送数据的进程"""
conn.send(['数据', 42, None])
conn.send('你好,进程通信!')
conn.close()
def receiver(conn):
"""接收数据的进程"""
print("接收到:", conn.recv()) # ['数据', 42, None]
print("接收到:", conn.recv()) # '你好,进程通信!'
conn.close()
if __name__ == '__main__':
# 创建管道
parent_conn, child_conn = Pipe()
# 创建进程
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
Connection 对象的属性和方法
from multiprocessing import Pipe
import os
def explore_connection():
"""探索 Connection 对象的属性和方法"""
parent_conn, child_conn = Pipe()
print("Connection 对象属性:")
print(f"可读: {parent_conn.readable}")
print(f"可写: {parent_conn.writable}")
print(f"文件描述符: {parent_conn.fileno()}")
# 主要方法
print("\nConnection 对象方法:")
print("send(obj) - 发送对象")
print("recv() - 接收对象")
print("send_bytes(buffer) - 发送字节")
print("recv_bytes() - 接收字节")
print("poll([timeout]) - 检查是否有数据可读")
print("close() - 关闭连接")
if __name__ == '__main__':
explore_connection()
管道的并发问题
from multiprocessing import Process, Pipe
import time
import random
def concurrent_sender(conn, sender_id, count=5):
"""并发发送数据"""
for i in range(count):
message = f"发送者{sender_id}: 消息{i}"
conn.send(message)
print(f"[发送] {message}")
time.sleep(random.uniform(0.1, 0.5))
conn.close()
def concurrent_receiver(conn, total_messages):
"""并发接收数据"""
received = 0
while received < total_messages:
try:
if conn.poll(1): # 超时1秒
message = conn.recv()
print(f"[接收] {message}")
received += 1
else:
print("接收超时...")
break
except EOFError:
print("管道已关闭")
break
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
# 创建多个发送进程
sender_count = 3
message_per_sender = 3
total_messages = sender_count * message_per_sender
senders = []
for i in range(sender_count):
p = Process(target=concurrent_sender,
args=(child_conn, i, message_per_sender))
senders.append(p)
p.start()
# 创建接收进程
receiver = Process(target=concurrent_receiver,
args=(parent_conn, total_messages))
receiver.start()
# 等待所有进程完成
for p in senders:
p.join()
receiver.join()
print("管道通信演示完成")
2. 队列(Queue)
队列提供了更安全的多进程通信方式,内置了同步机制。
Queue - 标准队列
from multiprocessing import Process, Queue
import time
import random
def producer(q, producer_id):
"""生产者进程"""
for i in range(5):
item = f"生产者{producer_id}的产品{i}"
q.put(item)
print(f"[生产] {item}")
time.sleep(random.uniform(0.1, 0.3))
# 发送结束信号
q.put(None)
def consumer(q, consumer_id):
"""消费者进程"""
while True:
item = q.get()
if item is None:
# 收到结束信号,重新放回队列供其他消费者
q.put(None)
break
print(f"[消费者{consumer_id}] 处理: {item}")
time.sleep(random.uniform(0.2, 0.4))
if __name__ == '__main__':
# 创建队列
q = Queue(maxsize=10) # 限制队列大小
# 创建生产者进程
producers = []
for i in range(2):
p = Process(target=producer, args=(q, i))
producers.append(p)
p.start()
# 创建消费者进程
consumers = []
for i in range(3):
p = Process(target=consumer, args=(q, i))
consumers.append(p)
p.start()
# 等待生产者完成
for p in producers:
p.join()
# 发送结束信号
q.put(None)
# 等待消费者完成
for p in consumers:
p.join()
print("队列通信演示完成")
SimpleQueue - 简化队列
from multiprocessing import Process, SimpleQueue
import time
def simple_producer(q):
"""简单生产者"""
for i in range(5):
q.put(f"简单消息{i}")
print(f"发送: 简单消息{i}")
time.sleep(0.5)
def simple_consumer(q):
"""简单消费者"""
for _ in range(5):
message = q.get()
print(f"接收: {message}")
if __name__ == '__main__':
# SimpleQueue 不支持 maxsize 参数
sq = SimpleQueue()
p1 = Process(target=simple_producer, args=(sq,))
p2 = Process(target=simple_consumer, args=(sq,))
p1.start()
p2.start()
p1.join()
p2.join()
print("SimpleQueue 演示完成")
JoinableQueue - 可连接队列
from multiprocessing import Process, JoinableQueue
import time
import random
def task_producer(q):
"""任务生产者"""
tasks = ['任务A', '任务B', '任务C', '任务D', '任务E']
for task in tasks:
q.put(task)
print(f"添加任务: {task}")
time.sleep(0.2)
def task_worker(q, worker_id):
"""任务工作者"""
while True:
task = q.get()
if task is None:
q.task_done() # 标记 None 任务完成
break
print(f"[工作者{worker_id}] 开始处理: {task}")
time.sleep(random.uniform(1, 2)) # 模拟任务处理时间
print(f"[工作者{worker_id}] 完成: {task}")
q.task_done() # 标记任务完成
if __name__ == '__main__':
jq = JoinableQueue()
# 启动生产者
producer = Process(target=task_producer, args=(jq,))
producer.start()
# 启动工作者
workers = []
for i in range(3):
w = Process(target=task_worker, args=(jq, i))
w.start()
workers.append(w)
# 等待生产者完成
producer.join()
# 等待所有任务完成
jq.join()
print("所有任务已完成!")
# 发送停止信号给工作者
for _ in workers:
jq.put(None)
# 等待工作者退出
for w in workers:
w.join()
print("JoinableQueue 演示完成")
3. 多进程同步原语
Lock - 互斥锁
from multiprocessing import Process, Lock
import time
# 共享资源(文件)
SHARED_FILE = 'shared_counter.txt'
def write_to_file(lock, process_id, count):
"""使用锁保护文件写入"""
for i in range(count):
with lock: # 获取锁
try:
# 读取当前值
try:
with open(SHARED_FILE, 'r') as f:
current = int(f.read().strip() or '0')
except FileNotFoundError:
current = 0
# 增加值
current += 1
# 写回文件
with open(SHARED_FILE, 'w') as f:
f.write(str(current))
print(f"进程{process_id}: 写入值 {current}")
except Exception as e:
print(f"进程{process_id} 出错: {e}")
time.sleep(0.1) # 模拟其他工作
if __name__ == '__main__':
# 清理文件
try:
with open(SHARED_FILE, 'w') as f:
f.write('0')
except:
pass
lock = Lock()
# 创建多个进程
processes = []
for i in range(3):
p = Process(target=write_to_file, args=(lock, i, 5))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 读取最终结果
try:
with open(SHARED_FILE, 'r') as f:
final_value = f.read().strip()
print(f"最终计数器值: {final_value}")
except FileNotFoundError:
print("文件未找到")
RLock - 可重入锁
from multiprocessing import Process, RLock
import time
def recursive_function(rlock, process_id, depth, max_depth=3):
"""递归函数演示 RLock 的可重入性"""
if depth > max_depth:
return
with rlock:
print(f"进程{process_id}: 进入层级 {depth}")
time.sleep(0.1)
# 递归调用 - RLock 允许同一进程多次获取锁
recursive_function(rlock, process_id, depth + 1, max_depth)
print(f"进程{process_id}: 退出层级 {depth}")
if __name__ == '__main__':
rlock = RLock()
processes = []
for i in range(2):
p = Process(target=recursive_function, args=(rlock, i, 1))
processes.append(p)
p.start()
for p in processes:
p.join()
print("RLock 演示完成")
💡 实际应用
多进程日志收集系统
from multiprocessing import Process, Queue, current_process
import time
import random
import logging
from datetime import datetime
def log_collector(log_queue):
"""日志收集进程"""
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='multiprocess.log'
)
logger = logging.getLogger('LogCollector')
while True:
try:
# 获取日志消息(超时3秒)
log_record = log_queue.get(timeout=3)
if log_record is None: # 结束信号
logger.info("日志收集器正在关闭...")
break
# 写入日志
logger.info(f"[{log_record['process']}] {log_record['message']}")
except Exception as e:
logger.error(f"日志收集出错: {e}")
def worker_process(process_id, log_queue, work_duration=10):
"""工作进程 - 产生日志"""
start_time = time.time()
while time.time() - start_time < work_duration:
# 模拟工作并产生日志
work_type = random.choice(['处理数据', '网络请求', '文件操作', '计算任务'])
log_message = {
'process': f'Worker-{process_id}',
'message': f'正在执行: {work_type}',
'timestamp': datetime.now().isoformat()
}
log_queue.put(log_message)
# 随机工作时间
time.sleep(random.uniform(0.5, 1.5))
# 发送完成消息
final_message = {
'process': f'Worker-{process_id}',
'message': '工作完成',
'timestamp': datetime.now().isoformat()
}
log_queue.put(final_message)
if __name__ == '__main__':
# 创建日志队列
log_queue = Queue()
# 启动日志收集进程
log_collector_process = Process(target=log_collector, args=(log_queue,))
log_collector_process.start()
# 启动工作进程
workers = []
for i in range(4):
w = Process(target=worker_process, args=(i, log_queue, 5))
workers.append(w)
w.start()
# 等待所有工作进程完成
for w in workers:
w.join()
# 发送结束信号给日志收集器
log_queue.put(None)
# 等待日志收集器完成
log_collector_process.join()
print("多进程日志收集演示完成")
print("查看 multiprocess.log 文件获取详细日志")
⚠️ 注意事项
- 管道并发问题:
- 多个进程同时写入一个管道可能导致数据混乱
- 使用队列替代管道进行多对多通信
- 队列的选择:
Queue:功能完整,支持大小限制SimpleQueue:轻量级,无大小限制JoinableQueue:支持任务跟踪
- 锁的使用:
- 进程锁开销比线程锁大
- 避免死锁:统一获取锁的顺序
- 使用
with语句确保锁的释放
- 序列化限制:
- 通过 IPC 传递的对象必须可 pickle 序列化
- 某些对象(如文件句柄、锁)无法跨进程传递
🔗 相关内容
📚 扩展阅读
- Python 官方文档 - multiprocessing
- Python 官方文档 - multiprocessing.Pipe
- Python 官方文档 - multiprocessing.Queue
🏷️ 标签
进程间通信 IPC Pipe Queue multiprocessing 同步原语
最后更新: 2024-01-15
作者: Python 编程指南
版本: 1.0
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。