Python 多线程编程

深入学习 Python threading 模块,掌握多线程编程的核心技术和最佳实践

分类: basics 难度: 中级 更新: 2024-01-15
多线程 threading Thread 线程池 ThreadPoolExecutor 线程锁

Python 多线程编程

📝 概述

Python 的 threading 模块提供了强大的多线程编程支持,允许程序同时执行多个任务。多线程特别适用于 I/O 密集型任务,如文件读写、网络请求等。本文档详细介绍了 threading 模块的核心功能,包括线程创建、线程池、线程同步和锁机制等。

🎯 学习目标

  • 掌握 Python threading 模块的基本使用
  • 理解线程的创建方式和生命周期管理
  • 学会使用线程池 ThreadPoolExecutor 进行批量任务处理
  • 掌握各种线程锁机制确保线程安全
  • 理解脏数据产生的原因和解决方案

📋 前置知识

  • Python 基础语法和面向对象编程
  • 函数定义和调用
  • 并发编程基本概念
  • 异常处理机制

🔍 详细内容

Thread 对象

线程的创建方式

方式1:继承 Thread 类

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    
    def run(self):
        """线程执行的主要逻辑"""
        for i in range(5):
            print(f"线程 {self.name} 正在执行第 {i+1} 次")
            time.sleep(1)

# 创建并启动线程
thread1 = MyThread("线程1")
thread2 = MyThread("线程2")

thread1.start()
thread2.start()

# 等待线程完成
thread1.join()
thread2.join()

print("所有线程执行完成")

方式2:直接注入任务函数

import threading
import time

def worker_task(name, count):
    """工作任务函数"""
    for i in range(count):
        print(f"工作线程 {name} 执行第 {i+1} 次任务")
        time.sleep(0.5)

# 创建线程对象
thread1 = threading.Thread(target=worker_task, args=("A", 3))
thread2 = threading.Thread(target=worker_task, args=("B", 3))

# 启动线程
thread1.start()
thread2.start()

# 等待线程完成
thread1.join()
thread2.join()

print("任务完成")

Thread 对象参数

参数名 类型 说明
target callable 线程要执行的目标函数
args tuple 传递给目标函数的位置参数
kwargs dict 传递给目标函数的关键字参数
name str 线程名称,用于标识
daemon bool 是否为守护线程

Thread 对象方法

import threading
import time

def sample_task():
    for i in range(3):
        print(f"执行任务 {i+1}")
        time.sleep(1)

# 创建线程
thread = threading.Thread(target=sample_task, name="示例线程")

print(f"线程名称: {thread.name}")
print(f"线程ID: {thread.ident}")
print(f"是否存活: {thread.is_alive()}")

# 启动线程
thread.start()

# 检查线程状态
print(f"启动后是否存活: {thread.is_alive()}")

# 等待线程完成
thread.join()

print(f"完成后是否存活: {thread.is_alive()}")

join() 方法详解

import threading
import time

def long_task(task_name, duration):
    print(f"{task_name} 开始执行")
    time.sleep(duration)
    print(f"{task_name} 执行完成")

# 创建多个线程
threads = []
for i in range(3):
    thread = threading.Thread(
        target=long_task, 
        args=(f"任务{i+1}", 2)
    )
    threads.append(thread)
    thread.start()

print("所有线程已启动")

# 等待所有线程完成
for thread in threads:
    thread.join()

print("所有任务执行完成")

setDaemon() 守护线程

import threading
import time

def daemon_task():
    """守护线程任务"""
    count = 0
    while True:
        count += 1
        print(f"守护线程运行中... {count}")
        time.sleep(1)

def main_task():
    """主任务"""
    for i in range(5):
        print(f"主任务执行 {i+1}")
        time.sleep(0.5)

# 创建守护线程
daemon_thread = threading.Thread(target=daemon_task)
daemon_thread.daemon = True  # 设置为守护线程
daemon_thread.start()

# 创建主任务线程
main_thread = threading.Thread(target=main_task)
main_thread.start()
main_thread.join()

print("主程序结束,守护线程也会自动结束")

线程池 ThreadPoolExecutor

基本使用

from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    """处理单个项目"""
    print(f"开始处理项目: {item}")
    time.sleep(1)  # 模拟处理时间
    result = f"项目{item}处理完成"
    print(result)
    return result

# 使用线程池
with ThreadPoolExecutor(max_workers=3) as executor:
    items = ['A', 'B', 'C', 'D', 'E']
    
    # 提交任务
    futures = [executor.submit(process_item, item) for item in items]
    
    # 获取结果
    for future in futures:
        result = future.result()
        print(f"获取结果: {result}")

as_completed() 方法

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def download_file(filename):
    """模拟文件下载"""
    download_time = random.uniform(1, 3)
    time.sleep(download_time)
    return f"{filename} 下载完成,耗时 {download_time:.2f}s"

