Huey - 轻量级任务队列库
学习使用Huey轻量级多线程任务队列库,实现异步任务处理和定时任务调度
Huey - 轻量级任务队列库
📝 概述
Huey是一个由Python编写的小型多线程任务队列库,它支持任务调度以及执行。Huey支持多种存储后端,如Redis、SQLite等,可以帮助你在后台执行耗时任务,或者在指定时间执行定时任务。
主要特点:
- 轻量级设计,简单易用
- 支持多种存储后端(Redis、SQLite、内存)
- 多线程任务执行
- 定时任务和周期性任务支持
- 任务结果存储和检索
- 任务重试机制
- 简洁的API设计
适用场景:
- Web应用的异步任务处理
- 定时数据清理和备份
- 邮件发送和通知
- 图像处理和文件转换
- 定期报表生成
- 小到中型项目的任务调度
项目地址: https://github.com/coleifer/huey
🎯 学习目标
通过本教程,你将掌握:
- Huey任务队列的基本概念和架构
- 安装和配置Huey的不同存储后端
- 创建和执行异步任务
- 实现定时任务和周期性任务
- 任务结果的存储和检索
- 错误处理和任务重试机制
- Huey在实际项目中的应用
📋 前置知识
- Python 基础语法和面向对象编程
- 多线程编程概念
- 基本的Web开发知识
- Redis 或 SQLite 数据库基础(可选)
🚀 安装和配置
安装Huey
安装Huey非常简单,你可以使用pip命令直接安装:
pip install huey
如果你打算使用Redis作为后端,你还需要安装Redis并启动服务。对于SQLite后端,Huey会自动管理,无需额外安装。
不同存储后端配置
from huey import RedisHuey, SqliteHuey, MemoryHuey
# Redis 后端 - 适合生产环境
redis_huey = RedisHuey('my_app', host='localhost', port=6379, db=0)
# SQLite 后端 - 适合开发和小型应用
sqlite_huey = SqliteHuey('my_app.db')
# 内存后端 - 适合测试
memory_huey = MemoryHuey('my_app')
💡 实际应用
使用示例一:异步发送欢迎邮件
假设你正在开发一个网站,需要在用户注册后发送一封欢迎邮件。我们可以使用Huey来异步处理这个任务,避免用户等待。
首先,创建一个tasks.py文件,并定义一个发送邮件的任务:
from huey import RedisHuey
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import time
# 实例化Huey对象,指定Redis作为后端
huey = RedisHuey('my_app')
@huey.task()
def send_welcome_email(user_email, username):
"""异步发送欢迎邮件任务"""
print(f"开始发送欢迎邮件给 {user_email}...")
try:
# 邮件配置
smtp_server = "smtp.gmail.com"
smtp_port = 587
sender_email = "your_email@gmail.com"
sender_password = "your_password"
# 创建邮件内容
message = MIMEMultipart("alternative")
message["Subject"] = "欢迎加入我们!"
message["From"] = sender_email
message["To"] = user_email
# 邮件正文
html = f"""
<html>
<body>
<h2>欢迎 {username}!</h2>
<p>感谢您注册我们的服务。</p>
<p>我们很高兴您的加入!</p>
</body>
</html>
"""
part = MIMEText(html, "html")
message.attach(part)
# 发送邮件
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(sender_email, sender_password)
server.sendmail(sender_email, user_email, message.as_string())
print(f"欢迎邮件已成功发送给 {user_email}!")
return {"status": "success", "email": user_email}
except Exception as e:
print(f"发送邮件失败: {e}")
return {"status": "error", "message": str(e)}
@huey.task(retries=3, retry_delay=60)
def send_simple_welcome_email(user_email):
"""简化版异步邮件发送(带重试机制)"""
print(f"Sending welcome email to {user_email}...")
# 模拟邮件发送过程
time.sleep(2)
# 模拟偶尔失败的情况
import random
if random.random() < 0.3: # 30% 失败率
raise Exception("邮件服务器临时不可用")
print(f"Welcome email sent to {user_email}!")
return f"邮件已发送给 {user_email}"
在用户注册的代码中,我们可以这样触发异步任务:
from tasks import send_welcome_email, send_simple_welcome_email
def user_signup(request):
"""用户注册处理函数"""
# 用户注册逻辑
user_email = request.form['email']
username = request.form['username']
# 保存用户信息到数据库
# save_user_to_database(user_email, username)
# 发送异步欢迎邮件任务
result = send_welcome_email(user_email, username)
# 也可以获取任务ID用于后续查询
task_id = result.id
print(f"邮件发送任务ID: {task_id}")
# 立即返回响应,不等待邮件发送完成
return "注册成功!请检查您的邮箱以获取欢迎信息!"
def check_email_status(task_id):
"""检查邮件发送状态"""
from huey.api import Result
result = Result(task_id, huey)
if result.is_ready():
return result.get()
else:
return "邮件正在发送中..."
当调用send_welcome_email()函数时,Huey会将任务放入队列,并在后台工作线程中执行,无需用户等待邮件发送过程。
使用示例二:定期清理过期数据
让我们再来看一个例子,如果你的应用需要定期清理过期的数据,Huey也可以帮你轻松实现定时任务。
首先,在tasks.py文件中定义一个清理数据的任务:
from huey import crontab
import datetime
import os
@huey.periodic_task(crontab(minute='0', hour='3'))
def cleanup_expired_data():
"""每天凌晨3点执行的数据清理任务"""
print("开始执行定期数据清理任务...")
# 获取当前时间
now = datetime.datetime.now()
# 设置过期时间(30天前)
expiry_time = now - datetime.timedelta(days=30)
print(f"清理 {expiry_time} 之前的过期数据...")
try:
# 示例:清理临时文件
temp_dir = "/tmp/app_temp"
if os.path.exists(temp_dir):
for filename in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, filename)
if os.path.isfile(file_path):
# 获取文件修改时间
file_mtime = datetime.datetime.fromtimestamp(
os.path.getmtime(file_path)
)
# 如果文件超过30天,则删除
if file_mtime < expiry_time:
os.remove(file_path)
print(f"已删除过期文件: {filename}")
# 示例:清理数据库中的过期记录
# from your_app.models import LogEntry
# expired_logs = LogEntry.objects.filter(created_at__lt=expiry_time)
# count = expired_logs.count()
# expired_logs.delete()
# print(f"已清理 {count} 条过期日志记录")
print("数据清理任务完成!")
return {
"status": "success",
"cleaned_at": now.isoformat(),
"expiry_threshold": expiry_time.isoformat()
}
except Exception as e:
print(f"数据清理任务失败: {e}")
return {"status": "error", "message": str(e)}
@huey.periodic_task(crontab(minute='*/15'))
def system_health_check():
"""每15分钟执行一次系统健康检查"""
print("执行系统健康检查...")
import psutil
# 检查CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 检查内存使用率
memory = psutil.virtual_memory()
memory_percent = memory.percent
# 检查磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
health_data = {
"timestamp": datetime.datetime.now().isoformat(),
"cpu_percent": cpu_percent,
"memory_percent": memory_percent,
"disk_percent": disk_percent
}
# 如果资源使用率过高,发送警告
if cpu_percent > 80 or memory_percent > 80 or disk_percent > 80:
print(f"警告:系统资源使用率过高!{health_data}")
# 这里可以发送邮件或其他通知
print(f"系统健康检查完成: {health_data}")
return health_data
在这个例子中,我们使用了Huey的crontab来设置任务的执行时间。cleanup_expired_data任务会在每天凌晨3点执行,自动清理超过30天的数据。
高级功能示例
任务链和依赖
@huey.task()
def download_file(url):
"""下载文件任务"""
import requests
print(f"下载文件: {url}")
# 模拟下载
response = requests.get(url)
filename = url.split('/')[-1]
with open(filename, 'wb') as f:
f.write(response.content)
return filename
@huey.task()
def process_file(filename):
"""处理文件任务"""
print(f"处理文件: {filename}")
# 模拟文件处理
time.sleep(3)
processed_filename = f"processed_{filename}"
# 处理逻辑...
return processed_filename
@huey.task()
def upload_file(filename):
"""上传文件任务"""
print(f"上传文件: {filename}")
# 模拟上传
time.sleep(2)
return f"https://cdn.example.com/{filename}"
def process_workflow(url):
"""文件处理工作流"""
# 创建任务链
download_result = download_file(url)
# 等待下载完成后处理
if download_result.is_ready():
filename = download_result.get()
process_result = process_file(filename)
# 等待处理完成后上传
if process_result.is_ready():
processed_filename = process_result.get()
upload_result = upload_file(processed_filename)
return upload_result
return "工作流正在执行中..."
任务状态监控
@huey.task()
def long_running_task(duration):
"""长时间运行的任务"""
import time
for i in range(duration):
print(f"任务进度: {i+1}/{duration}")
time.sleep(1)
return f"任务完成,耗时 {duration} 秒"
def monitor_task():
"""监控任务执行状态"""
# 启动任务
result = long_running_task(10)
task_id = result.id
print(f"任务已启动,ID: {task_id}")
# 监控任务状态
while not result.is_ready():
print("任务仍在执行中...")
time.sleep(2)
# 获取结果
final_result = result.get()
print(f"任务完成: {final_result}")
return final_result
🔧 启动和管理
启动 Huey Consumer
要运行Huey任务,需要启动Huey Consumer(消费者)进程:
# 基本启动命令
huey_consumer.py tasks.huey
# 指定工作进程数
huey_consumer.py tasks.huey -w 4
# 启用详细日志
huey_consumer.py tasks.huey -v
# 后台运行
nohup huey_consumer.py tasks.huey -w 4 > huey.log 2>&1 &
生产环境配置
# production_tasks.py
from huey import RedisHuey
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 生产环境配置
huey = RedisHuey(
'production_app',
host='redis.example.com',
port=6379,
db=1,
password='your_redis_password',
# 连接池配置
connection_pool_kwargs={
'max_connections': 20,
'retry_on_timeout': True
}
)
@huey.task(retries=5, retry_delay=300) # 5次重试,每次间隔5分钟
def critical_task(data):
"""关键任务处理"""
try:
logger.info(f"处理关键任务: {data}")
# 任务逻辑
result = process_critical_data(data)
logger.info(f"关键任务完成: {result}")
return result
except Exception as e:
logger.error(f"关键任务失败: {e}")
raise # 重新抛出异常以触发重试
⚠️ 注意事项
任务设计最佳实践
- 任务幂等性
@huey.task() def idempotent_task(user_id, operation_id): """幂等任务示例""" # 检查操作是否已经执行 if is_operation_completed(operation_id): return get_operation_result(operation_id) # 执行操作 result = perform_operation(user_id) # 记录操作完成状态 mark_operation_completed(operation_id, result) return result - 任务超时处理 ```python import signal
@huey.task() def task_with_timeout(): “"”带超时的任务””” def timeout_handler(signum, frame): raise TimeoutError(“任务执行超时”)
# 设置30秒超时
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(30)
try:
# 执行任务逻辑
result = long_running_operation()
signal.alarm(0) # 取消超时
return result
except TimeoutError:
print("任务因超时被终止")
raise ```
- 错误处理和日志记录 ```python import logging
logger = logging.getLogger(name)
@huey.task(retries=3, retry_delay=60) def robust_task(data): “"”健壮的任务处理””” try: logger.info(f”开始处理任务: {data}”)
# 任务逻辑
result = process_data(data)
logger.info(f"任务处理成功: {result}")
return result
except ValueError as e:
# 不可重试的错误
logger.error(f"数据格式错误,不进行重试: {e}")
raise huey.TaskException(f"数据错误: {e}")
except ConnectionError as e:
# 可重试的错误
logger.warning(f"连接错误,将进行重试: {e}")
raise # 重新抛出以触发重试
except Exception as e:
logger.error(f"未知错误: {e}")
raise ```
性能优化建议
- 合理设置工作进程数量
- 避免任务中的阻塞操作
- 使用连接池减少连接开销
- 定期清理已完成的任务结果
- 监控队列长度和任务执行时间
🔗 相关内容
📚 扩展阅读
🏷️ 标签
任务队列 异步任务 定时任务 后台处理 Huey Redis SQLite 轻量级
最后更新: 2024-01-15
作者: Python 技术文档工程师
版本: 1.0
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。