1、安装
pip install celery django-celery-results gevent flower
2、配置
setting.py
# settings.py
# 添加应用到 INSTALLED_APPS
INSTALLED_APPS = [
...,
'django_celery_results',
]
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用 Redis 作为消息代理
CELERY_RESULT_BACKEND = 'django-db' # 使用 Django 数据库作为结果后端
CELERY_CACHE_BACKEND = 'django-cache' # 可选:使用 Django 缓存作为中间结果存储
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai' # 设置时区
CELERY_ENABLE_UTC = True
# 可选:配置任务结果过期时间(秒)
CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 7 # 7天
# 可选:配置任务跟踪
CELERY_TASK_TRACK_STARTED = True # 跟踪任务开始时间
celery.py
这个文件需要在项目目录下创建,也就是setting所在的目录。如 admin/setting.py, 则创建一个 admin/celery.py 文件,内容如下
# admin/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'admin.settings')
app = Celery('admin')
# 从 Django 设置中加载 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现所有 Django 应用中的 tasks.py
app.autodiscover_tasks()
# 可选:添加自定义任务路由
app.conf.task_routes = {
'tasks.high_priority.*': {'queue': 'high_priority'},
'tasks.low_priority.*': {'queue': 'low_priority'},
}
__init__.py
这个文件也是与setting.py同级的,一般是原来有的文件。在里面添加以下内容
# admin/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
3、数据迁移
运行迁移命令创建结果存储表
python manage.py migrate django_celery_results
4、创建任务
在应用目录下创建 tasks.py
# myapp/tasks.py
from celery import shared_task
from time import sleep
from loguru import logger # type: ignore
@shared_task(bind=True, name='process_data_task')
def process_data_task(self, data_id):
"""处理数据的任务示例"""
logger.info(f'process_data_task>>>>{data_id} >>> {self.request.id}')
try:
# 获取任务状态对象
task = self.AsyncResult(self.request.id)
sleep(2)
return {
'status': 'success',
'object_id': data_id
}
except Exception as e:
# 记录错误
self.update_state(
state='FAILURE',
meta={
'exc_type': type(e).__name__,
'exc_message': str(e)
}
)
raise
5、调用任务
在业务代码中调用任务,如
# 异步调用任务
task = process_data_task.delay(data_id)
# 重写修改
def update(self, request, *args, **kwargs):
print(request.data)
partial = kwargs.pop('partial', False)
instance = self.get_object()
id = instance.id
task = process_data_task.delay(id)
logger.info(f'task>>>>{task}')
# 获取序列化器
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
role_id = serializer.data['id']
6、启动
# 启动 worker (异步任务)
celery -A admin worker --loglevel=info [-P gevent]
# 启动 beat 调度器(定时任务)
celery -A admin beat --loglevel=info
# 管理工具
celery -A admin flower --port-5555

7、监控与调试技巧
在 Django shell 中查询任务
from django_celery_results.models import TaskResult
# 获取所有成功的任务
success_tasks = TaskResult.objects.filter(status='SUCCESS')
# 获取最近24小时失败的任务
from django.utils import timezone
from datetime import timedelta
failed_tasks = TaskResult.objects.filter(
status='FAILURE',
date_done__gte=timezone.now() - timedelta(days=1)
)
# 获取特定任务的结果
task = TaskResult.objects.get(task_id='your-task-id')
print(task.result) # 任务返回值
print(task.traceback) # 错误堆栈
清理旧任务
# management/commands/cleanup_tasks.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from django_celery_results.models import TaskResult
from datetime import timedelta
class Command(BaseCommand):
help = '清理过期任务结果'
def add_arguments(self, parser):
parser.add_argument(
'--days',
type=int,
default=30,
help='保留天数(默认30天)'
)
def handle(self, *args, **options):
days = options['days']
cutoff = timezone.now() - timedelta(days=days)
# 删除过期任务
deleted, _ = TaskResult.objects.filter(
date_done__lt=cutoff
).delete()
self.stdout.write(
self.style.SUCCESS(
f'成功删除 {deleted} 个超过 {days} 天的任务记录'
)
)
运行清理命令
python manage.py cleanup_tasks --days=7