files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']

with ThreadPoolExecutor(max_workers=2) as executor:
    # 提交所有任务
    future_to_file = {
        executor.submit(download_file, file): file 
        for file in files
    }
    
    # 按完成顺序处理结果
    for future in as_completed(future_to_file):
        filename = future_to_file[future]
        try:
            result = future.result()
            print(f"[完成] {result}")
        except Exception as exc:
            print(f"[错误] {filename} 下载失败: {exc}")

Executor.map() 方法

from concurrent.futures import ThreadPoolExecutor
import time

def calculate_square(number):
    """计算平方"""
    time.sleep(0.1)  # 模拟计算时间
    result = number ** 2
    print(f"{number}^2 = {result}")
    return result

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

print("使用 map() 方法批量处理:")
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(calculate_square, numbers))

print(f"所有结果: {results}")

wait() 方法

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
import random

def task_with_random_time(task_id):
    """随机执行时间的任务"""
    execution_time = random.uniform(1, 4)
    time.sleep(execution_time)
    return f"任务{task_id}完成,耗时{execution_time:.2f}s"

with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务
    futures = [
        executor.submit(task_with_random_time, i) 
        for i in range(5)
    ]
    
    print("等待第一个任务完成...")
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    
    for future in done:
        print(f"[最先完成] {future.result()}")
    
    print(f"剩余 {len(not_done)} 个任务继续执行...")
    
    # 等待所有任务完成
    done, not_done = wait(not_done, return_when=ALL_COMPLETED)
    
    for future in done:
        print(f"[后续完成] {future.result()}")

线程锁机制

脏数据问题演示

import threading
import time

# 共享资源
counter = 0

def increment_counter():
    """不安全的计数器增加"""
    global counter
    for _ in range(100000):
        # 这里存在竞态条件
        temp = counter
        temp += 1
        counter = temp

# 创建多个线程同时修改计数器
threads = []
for i in range(5):
    thread = threading.Thread(target=increment_counter)
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print(f"最终计数器值: {counter}")
print(f"期望值: {5 * 100000}")
print(f"数据丢失: {5 * 100000 - counter}")

Lock 互斥锁

import threading
import time

# 共享资源和锁
counter = 0
lock = threading.Lock()

def safe_increment_counter():
    """使用锁保护的计数器增加"""
    global counter
    for _ in range(100000):
        with lock:  # 使用 with 语句自动管理锁
            counter += 1

# 或者手动管理锁
def manual_lock_increment():
    """手动管理锁的示例"""
    global counter
    for _ in range(100000):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()

# 测试线程安全的版本
counter = 0
threads = []

for i in range(5):
    thread = threading.Thread(target=safe_increment_counter)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"使用锁后的计数器值: {counter}")
print(f"期望值: {5 * 100000}")

RLock 可重入锁

import threading
import time

class Counter:
    def __init__(self):
        self._value = 0
        self._lock = threading.RLock()  # 可重入锁
    
    def increment(self):
        """增加计数"""
        with self._lock:
            self._value += 1
    
    def decrement(self):
        """减少计数"""
        with self._lock:
            self._value -= 1
    
    def increment_twice(self):
        """增加两次 - 演示重入特性"""
        with self._lock:
            self.increment()  # 再次获取同一个锁
            self.increment()
    
    def get_value(self):
        """获取当前值"""
        with self._lock:
            return self._value

# 测试可重入锁
counter = Counter()

def worker():
    for _ in range(1000):
        counter.increment_twice()

threads = []
for i in range(5):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"最终计数值: {counter.get_value()}")
print(f"期望值: {5 * 1000 * 2}")

Semaphore 信号量

import threading
import time
import random

# 创建信号量,限制同时访问资源的线程数
semaphore = threading.Semaphore(3)  # 最多3个线程同时访问

def access_resource(thread_id):
    """访问有限资源"""
    print(f"线程 {thread_id} 等待访问资源...")
    
    with semaphore:
        print(f"线程 {thread_id} 获得资源访问权")
        # 模拟使用资源
        time.sleep(random.uniform(1, 3))
        print(f"线程 {thread_id} 释放资源")

