Django 集成 Celery 实现分布式任务

1、安装

pip install celery django-celery-results gevent flower

2、配置

setting.py

# settings.py

# 添加应用到 INSTALLED_APPS
INSTALLED_APPS = [
    ...,
    'django_celery_results',
]

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用 Redis 作为消息代理
CELERY_RESULT_BACKEND = 'django-db'  # 使用 Django 数据库作为结果后端
CELERY_CACHE_BACKEND = 'django-cache'  # 可选:使用 Django 缓存作为中间结果存储
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'  # 设置时区
CELERY_ENABLE_UTC = True

# 可选:配置任务结果过期时间(秒)
CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 7  # 7天

# 可选:配置任务跟踪
CELERY_TASK_TRACK_STARTED = True  # 跟踪任务开始时间

celery.py

这个文件需要在项目目录下创建,也就是setting所在的目录。如 admin/setting.py, 则创建一个 admin/celery.py 文件,内容如下

# admin/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'admin.settings')

app = Celery('admin')

# 从 Django 设置中加载 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现所有 Django 应用中的 tasks.py
app.autodiscover_tasks()

# 可选:添加自定义任务路由
app.conf.task_routes = {
    'tasks.high_priority.*': {'queue': 'high_priority'},
    'tasks.low_priority.*': {'queue': 'low_priority'},
}

__init__.py

这个文件也是与setting.py同级的,一般是原来有的文件。在里面添加以下内容

# admin/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

3、数据迁移

运行迁移命令创建结果存储表

python manage.py migrate django_celery_results

4、创建任务

在应用目录下创建 tasks.py

# myapp/tasks.py
from celery import shared_task
from time import sleep
from loguru import logger # type: ignore

@shared_task(bind=True, name='process_data_task')
def process_data_task(self, data_id):
    """处理数据的任务示例"""
    logger.info(f'process_data_task>>>>{data_id} >>> {self.request.id}')
    try:
        # 获取任务状态对象
        task = self.AsyncResult(self.request.id)
        
        sleep(2)
        
        return {
            'status': 'success',
            'object_id': data_id
        }
    except Exception as e:
        # 记录错误
        self.update_state(
            state='FAILURE',
            meta={
                'exc_type': type(e).__name__,
                'exc_message': str(e)
            }
        )
        raise

5、调用任务

在业务代码中调用任务,如

# 异步调用任务
task = process_data_task.delay(data_id)

# 重写修改
def update(self, request, *args, **kwargs):

    print(request.data)
    partial = kwargs.pop('partial', False)
    instance = self.get_object()
    id = instance.id
    task = process_data_task.delay(id)
    logger.info(f'task>>>>{task}')
    # 获取序列化器
    serializer = self.get_serializer(instance, data=request.data, partial=partial)
    serializer.is_valid(raise_exception=True)
    self.perform_update(serializer)
    role_id = serializer.data['id']

6、启动

# 启动 worker (异步任务)
celery -A admin worker --loglevel=info [-P gevent]

# 启动 beat 调度器(定时任务)
celery -A admin beat --loglevel=info

# 管理工具
celery -A admin flower --port-5555

7、监控与调试技巧

在 Django shell 中查询任务

from django_celery_results.models import TaskResult

# 获取所有成功的任务
success_tasks = TaskResult.objects.filter(status='SUCCESS')

# 获取最近24小时失败的任务
from django.utils import timezone
from datetime import timedelta

failed_tasks = TaskResult.objects.filter(
    status='FAILURE',
    date_done__gte=timezone.now() - timedelta(days=1)
)

# 获取特定任务的结果
task = TaskResult.objects.get(task_id='your-task-id')
print(task.result)  # 任务返回值
print(task.traceback)  # 错误堆栈

清理旧任务

# management/commands/cleanup_tasks.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from django_celery_results.models import TaskResult
from datetime import timedelta

class Command(BaseCommand):
    help = '清理过期任务结果'
    
    def add_arguments(self, parser):
        parser.add_argument(
            '--days',
            type=int,
            default=30,
            help='保留天数(默认30天)'
        )
    
    def handle(self, *args, **options):
        days = options['days']
        cutoff = timezone.now() - timedelta(days=days)
        
        # 删除过期任务
        deleted, _ = TaskResult.objects.filter(
            date_done__lt=cutoff
        ).delete()
        
        self.stdout.write(
            self.style.SUCCESS(
                f'成功删除 {deleted} 个超过 {days} 天的任务记录'
            )
        )

运行清理命令

python manage.py cleanup_tasks --days=7
This entry was posted in 应用. Bookmark the permalink.