Python Threading.Timer 定时器

学习使用 Python 标准库中的 threading.Timer 类实现异步定时任务,适合中等复杂度的定时需求

分类: basics 难度: 中级 更新: 2024-01-15
定时器 threading Timer 线程定时器 多线程

Python Threading.Timer 定时器

📝 概述

Threading.Timer 是 Python 标准库中的线程化定时器实现,基于 Threading 模块构建。与基础的 while+sleep 方法相比,Threading.Timer 提供了异步执行能力,能够在不阻塞主线程的情况下执行定时任务。这种实现特别适合需要在后台执行任务,但又不需要复杂调度框架的场景。

主要特点:

  • 基于线程实现,支持异步执行
  • 内置于 Python 标准库,无需额外安装
  • 提供基本的控制接口 (start/cancel)
  • 支持一次性任务和重复任务
  • 可以方便地构建自定义定时器类

适用场景:

  • 需要在后台执行的定时任务
  • 不阻塞主线程的一次性延迟执行
  • 对精确时间控制要求不高的应用
  • 中等复杂度的定时需求

🎯 学习目标

通过本教程,你将掌握:

  • Threading.Timer 的工作原理和基本用法
  • 如何创建可重复执行的定时器
  • 多定时器的创建和控制
  • 定时器的取消和资源管理
  • 线程安全考虑和最佳实践
  • 与其他定时器实现的比较

📋 前置知识

  • Python 基础语法
  • 函数和类的定义与使用
  • 基本的线程概念
  • Python 中的异常处理
  • 回调函数的概念

🧰 安装

Threading.Timer 是 Python 标准库的一部分,无需额外安装。本示例中用于系统监控的 psutil 库需要安装:

# 使用 pip 安装
pip install psutil

# 或使用 conda-forge 安装(推荐 conda 环境)
conda install -c conda-forge psutil

🔍 详细内容

系统监控函数准备

我们将继续使用系统监控函数来演示定时器的使用:

# 系统监控功能实现
import psutil
import time
import datetime

def monitor_system(logfile=None):
    """监测系统CPU和内存使用情况"""
    # 获取CPU使用率
    cpu_percent = psutil.cpu_percent()
    
    # 获取内存使用情况
    memory = psutil.virtual_memory()
    memory_percent = memory.percent
    
    # 获取当前时间
    now = datetime.datetime.now()
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    
    # 格式化输出信息
    line = f'{timestamp} CPU:{cpu_percent}%, 内存:{memory_percent}%'
    print(line)
    
    # 可选:写入日志文件
    if logfile:
        logfile.write(line + '\n')
        logfile.flush()
    
    return cpu_percent, memory_percent

def monitor_network(logfile=None):
    """监测网络收发字节数"""
    # 获取网络IO统计
    net_info = psutil.net_io_counters()
    
    # 获取当前时间
    now = datetime.datetime.now()
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    
    # 格式化输出信息
    line = f'{timestamp} 发送字节={net_info.bytes_sent}, 接收字节={net_info.bytes_recv}'
    print(line)
    
    # 可选:写入日志文件
    if logfile:
        logfile.write(line + '\n')
        logfile.flush()
    
    return net_info.bytes_sent, net_info.bytes_recv

Threading.Timer 基本用法

import threading

def basic_timer_example():
    """Threading.Timer 基本用法示例"""
    print("Threading.Timer 基本用法演示")
    
    def delayed_message(message):
        print(f"延迟消息: {message}")
    
    # 创建一个3秒后执行的定时器
    timer = threading.Timer(3.0, delayed_message, args=["这是一个延迟3秒的消息"])
    
    print("启动定时器...")
    timer.start()  # 非阻塞,立即返回
    
    print("主线程继续执行其他任务...")
    # 模拟主线程工作
    for i in range(5):
        print(f"主线程工作中... {i+1}")
        time.sleep(1)  # 主线程中的等待不会影响定时器
    
    print("演示结束")

# basic_timer_example()

使用 Timer 执行系统监控

