Celery
要全面和详细地讲解 Celery 手册,我会从基础概念到高级功能,逐步深入,并涵盖 Celery 的安装、配置、任务执行、错误处理、定时任务、监控工具、与 Django 集成等内容。Celery 是一个分布式任务队列框架,支持异步任务、调度任务,广泛应用于 Python 项目,特别是 Django 和 Flask 项目。
1. Celery 概述
1.1 什么是 Celery?
Celery
是一个用于处理分布式任务的异步任务队列系统,支持:
- 任务队列:用于处理异步操作,任务会被添加到队列中。
- 分布式执行:多个 worker 并行执行任务。
- 结果存储:任务执行的结果可以存储,供后续查询。
- 重试机制:任务失败时可以配置重试。
- 定时任务:支持周期性任务和一次性任务的调度。
1.2 Celery 的主要组件
- Celery 应用(App):Celery 的核心,用来管理任务、配置和 worker。
- Broker(消息代理):用于传递消息。常用的有
Redis
、RabbitMQ
。 - Worker(工作进程):用于处理任务的进程,多个 worker 可以并行处理任务。
- Result Backend(结果存储):用于存储任务的执行结果。常用的有
Redis
、数据库
。 - Task(任务):异步或同步执行的函数。
2. Celery 环境搭建和配置
2.1 安装 Celery 和 Redis
Celery 需要一个消息代理来处理任务调度,Redis
是一个常用的消息代理和结果存储。安装 Celery 和 Redis 依赖:
pip install celery[redis]
[redis]
安装额外的 Redis 依赖包。你也可以根据需求选择其他 broker(如 RabbitMQ)。
Redis 安装(MacOS 和 Ubuntu)
-
MacOS:
bash brew install redis brew services start redis
-
Ubuntu:
bash sudo apt update sudo apt install redis-server sudo systemctl enable redis-server sudo systemctl start redis-server
2.2 基本配置
在项目中,首先需要创建一个 Celery 应用,并指定消息代理和结果存储。
from celery import Celery
app = Celery('myapp',
broker='redis://localhost:6379/0', # Redis 作为消息代理
backend='redis://localhost:6379/0') # Redis 作为结果存储
- broker:指定消息代理的 URL。在本例中使用
Redis
。 - backend:任务执行结果的存储位置,使用
Redis
保存任务结果。
2.3 配置文件
可以通过 app.config_from_object
来加载配置文件。例如,在 celeryconfig.py
中定义配置:
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True
然后在 Celery 应用中引用该配置:
app.config_from_object('celeryconfig')
3. 定义和执行任务
3.1 创建任务
任务是 Celery 中的核心概念,任务可以是任何 Python 函数,只需要用 @app.task
装饰器标记为任务即可。以下是一个简单的任务:
@app.task
def add(x, y):
return x + y
@app.task
:用于将函数声明为 Celery 任务。
3.2 执行任务
Celery 任务可以同步或异步执行。
异步执行任务
result = add.delay(4, 6)
delay()
方法将任务发送给 Celery worker 并异步执行。任务将放入消息队列,等待 worker 执行。
同步获取任务结果
要获取任务结果,可以使用 result.get()
:
result = add.delay(4, 6)
print(result.get(timeout=10)) # 等待结果,超时时间10秒
get()
方法等待任务执行完成并获取结果。timeout
参数设置等待的超时时间。
3.3 检查任务状态
任务执行过程中,你可以随时检查其状态:
print(result.status) # 打印任务状态:PENDING, SUCCESS, FAILURE
3.4 常见的任务状态
- PENDING:任务还没有开始执行。
- STARTED:任务已经开始执行。
- SUCCESS:任务执行成功。
- FAILURE:任务执行失败。
- RETRY:任务失败并正在重试。
- REVOKED:任务被取消。
4. Worker 管理
4.1 启动 Worker
Celery Worker 是实际执行任务的进程。要启动一个 worker,使用以下命令:
celery -A myapp worker --loglevel=info
-A myapp
:指定 Celery 应用(myapp
是应用的名字)。worker
:启动 Celery 工作进程。--loglevel=info
:显示详细的日志信息。
4.2 停止 Worker
你可以通过 Ctrl+C
停止 Worker,或者使用以下命令优雅地关闭 Worker:
celery -A myapp control shutdown
4.3 多进程 Worker
为了提高任务的并发执行效率,Celery 允许你启动多个 worker 进程:
celery -A myapp worker --concurrency=4 --loglevel=info
--concurrency=4
:启动 4 个并发 worker 进程。
5. 任务重试和错误处理
5.1 任务重试
当任务由于某种原因失败时,可以使用 retry()
方法重新尝试执行任务。例如:
@app.task(bind=True, max_retries=3)
def error_prone_task(self):
try:
# 某些可能失败的代码
pass
except Exception as exc:
raise self.retry(exc=exc, countdown=60) # 失败后 60 秒后重试
max_retries=3
:最大重试次数为 3 次。countdown=60
:设置任务失败后的重试间隔时间为 60 秒。
5.2 任务超时
可以为任务设置超时时间,确保任务不会长时间挂起:
@app.task(time_limit=30)
def long_running_task():
# 执行耗时任务
pass
time_limit=30
:任务最多可以执行 30 秒,超时后会自动终止。
5.3 错误处理
在任务执行过程中,可以使用 try-except
结构来捕获和处理错误:
@app.task(bind=True)
def divide(self, x, y):
try:
return x / y
except ZeroDivisionError as e:
self.retry(exc=e, countdown=60) # 在 60 秒后重试
6. 定时任务(Periodic Tasks)
Celery 支持使用 beat
调度器来定期运行任务。
6.1 定义定时任务
你可以通过配置 Celery 的 beat_schedule
来定义定时任务。例如,每隔 30 秒运行一次任务:
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0, # 每隔 30 秒执行一次
'args': (16, 16)
},
'multiply-at-midnight': {
'task': 'tasks.mul',
'schedule': crontab(hour=0, minute=0), # 每天午夜执行
'args': (5, 5),
},
}
schedule=30.0
:任务每 30 秒执行一次。crontab()
:可以通过crontab
定义更复杂的调度规则。
6.2 启动 Celery Beat
要执行定时任务,你需要启动 Celery Beat:
celery -A myapp beat --loglevel=info
Beat 进程会按照定义的调度计划定期触发任务。
7. 监控 Celery
7.1 使用 Flower 监控 Celery
Flower
是一个基于 Web 的 Celery 实时监控工具。它允许
你监控任务执行、worker 状态等信息。
安装 Flower
pip install flower
启动 Flower
celery -A myapp flower
默认情况下,Flower 运行在 http://localhost:5555
,你可以在浏览器中查看实时监控页面。
7.2 使用命令行工具
你还可以使用 Celery 提供的命令行工具来监控和管理任务。
查看 Worker 状态
celery -A myapp status
撤销任务
celery -A myapp control revoke <task_id>
8. 与 Django 集成
Celery 很常用于 Django 项目来处理后台任务,如发送邮件、定时清理数据等。
8.1 安装 Celery 和 Django
在 Django 项目中安装 Celery:
pip install celery[redis]
8.2 配置 Celery 与 Django 的集成
在 settings.py
中为 Django 配置 Celery:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
创建 celery.py
文件来初始化 Celery:
# celery.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 使用 Django 的配置
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在 Django 项目的 __init__.py
中导入 Celery 应用:
from .celery import app as celery_app
__all__ = ('celery_app',)
8.3 创建 Django 任务
你可以在 Django 应用的 tasks.py
文件中定义任务:
# tasks.py
from celery import shared_task
@shared_task
def send_email_task():
# 发送邮件的逻辑
pass
通过以上 Celery 的全面解读,你可以初步搭建和使用 Celery 处理异步任务。Celery 支持高度的扩展性,并能与不同框架和工具进行无缝集成。