# 创建多个线程尝试访问资源
threads = []
for i in range(8):
    thread = threading.Thread(target=access_resource, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("所有线程完成")

Event 事件对象

import threading
import time

# 创建事件对象
event = threading.Event()

def waiter(name):
    """等待事件的线程"""
    print(f"{name} 等待事件...")
    event.wait()  # 阻塞直到事件被设置
    print(f"{name} 收到事件,开始工作")

def setter():
    """设置事件的线程"""
    time.sleep(3)
    print("设置事件...")
    event.set()  # 设置事件,唤醒所有等待的线程

# 创建等待线程
for i in range(3):
    thread = threading.Thread(target=waiter, args=(f"等待者{i+1}",))
    thread.start()

# 创建设置事件的线程
setter_thread = threading.Thread(target=setter)
setter_thread.start()

setter_thread.join()
time.sleep(1)  # 等待所有线程完成

Condition 条件变量

import threading
import time
import random

# 创建条件变量
condition = threading.Condition()
items = []
MAX_ITEMS = 10

def consumer(name):
    """消费者"""
    with condition:
        while True:
            # 等待有商品可消费
            while len(items) == 0:
                print(f"消费者 {name} 等待商品...")
                condition.wait()
            
            # 消费商品
            item = items.pop(0)
            print(f"消费者 {name} 消费了商品 {item}")
            
            # 通知生产者可以继续生产
            condition.notify_all()
            
            time.sleep(random.uniform(0.5, 1.5))

def producer(name):
    """生产者"""
    for i in range(5):
        with condition:
            # 等待有空间生产
            while len(items) >= MAX_ITEMS:
                print(f"生产者 {name} 等待空间...")
                condition.wait()
            
            # 生产商品
            item = f"{name}-商品{i+1}"
            items.append(item)
            print(f"生产者 {name} 生产了 {item}")
            
            # 通知消费者有新商品
            condition.notify_all()
        
        time.sleep(random.uniform(0.5, 1))

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer, args=("生产者A",))
consumer_thread1 = threading.Thread(target=consumer, args=("消费者1",))
consumer_thread2 = threading.Thread(target=consumer, args=("消费者2",))

# 设置消费者为守护线程
consumer_thread1.daemon = True
consumer_thread2.daemon = True

# 启动线程
producer_thread.start()
consumer_thread1.start()
consumer_thread2.start()

# 等待生产者完成
producer_thread.join()
print("生产者完成,程序结束")

💡 实际应用

网络请求并发处理

import threading
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url):
    """获取URL内容"""
    try:
        start_time = time.time()
        response = requests.get(url, timeout=5)
        end_time = time.time()
        
        return {
            'url': url,
            'status_code': response.status_code,
            'response_time': end_time - start_time,
            'content_length': len(response.content)
        }
    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

# 要请求的URL列表
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/3',
]

print("开始并发请求...")
start_time = time.time()

with ThreadPoolExecutor(max_workers=4) as executor:
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    
    for future in as_completed(future_to_url):
        result = future.result()
        if 'error' in result:
            print(f"请求失败: {result['url']} - {result['error']}")
        else:
            print(f"请求成功: {result['url']} - "
                  f"状态码: {result['status_code']}, "
                  f"响应时间: {result['response_time']:.2f}s")

total_time = time.time() - start_time
print(f"总耗时: {total_time:.2f}s")

文件处理并发

import threading
import os
import time
from pathlib import Path

class FileProcessor:
    def __init__(self, max_workers=4):
        self.lock = threading.Lock()
        self.processed_count = 0
        self.max_workers = max_workers
    
    def process_file(self, filepath):
        """处理单个文件"""
        try:
            # 模拟文件处理
            file_size = os.path.getsize(filepath)
            time.sleep(0.1)  # 模拟处理时间
            
            with self.lock:
                self.processed_count += 1
                print(f"已处理: {filepath.name} ({file_size} bytes) - "
                      f"进度: {self.processed_count}")
            
            return {'filepath': filepath, 'size': file_size, 'status': 'success'}
            
        except Exception as e:
            return {'filepath': filepath, 'error': str(e), 'status': 'error'}
    
    def process_directory(self, directory_path):
        """并发处理目录中的所有文件"""
        directory = Path(directory_path)
        files = [f for f in directory.rglob('*') if f.is_file()]
        
        print(f"找到 {len(files)} 个文件待处理")
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_file, file) for file in files]
            
            results = []
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
        
        return results

# 使用示例
# processor = FileProcessor(max_workers=4)
# results = processor.process_directory('/path/to/directory')
# print(f"处理完成,共处理 {len(results)} 个文件")

⚠️ 注意事项

  1. GIL 限制
    • Python 的全局解释器锁(GIL)限制了真正的并行执行
    • 多线程主要适用于 I/O 密集型任务
    • CPU 密集型任务建议使用多进程
  2. 线程安全
    • 共享数据必须使用适当的锁机制保护
    • 避免竞态条件和数据竞争
    • 注意死锁的预防
  3. 资源管理
    • 及时关闭线程池和释放资源
    • 使用 with 语句管理资源生命周期
    • 避免创建过多线程导致系统资源耗尽
  4. 异常处理
    • 线程中的异常不会传播到主线程
    • 使用 future.result() 获取异常信息
    • 在线程函数中妥善处理异常

🔗 相关内容

📚 扩展阅读

🏷️ 标签

多线程 threading Thread 线程池 ThreadPoolExecutor 线程锁


最后更新: 2024-01-15
作者: Python 编程指南
版本: 1.0

作者: Python 编程指南

版本: 1.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。