def timer_monitor_example():
    """使用Threading.Timer执行系统监控"""
    # 创建定时器执行系统监控
    timer = threading.Timer(2.0, monitor_system)
    
    print("启动系统监控定时器...")
    timer.start()
    
    print("主线程继续执行...")
    time.sleep(3)  # 等待定时器执行完成
    
    print("监控示例结束")

# timer_monitor_example()

可控制的重复执行定时器类

class RepeatingTimer:
    """可重复执行的定时器类"""
    
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.timer = None
        self.is_running = False
        self.next_call = 0
        self.iteration_count = 0
    
    def _run(self):
        """内部运行函数,执行任务并安排下一次执行"""
        self.is_running = True
        self.iteration_count += 1
        
        try:
            # 执行目标函数
            self.function(*self.args, **self.kwargs)
        except Exception as e:
            print(f"定时任务执行异常: {str(e)}")
        
        # 只有当仍在运行时才安排下一次执行
        if self.is_running:
            # 计算下一次执行时间
            self.next_call = time.time() + self.interval
            self.timer = threading.Timer(self.interval, self._run)
            self.timer.daemon = True  # 设为守护线程,主程序退出时自动结束
            self.timer.start()
    
    def start(self):
        """启动定时器"""
        if not self.is_running:
            self.next_call = time.time() + self.interval
            self.timer = threading.Timer(self.interval, self._run)
            self.timer.daemon = True
            self.timer.start()
            self.is_running = True
            print(f"定时器已启动,间隔 {self.interval} 秒")
    
    def stop(self):
        """停止定时器"""
        if self.is_running and self.timer:
            self.timer.cancel()
            self.is_running = False
            print(f"定时器已停止,共执行 {self.iteration_count} 次")
    
    def get_stats(self):
        """获取定时器状态"""
        return {
            "is_running": self.is_running,
            "interval": self.interval,
            "iteration_count": self.iteration_count,
            "next_call_in": max(0, self.next_call - time.time()) if self.is_running else None
        }

def demo_repeating_timer():
    """演示可重复执行的定时器"""
    # 创建定时器
    timer = RepeatingTimer(2, monitor_system)
    
    # 启动定时器
    timer.start()
    
    # 让定时器运行一段时间
    for i in range(5):
        print(f"主线程工作中... ({i+1}/5)")
        # 输出定时器状态
        stats = timer.get_stats()
        print(f"定时器状态: 运行={stats['is_running']}, 次数={stats['iteration_count']}, 下次执行={stats['next_call_in']:.1f}秒后")
        time.sleep(1)
    
    # 停止定时器
    timer.stop()
    
    print("演示结束")

# 执行演示
# demo_repeating_timer()

多任务定时器管理器

