跳转至

Celery

要全面和详细地讲解 Celery 手册,我会从基础概念到高级功能,逐步深入,并涵盖 Celery 的安装、配置、任务执行、错误处理、定时任务、监控工具、与 Django 集成等内容。Celery 是一个分布式任务队列框架,支持异步任务、调度任务,广泛应用于 Python 项目,特别是 Django 和 Flask 项目。


1. Celery 概述

1.1 什么是 Celery?

Celery 是一个用于处理分布式任务的异步任务队列系统,支持: - 任务队列:用于处理异步操作,任务会被添加到队列中。 - 分布式执行:多个 worker 并行执行任务。 - 结果存储:任务执行的结果可以存储,供后续查询。 - 重试机制:任务失败时可以配置重试。 - 定时任务:支持周期性任务和一次性任务的调度。

1.2 Celery 的主要组件

  • Celery 应用(App):Celery 的核心,用来管理任务、配置和 worker。
  • Broker(消息代理):用于传递消息。常用的有 RedisRabbitMQ
  • Worker(工作进程):用于处理任务的进程,多个 worker 可以并行处理任务。
  • Result Backend(结果存储):用于存储任务的执行结果。常用的有 Redis数据库
  • Task(任务):异步或同步执行的函数。

2. Celery 环境搭建和配置

2.1 安装 Celery 和 Redis

Celery 需要一个消息代理来处理任务调度,Redis 是一个常用的消息代理和结果存储。安装 Celery 和 Redis 依赖:

pip install celery[redis]

[redis] 安装额外的 Redis 依赖包。你也可以根据需求选择其他 broker(如 RabbitMQ)。

Redis 安装(MacOS 和 Ubuntu)

  • MacOSbash brew install redis brew services start redis

  • Ubuntubash sudo apt update sudo apt install redis-server sudo systemctl enable redis-server sudo systemctl start redis-server

2.2 基本配置

在项目中,首先需要创建一个 Celery 应用,并指定消息代理和结果存储。

from celery import Celery

app = Celery('myapp', 
             broker='redis://localhost:6379/0',  # Redis 作为消息代理
             backend='redis://localhost:6379/0')  # Redis 作为结果存储
  • broker:指定消息代理的 URL。在本例中使用 Redis
  • backend:任务执行结果的存储位置,使用 Redis 保存任务结果。

2.3 配置文件

可以通过 app.config_from_object 来加载配置文件。例如,在 celeryconfig.py 中定义配置:

# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

然后在 Celery 应用中引用该配置:

app.config_from_object('celeryconfig')

3. 定义和执行任务

3.1 创建任务

任务是 Celery 中的核心概念,任务可以是任何 Python 函数,只需要用 @app.task 装饰器标记为任务即可。以下是一个简单的任务:

@app.task
def add(x, y):
    return x + y
  • @app.task:用于将函数声明为 Celery 任务。

3.2 执行任务

Celery 任务可以同步或异步执行。

异步执行任务

result = add.delay(4, 6)
  • delay() 方法将任务发送给 Celery worker 并异步执行。任务将放入消息队列,等待 worker 执行。

同步获取任务结果

要获取任务结果,可以使用 result.get()

result = add.delay(4, 6)
print(result.get(timeout=10))  # 等待结果,超时时间10秒
  • get() 方法等待任务执行完成并获取结果。timeout 参数设置等待的超时时间。

3.3 检查任务状态

任务执行过程中,你可以随时检查其状态:

print(result.status)  # 打印任务状态:PENDING, SUCCESS, FAILURE

3.4 常见的任务状态

  • PENDING:任务还没有开始执行。
  • STARTED:任务已经开始执行。
  • SUCCESS:任务执行成功。
  • FAILURE:任务执行失败。
  • RETRY:任务失败并正在重试。
  • REVOKED:任务被取消。

4. Worker 管理

4.1 启动 Worker

Celery Worker 是实际执行任务的进程。要启动一个 worker,使用以下命令:

celery -A myapp worker --loglevel=info
  • -A myapp:指定 Celery 应用(myapp 是应用的名字)。
  • worker:启动 Celery 工作进程。
  • --loglevel=info:显示详细的日志信息。

4.2 停止 Worker

你可以通过 Ctrl+C 停止 Worker,或者使用以下命令优雅地关闭 Worker:

celery -A myapp control shutdown

4.3 多进程 Worker

为了提高任务的并发执行效率,Celery 允许你启动多个 worker 进程:

celery -A myapp worker --concurrency=4 --loglevel=info
  • --concurrency=4:启动 4 个并发 worker 进程。

5. 任务重试和错误处理

5.1 任务重试

当任务由于某种原因失败时,可以使用 retry() 方法重新尝试执行任务。例如:

@app.task(bind=True, max_retries=3)
def error_prone_task(self):
    try:
        # 某些可能失败的代码
        pass
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # 失败后 60 秒后重试
  • max_retries=3:最大重试次数为 3 次。
  • countdown=60:设置任务失败后的重试间隔时间为 60 秒。

5.2 任务超时

可以为任务设置超时时间,确保任务不会长时间挂起:

@app.task(time_limit=30)
def long_running_task():
    # 执行耗时任务
    pass
  • time_limit=30:任务最多可以执行 30 秒,超时后会自动终止。

5.3 错误处理

在任务执行过程中,可以使用 try-except 结构来捕获和处理错误:

@app.task(bind=True)
def divide(self, x, y):
    try:
        return x / y
    except ZeroDivisionError as e:
        self.retry(exc=e, countdown=60)  # 在 60 秒后重试

6. 定时任务(Periodic Tasks)

Celery 支持使用 beat 调度器来定期运行任务。

6.1 定义定时任务

你可以通过配置 Celery 的 beat_schedule 来定义定时任务。例如,每隔 30 秒运行一次任务:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,  # 每隔 30 秒执行一次
        'args': (16, 16)
    },
    'multiply-at-midnight': {
        'task': 'tasks.mul',
        'schedule': crontab(hour=0, minute=0),  # 每天午夜执行
        'args': (5, 5),
    },
}
  • schedule=30.0:任务每 30 秒执行一次。
  • crontab():可以通过 crontab 定义更复杂的调度规则。

6.2 启动 Celery Beat

要执行定时任务,你需要启动 Celery Beat:

celery -A myapp beat --loglevel=info

Beat 进程会按照定义的调度计划定期触发任务。


7. 监控 Celery

7.1 使用 Flower 监控 Celery

Flower 是一个基于 Web 的 Celery 实时监控工具。它允许

你监控任务执行、worker 状态等信息。

安装 Flower

pip install flower

启动 Flower

celery -A myapp flower

默认情况下,Flower 运行在 http://localhost:5555,你可以在浏览器中查看实时监控页面。

7.2 使用命令行工具

你还可以使用 Celery 提供的命令行工具来监控和管理任务。

查看 Worker 状态

celery -A myapp status

撤销任务

celery -A myapp control revoke <task_id>

8. 与 Django 集成

Celery 很常用于 Django 项目来处理后台任务,如发送邮件、定时清理数据等。

8.1 安装 Celery 和 Django

在 Django 项目中安装 Celery:

pip install celery[redis]

8.2 配置 Celery 与 Django 的集成

settings.py 中为 Django 配置 Celery:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

创建 celery.py 文件来初始化 Celery:

# celery.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 使用 Django 的配置
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在 Django 项目的 __init__.py 中导入 Celery 应用:

from .celery import app as celery_app

__all__ = ('celery_app',)

8.3 创建 Django 任务

你可以在 Django 应用的 tasks.py 文件中定义任务:

# tasks.py
from celery import shared_task

@shared_task
def send_email_task():
    # 发送邮件的逻辑
    pass

通过以上 Celery 的全面解读,你可以初步搭建和使用 Celery 处理异步任务。Celery 支持高度的扩展性,并能与不同框架和工具进行无缝集成。