Python schedule 模块定时调度
学习使用 Python 的 schedule 模块实现轻量级的任务定时调度,适合简单的周期性任务
Python schedule 模块定时调度
📝 概述
schedule 是一个轻量级的 Python 定时调度库,使用简单的 API 语法即可实现灵活的定时任务配置。适合在脚本和小型应用中实现周期性任务,不需要复杂的线程管理和持久化。
主要特点:
- API 直观,链式调用易读易写
- 支持多种调度表达式(every、seconds/minutes/hours/days、at 等)
- 轻量级实现,不依赖线程池或复杂框架
- 适合脚本和小型应用中的定时需求
适用场景:
- 简单的周期性任务
- 需要快速实现任务调度的脚本
- 没有持久化与复杂调度需求的应用
🎯 学习目标
- schedule 的基本使用方法
- 多任务调度与标签过滤
- 条件执行与动态调度
- 与其他定时器实现方式的对比
- 注意事项与最佳实践
🧰 安装
# 使用 pip 安装
pip install schedule
# 或使用 conda-forge 安装(推荐 conda 环境)
conda install -c conda-forge schedule
🔍 详细内容
准备:监控函数
import psutil
import time
import datetime
def monitor_system():
"""监测系统CPU和内存使用情况"""
cpu_percent = psutil.cpu_percent()
memory_percent = psutil.virtual_memory().percent
print(f"{datetime.datetime.now():%H:%M:%S} CPU:{cpu_percent}% 内存:{memory_percent}%")
def monitor_network():
"""监测网络收发字节数"""
net_info = psutil.net_io_counters()
print(f"{datetime.datetime.now():%H:%M:%S} 发送={net_info.bytes_sent} 接收={net_info.bytes_recv}")
基本用法
import schedule
# 每2秒执行一次
schedule.every(2).seconds.do(monitor_system)
# 每5秒执行一次
schedule.every(5).seconds.do(monitor_network)
# 主循环:检查待执行任务
print("启动schedule调度器...")
while True:
schedule.run_pending()
time.sleep(0.5) # 适度休眠,降低CPU使用率
指定时间与复杂表达式
# 每天 10:30 执行
schedule.every().day.at("10:30").do(lambda: print("每天固定时间执行"))
# 每周一、周三、周五执行
schedule.every().monday.do(lambda: print("每周一执行"))
schedule.every().wednesday.do(lambda: print("每周三执行"))
schedule.every().friday.do(lambda: print("每周五执行"))
# 每小时的第 15 分钟执行
schedule.every().hour.at(":15").do(lambda: print("每小时第15分钟执行"))
使用标签管理任务
# 使用标签标识任务
schedule.every(2).seconds.do(monitor_system).tag("system", "fast")
schedule.every(5).seconds.do(monitor_network).tag("network")
# 只运行带有特定标签的任务
print("只运行 system 标签的任务 5 秒...")
start_time = time.time()
while time.time() - start_time < 5:
schedule.run_pending("system")
time.sleep(0.5)
# 清除特定标签的任务
schedule.clear("fast")
条件执行与动态调度
# 条件执行:当CPU使用率高于80%时增加监控频率
import random
def dynamic_task():
# 模拟系统负载
cpu_load = random.randint(50, 100)
print(f"当前模拟CPU负载: {cpu_load}%")
if cpu_load > 80:
# 增加监控频率
print("CPU高负载,增加监控频率")
schedule.every(1).seconds.do(monitor_system).tag("dynamic")
else:
# 降低监控频率
print("CPU较空闲,降低监控频率")
schedule.clear("dynamic")
# 每10秒检查一次系统状态
schedule.every(10).seconds.do(dynamic_task)
# 主循环
print("启动动态调度示例...")
for _ in range(30): # 运行一段时间
schedule.run_pending()
time.sleep(1)
并发与长任务处理
# 将长任务放入线程池执行,避免阻塞schedule主循环
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=3)
# 使用线程池包装任务
schedule.every(3).seconds.do(lambda: executor.submit(monitor_system))
schedule.every(7).seconds.do(lambda: executor.submit(monitor_network))
# 定时清理线程池完成的任务(示例)
def cleanup_finished_futures():
# 这里可以维护 futures 列表并清理已完成的任务
print("清理完成的后台任务...")
schedule.every(30).seconds.do(cleanup_finished_futures)
# 主循环
print("启动并发调度示例...")
for _ in range(20):
schedule.run_pending()
time.sleep(0.5)
executor.shutdown(wait=True)
💡 优缺点分析
优点
def schedule_advantages():
print("schedule 的优点:")
print("1. API 简洁直观,易于编写与阅读")
print("2. 适合脚本和小型服务中的周期任务")
print("3. 轻量级,无需复杂配置")
缺点和限制
def schedule_limitations():
print("schedule 的限制:")
print("1. 需要显式循环 run_pending,无法脱离主循环")
print("2. 不内置持久化与分布式功能")
print("3. 对复杂调度场景支持有限(如错过任务触发、持久化存储)")
⚠️ 注意事项
- 调度检查需要在主循环中执行,考虑合理的 sleep 间隔
- 考虑长任务的并发执行,避免阻塞主循环
- 无持久化与分布式能力,不适合企业级复杂调度
- 程序退出前清理线程池与资源
🔗 相关内容
📚 扩展阅读
- schedule 官方文档: https://schedule.readthedocs.io/
- 任务调度基础概念
🏷️ 标签
定时器 schedule 任务调度 轻量级调度
最后更新: 2024-01-15
作者: Python 编程指南
版本: 1.0
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。