class TimerManager:
    """多任务定时器管理器"""
    
    def __init__(self):
        self.timers = {}
        self.lock = threading.Lock()  # 用于线程安全操作
    
    def add_timer(self, name, interval, function, *args, **kwargs):
        """添加定时器"""
        with self.lock:
            if name in self.timers:
                print(f"定时器 '{name}' 已存在,将被替换")
                self.stop_timer(name)
            
            timer = RepeatingTimer(interval, function, *args, **kwargs)
            self.timers[name] = timer
            print(f"定时器 '{name}' 已添加,间隔 {interval} 秒")
            return timer
    
    def start_timer(self, name):
        """启动指定名称的定时器"""
        with self.lock:
            if name in self.timers:
                self.timers[name].start()
                return True
            else:
                print(f"定时器 '{name}' 不存在")
                return False
    
    def stop_timer(self, name):
        """停止指定名称的定时器"""
        with self.lock:
            if name in self.timers:
                self.timers[name].stop()
                return True
            else:
                print(f"定时器 '{name}' 不存在")
                return False
    
    def remove_timer(self, name):
        """移除指定名称的定时器"""
        with self.lock:
            if name in self.timers:
                self.stop_timer(name)
                del self.timers[name]
                print(f"定时器 '{name}' 已移除")
                return True
            else:
                print(f"定时器 '{name}' 不存在")
                return False
    
    def start_all(self):
        """启动所有定时器"""
        with self.lock:
            if not self.timers:
                print("没有定时器可启动")
                return
            
            count = 0
            for name, timer in self.timers.items():
                timer.start()
                count += 1
            
            print(f"已启动 {count} 个定时器")
    
    def stop_all(self):
        """停止所有定时器"""
        with self.lock:
            if not self.timers:
                print("没有定时器可停止")
                return
            
            count = 0
            for name, timer in self.timers.items():
                timer.stop()
                count += 1
            
            print(f"已停止 {count} 个定时器")
    
    def list_timers(self):
        """列出所有定时器状态"""
        with self.lock:
            if not self.timers:
                print("当前没有注册的定时器")
                return []
            
            result = []
            print("\n当前定时器列表:")
            print("-" * 70)
            print(f"{'名称':<15} {'状态':<10} {'间隔(秒)':<10} {'执行次数':<10} {'下次执行':<15}")
            print("-" * 70)
            
            for name, timer in self.timers.items():
                stats = timer.get_stats()
                status = "运行中" if stats["is_running"] else "已停止"
                next_call = f"{stats['next_call_in']:.1f}秒后" if stats["is_running"] else "N/A"
                
                print(f"{name:<15} {status:<10} {stats['interval']:<10.1f} {stats['iteration_count']:<10} {next_call:<15}")
                
                result.append({
                    "name": name,
                    "status": status,
                    "interval": stats["interval"],
                    "iteration_count": stats["iteration_count"],
                    "next_call": next_call
                })
            
            print("-" * 70)
            return result

def demo_timer_manager():
    """演示定时器管理器的使用"""
    manager = TimerManager()
    
    # 添加多个定时器
    manager.add_timer("system", 3, monitor_system)
    manager.add_timer("network", 5, monitor_network)
    manager.add_timer("clock", 2, lambda: print(f"当前时间: {datetime.datetime.now().strftime('%H:%M:%S')}"))
    
    # 列出所有定时器
    manager.list_timers()
    
    # 启动所有定时器
    manager.start_all()
    
    # 等待一段时间
    time.sleep(6)
    
    # 查看定时器状态
    manager.list_timers()
    
    # 停止特定定时器
    manager.stop_timer("network")
    
    # 再等待一段时间
    time.sleep(4)
    
    # 最终状态
    manager.list_timers()
    
    # 停止所有定时器
    manager.stop_all()
    
    print("演示结束")

# 执行演示
# demo_timer_manager()

高级功能:带优先级的任务定时器

class PriorityTask:
    """带优先级的任务"""
    
    def __init__(self, priority, function, *args, **kwargs):
        self.priority = priority  # 优先级:1(高) - 5(低)
        self.function = function
        self.args = args
        self.kwargs = kwargs
    
    def execute(self):
        """执行任务"""
        return self.function(*self.args, **self.kwargs)

class PriorityTimer:
    """带优先级的定时器"""
    
    def __init__(self, interval):
        self.interval = interval
        self.tasks = []  # 任务列表
        self.timer = None
        self.is_running = False
        self.lock = threading.Lock()
    
    def add_task(self, priority, function, *args, **kwargs):
        """添加任务"""
        with self.lock:
            task = PriorityTask(priority, function, *args, **kwargs)
            self.tasks.append(task)
            # 按优先级排序(低数字 = 高优先级)
            self.tasks.sort(key=lambda t: t.priority)
            return len(self.tasks)
    
    def _run(self):
        """内部运行函数"""
        if not self.is_running:
            return
        
        with self.lock:
            # 按优先级执行任务
            for task in self.tasks:
                try:
                    print(f"执行优先级 {task.priority} 的任务...")
                    task.execute()
                except Exception as e:
                    print(f"任务执行错误: {str(e)}")
        
        # 安排下一次执行
        if self.is_running:
            self.timer = threading.Timer(self.interval, self._run)
            self.timer.daemon = True
            self.timer.start()
    
    def start(self):
        """启动定时器"""
        with self.lock:
            if not self.is_running:
                self.is_running = True
                self.timer = threading.Timer(self.interval, self._run)
                self.timer.daemon = True
                self.timer.start()
                print(f"优先级定时器已启动,间隔 {self.interval} 秒,共 {len(self.tasks)} 个任务")
    
    def stop(self):
        """停止定时器"""
        with self.lock:
            if self.is_running and self.timer:
                self.is_running = False
                self.timer.cancel()
                print("优先级定时器已停止")

