Celery - 分布式任务队列与定时任务

学习使用Celery实现分布式异步任务处理与定时任务调度,包含安装、基本用法、周期性任务与结果存储等实战示例

分类: thirdparty 难度: 中级 更新: 2025-08-22
celery 分布式任务 异步任务 定时任务 任务调度 RabbitMQ Redis

Celery - 分布式任务队列与定时任务

📝 概述

在现代应用程序开发中,处理异步任务和分布式计算是常见的需求。Celery 是一个强大的分布式任务队列,它可以将任务异步执行,并在多台计算机上分布式运行。本文将深入介绍 Celery,包括其基本概念、安装方法、示例代码以及一些高级用法,以帮助你充分利用这一工具来处理异步任务和分布式计算。

主要特点:

  • 分布式执行:支持将任务分发到多个计算机上,以实现分布式执行,从而提高处理能力和性能
  • 异步任务:可以将任务交给 Celery 执行,而不必等待任务完成
  • 任务调度:可在特定时间或间隔内运行任务
  • 任务结果存储:支持将任务结果存储在后端以便检索
  • 可扩展性:支持多个消息代理,包括 RabbitMQ、Redis 等
  • 容错性:具备容错能力,处理任务执行过程中的错误和故障

项目地址: https://github.com/celery/celery

🎯 学习目标

  • 理解 Celery 的核心概念:App、Task、Broker、Backend
  • 完成 Celery 的安装与环境配置
  • 编写并运行基础任务:创建应用、启动 worker、调用任务
  • 实现周期性任务与 crontab 调度
  • 使用 Redis/RabbitMQ 作为消息代理与结果后端
  • 了解分布式部署、监控与调优要点

📋 前置知识

  • Python 基础语法与模块化
  • 进程/线程与并发基础
  • 消息队列基础:RabbitMQ 或 Redis 的基本使用
  • Linux/容器化部署基础(可选)

⚙️ 安装 Celery

pip install celery
# 如果使用 Redis 作为 broker/backend
pip install "celery[redis]"
# 如果使用 RabbitMQ,请安装并运行 RabbitMQ 服务器

🚀 基本用法

1) 创建 Celery 应用

创建一个名为 celery_app.py 的 Python 文件:

from celery import Celery

# broker 使用 RabbitMQ,格式示例:pyamqp://用户:密码@主机:端口/虚拟主机
app = Celery('myapp', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

2) 启动 Worker

在终端启动 Celery Worker:

celery -A celery_app worker --loglevel=info

3) 调用任务

创建 main.py

from celery_app import add

result = add.delay(4, 5)
print(result.get())  # 获取结果(会阻塞直到任务完成)

运行:

python main.py

⏰ 周期性与定时任务

Celery 内置周期性任务调度能力,可以使用 add_periodic_task 或 crontab 表达式。

from celery import Celery
from celery.schedules import crontab

app = Celery('myapp', broker='pyamqp://guest@localhost//')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每分钟执行一次 my_task
    sender.add_periodic_task(60.0, my_task.s(), name='run every 1 minute')

@app.task
def my_task():
    # 定时任务逻辑
    print('Periodic task executed')

🌐 分布式任务与 Broker 配置

Celery 支持多种 Broker(消息代理)。以 Redis 为例:

# 安装 redis 扩展
pip install "celery[redis]"
from celery import Celery

# 使用 Redis 作为 broker
app = Celery('myapp', broker='redis://localhost:6379/0')

在多台计算机上运行多个 Worker,即可实现分布式执行。

🗃️ 结果存储 Backend 配置

将任务结果存储到 Redis,便于后续查询:

from celery import Celery
from celery.result import AsyncResult

app = Celery(
    'myapp',
    broker='pyamqp://guest@localhost//',
    backend='redis://localhost:6379/0'
)

@app.task
def add(x, y):
    return x + y

result = add.delay(4, 5)

# 获取任务ID与结果
result_id = result.id
result_obj = AsyncResult(result_id, app=app)
print(result_obj.get())