def demo_priority_timer():
    """演示带优先级的定时器"""
    # 创建定时器
    timer = PriorityTimer(interval=2)
    
    # 添加不同优先级的任务
    timer.add_task(1, lambda: print("高优先级任务 - 紧急系统检查"))
    timer.add_task(3, monitor_system)
    timer.add_task(5, monitor_network)
    timer.add_task(2, lambda: print(f"重要任务 - 当前时间: {datetime.datetime.now().strftime('%H:%M:%S')}"))
    
    # 启动定时器
    timer.start()
    
    # 等待几个周期
    time.sleep(7)
    
    # 停止定时器
    timer.stop()
    
    print("演示结束")

# 执行演示
# demo_priority_timer()

💡 优缺点分析

优点

def threading_timer_advantages():
    """演示Threading.Timer的优点"""
    print("Threading.Timer 的优点:")
    print("1. 基于标准库,无需额外依赖")
    print("2. 支持异步执行,不阻塞主线程")
    print("3. 使用简单,API 直观")
    print("4. 可以方便地扩展为自定义定时器类")
    print("5. 适合中等复杂度的定时需求")
    
    # 演示异步非阻塞特性
    def delayed_task():
        print("定时任务执行...")
    
    print("\n异步执行演示:")
    timer = threading.Timer(2, delayed_task)
    timer.start()
    
    print("主线程继续执行,不受定时器影响")
    time.sleep(3)  # 等待定时器执行完毕

# threading_timer_advantages()

缺点和限制

def threading_timer_limitations():
    """演示Threading.Timer的缺点"""
    print("Threading.Timer 的限制:")
    print("1. 只支持一次性执行,需要自行实现重复执行")
    print("2. 没有内置的错误处理机制")
    print("3. 没有提供复杂的调度能力(如cron表达式)")
    print("4. 需要手动管理线程资源")
    print("5. 多定时器协调需要额外代码")
    
    # 演示一次性执行的限制
    print("\n一次性执行限制演示:")
    timer = threading.Timer(1, lambda: print("这个定时器只会执行一次"))
    timer.start()
    time.sleep(2)  # 等待定时器执行
    
    print("如需再次执行,需要创建新的定时器实例")
    timer2 = threading.Timer(1, lambda: print("这是第二个定时器实例"))
    timer2.start()
    time.sleep(2)  # 等待第二个定时器执行

# threading_timer_limitations()

⚠️ 注意事项

  1. 线程安全考虑
    • 在多个定时器访问共享资源时需要使用锁
    • 注意线程间的数据共享和状态管理
    • 避免死锁和线程竞争条件
  2. 资源管理
    • 确保不再使用的定时器被正确取消
    • 长时间运行的应用中要避免创建过多线程
    • 考虑使用线程池来限制并发线程数
  3. 错误处理
    • 定时器线程中的异常不会传播到主线程
    • 需要在定时器中添加 try-except 块处理异常
    • 考虑添加日志记录机制跟踪错误
  4. 时间精度
    • 定时器精度受 Python 解释器和操作系统影响
    • 任务执行时间会影响定时精度
    • 不适合需要高精度定时的场景(如音频处理)

🔗 相关内容

📚 扩展阅读

🏷️ 标签

定时器 threading Timer 线程定时器 多线程


最后更新: 2024-01-15
作者: Python 编程指南
版本: 1.0

作者: Python 编程指南

版本: 1.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。