🔧 进阶配置与实践建议

  • 使用配置文件或环境变量管理 broker/backend 地址
  • 区分开发、测试、生产配置(队列名、并发数、序列化方式等)
  • 任务应保持幂等性,必要时实现重试(autoretry_for)
  • 使用 Flower 等工具监控任务执行
  • 合理拆分任务粒度,避免长时间阻塞

更多入门示例与实践补充(来自原始资料)

Celery 是什么?优势

  • 异步任务:将耗时操作(发送短信/邮件、消息推送、音视频处理等)交给 Celery 异步执行
  • 定时任务:例如每天定时运行爬虫
  • 分布式:可扩展为分布式爬虫系统等
  • 简单、高可用、快速、灵活:大部分组件可扩展或独立使用

执行流程图

Celery 执行流程图

使用 Redis 作为 Broker/Backend 的完整示例

# tasks.py
import time
from celery import Celery

# 消息中间件与结果存储均使用 Redis
app = Celery(
    'celeryDemo',
    broker='redis://localhost:6379/1',
    backend='redis://localhost:6379/2',
)

@app.task
def add(x, y):
    print('task enter ....')
    time.sleep(5)
    return x + y
# app.py
from tasks import add

if __name__ == '__main__':
    print('task start....')
    result = add.delay(2, 3)
    print('task end....')
    print(result)

启动 worker:

celery worker -A tasks -l INFO

配置文件与多任务拆分

# celery_demo/__init__.py
from celery import Celery

app = Celery(
    'demo',
    include=[
        'celery_demo.task1',
        'celery_demo.task2',
    ]
)
app.config_from_object('celery_demo.celeryconfig')
# celery_demo/celeryconfig.py (节选)
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# celery_demo/task1.py
import time
from celery_demo import app

@app.task
def add(x, y):
    time.sleep(5)
    return x + y
# celery_demo/task2.py
import time
from celery_demo import app

@app.task
def mut(x, y):
    time.sleep(5)
    return x * y

定时任务(celery beat)

# celery_demo/celeryconfig.py (定时任务节选)
from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'celery_demo.task1.add',
        'schedule': timedelta(seconds=10),  # 每 10 秒
        'args': (10, 20),
    },
    'task2': {
        'task': 'celery_demo.task2.mut',
        'schedule': crontab(hour=22, minute=24),  # 每天 22:24
        'args': (10, 10),
    },
}

启动:

celery beat -A celery_demo -l INFO

队列与路由、限速与专门 worker

# celeryconf.py (节选)
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('crawl_caipu_list', exchange='crawl_caipu_list', routing_key='crawl_caipu_list'),
    Queue('crawl_caipu_detail', exchange='crawl_caipu_detail', routing_key='crawl_caipu_detail'),
)

CELERY_ROUTES = {
    'celery_app.teskone.crawl_caipu_list': {
        'queue': 'crawl_caipu_list',
        'routing_key': 'crawl_caipu_list',
    },
    'celery_app.teskone.crawl_caipu_detail': {
        'queue': 'crawl_caipu_detail',
        'routing_key': 'crawl_caipu_detail',
    },
}

# 限制所有任务的请求频率(示例)
CELERY_ANNOTATIONS = {'*': {'rate_limit': '1/s'}}

运行只消费特定队列的 worker:

celery worker -A celery项目 -l INFO -Q queuename

分布式爬虫示例

简单的使用 Celery 完成“下厨房”菜谱详情分布式爬虫示例项目:

  • https://github.com/ljhyigehaoren/celery_best.git

🔗 相关内容

📚 扩展阅读

  • 官方文档: https://docs.celeryq.dev/
  • GitHub: https://github.com/celery/celery
  • Brocker/Backend 选型: Redis vs RabbitMQ

🏷️ 标签

分布式任务 异步任务 定时任务 任务调度 Celery Redis RabbitMQ


最后更新: 2025-08-22
作者: Python 技术文档工程师
版本: 1.0

作者: Python 技术文档工程师

版本: 1.0

讨论与反馈

欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。