Django + Celery + Redis 异步任务完整流程详解
📚 目录
核心概念
三大组件
| 组件 | 作用 | 比喻 |
|---|---|---|
| Django | Web 应用,发送任务 | 老板(下达任务) |
| Redis | 消息队列,存储任务 | 任务板(贴便签) |
| Celery Worker | 异步执行任务 | 员工(执行任务) |
为什么需要异步任务?
# ❌ 同步方式(用户等待很久)
def upload_video(request):
video = save_video(request.FILES['video'])
process_video(video.id) # 转码需要 5 分钟!用户一直等待...
return Response({"status": "ok"})
# ✅ 异步方式(用户立即得到响应)
def upload_video(request):
video = save_video(request.FILES['video'])
process_video.delay(video.id) # 发送到后台处理,立即返回
return Response({"status": "processing"})
架构图
整体架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Django │ │ Redis │ │ Celery │
│ (Web App) │────────▶│ (Broker) │────────▶│ Worker │
│ │ 发送任务 │ │ 取出任务 │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ 1. 用户上传视频 │ 2. 存储任务队列 │ 3. 执行转码
│ process_video.delay() │ LPUSH celery {...} │ 处理视频
│ │ │
└────────────────────────┴────────────────────────┘
4. 返回结果(可选)
SETEX celery-task-meta-xxx {...}
数据流向
用户上传视频
↓
前端: POST /api/videos/merge-chunks/
↓
Django 视图: MergeChunksView.post()
↓
创建视频记录: Video.objects.create(...)
↓
发送异步任务: process_video.delay(video.id)
↓
┌─────────────────────────────────────────────┐
│ Redis 存储任务 │
│ LPUSH celery '{"task": "videos.tasks. │
│ process_video", "args": [12], ...}' │
└─────────────────────────────────────────────┘
↓
Celery Worker 监听队列
↓
BRPOP celery (从队列取出任务)
↓
执行任务: process_video(12)
├─ 获取视频信息 (ffprobe)
├─ 生成缩略图 (ffmpeg)
├─ 转码多分辨率 (ffmpeg)
└─ 更新数据库
↓
更新 Redis 结果: SETEX celery-task-meta-xxx
'{"status": "SUCCESS", "result": true, ...}'
↓
任务完成!
配置详解
1. Django Settings (settings.py)
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Redis 作为消息队列
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Redis 存储任务结果
CELERY_ACCEPT_CONTENT = ['json'] # 接受 JSON 格式
CELERY_TASK_SERIALIZER = 'json' # 任务序列化为 JSON
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化为 JSON
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区设置
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True # 启动时重试连接
配置说明:
BROKER_URL: 任务队列地址(任务发送到这里)RESULT_BACKEND: 结果存储地址(任务结果保存到这里)TASK_SERIALIZER: 任务如何序列化(JSON 格式)
2. Celery 应用配置 (video/celery.py)
import os
from celery import Celery
from celery.schedules import crontab
# 设置 Django 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'video.settings')
# 创建 Celery 实例
app = Celery('video')
# 从 Django settings 加载配置(所有以 CELERY_ 开头的配置)
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现所有 app 中的 tasks.py 文件
app.autodiscover_tasks()
# 配置定时任务(可选)
app.conf.beat_schedule = {
'cleanup-deleted-videos-daily': {
'task': 'videos.tasks.cleanup_deleted_videos',
'schedule': crontab(hour=3, minute=0), # 每天凌晨3点执行
},
}
3. Django 项目初始化 (video/init.py)
# 确保 Django 启动时加载 Celery
from .celery import app as celery_app
__all__ = ('celery_app',)
4. 定义异步任务 (videos/tasks.py)
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task # 装饰器标记这是一个 Celery 任务
def process_video(video_id):
"""处理视频:转码、生成缩略图等"""
from .models import Video
try:
video = Video.objects.get(id=video_id)
logger.info(f"开始处理视频 {video_id}")
# 1. 获取视频信息
# 2. 生成缩略图
# 3. 转码多分辨率
# 4. 更新数据库
logger.info(f"视频 {video_id} 处理完成")
return True
except Exception as e:
logger.error(f"处理视频失败: {str(e)}")
return False
5. 视图中调用任务 (videos/views.py)
from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import process_video
from .models import Video
class MergeChunksView(APIView):
def post(self, request):
# 1. 合并文件分片
# 2. 创建视频记录
video = Video.objects.create(
title=request.data.get('file_name'),
user=request.user,
video_file='videos/uploads/2025/01/06/abc123.mp4'
)
# 3. 🔥 发送异步任务(关键!)
process_video.delay(video.id)
# ↑
# .delay() 方法将任务发送到 Redis
# 4. 立即返回响应(不等待任务完成)
return Response({
"detail": "文件合并成功",
"video": {"id": video.id, "status": "processing"}
})
完整流程演示
场景:用户上传视频
步骤 1:前端发起请求
// 前端代码
const response = await axios.post('/api/videos/merge-chunks/', {
file_name: 'my-video.mp4',
file_md5: 'abc123...',
chunks_total: 10
});
console.log(response.data);
// { "detail": "文件合并成功", "video": { "id": 12, "status": "processing" } }
步骤 2:Django 处理请求
# views.py
def post(self, request):
# 创建视频记录
video = Video.objects.create(...)
# 发送异步任务
task = process_video.delay(video.id)
print(f"任务ID: {task.id}") # 4d94e1c1-de02-4772-9fea-2fad4872b8b0
return Response({"video": {"id": video.id}})
步骤 3:Redis 存储任务
此时在 Redis 中执行:
# 查看队列长度
redis-cli LLEN celery
# (integer) 1 ← 有一个任务在队列中
# 查看任务内容
redis-cli LRANGE celery 0 -1
# {
# "task": "videos.tasks.process_video",
# "id": "4d94e1c1-de02-4772-9fea-2fad4872b8b0",
# "args": [12],
# "kwargs": {},
# "retries": 0,
# ...
# }
步骤 4:Celery Worker 执行任务
# Celery Worker 日志
[2026-01-06 21:52:20,762] INFO: 开始处理视频 12
[2026-01-06 21:52:20,762] INFO: 从 format.duration 获取到时长: 47.018333秒
[2026-01-06 21:52:20,762] INFO: 检测到视频分辨率: 1280x720
[2026-01-06 21:52:23,631] INFO: [1/3] 完成 720p 分辨率转码
[2026-01-06 21:52:27,228] INFO: [2/3] 完成 480p 分辨率转码
[2026-01-06 21:52:30,039] INFO: [3/3] 完成 360p 分辨率转码
[2026-01-06 21:52:30,049] INFO: 视频 12 处理完成
步骤 5:查看任务结果
# 查看任务结果
redis-cli GET "celery-task-meta-4d94e1c1-de02-4772-9fea-2fad4872b8b0"
# {
# "status": "SUCCESS",
# "result": true,
# "traceback": null,
# "date_done": "2026-01-06T14:55:40.890936",
# "task_id": "4d94e1c1-de02-4772-9fea-2fad4872b8b0"
# }
Redis 数据结构
1. 任务队列
# 键名
celery
# 数据类型
List (列表)
# 操作
LPUSH celery '{"task": "...", ...}' # Django 发送任务(左侧推入)
BRPOP celery 1 # Celery Worker 取任务(右侧弹出)
2. 任务结果
# 键名格式
celery-task-meta-<task_id>
# 数据类型
String (字符串)
# 操作
SETEX celery-task-meta-xxx 86400 '{"status": "SUCCESS", ...}' # 设置结果(24小时过期)
GET celery-task-meta-xxx # 获取结果
3. 队列绑定(元数据)
# Celery 自动创建的键
_kombu.binding.celery # 默认任务队列绑定
_kombu.binding.celery.pidbox # 控制命令队列(revoke、terminate等)
_kombu.binding.celeryev # 事件监控队列
4. 实时监控 Redis
# 监控所有 Redis 命令
redis-cli MONITOR
# 输出示例:
# 1. 任务被推入队列
"LPUSH" "celery" "{\"task\":\"videos.tasks.process_video\",\"args\":[12],...}"
# 2. Worker 取出任务
"BRPOP" "celery" "1"
# 3. 保存任务结果
"SETEX" "celery-task-meta-abc-123" "86400" "{\"status\":\"SUCCESS\",...}"
实战示例
示例 1:发送简单任务
# 在 Django shell 中测试
python manage.py shell
>>> from videos.tasks import process_video
>>> task = process_video.delay(12)
>>> print(f"任务ID: {task.id}")
任务ID: 4d94e1c1-de02-4772-9fea-2fad4872b8b0
>>> # 检查任务状态
>>> task.status
'SUCCESS'
>>> # 获取任务结果
>>> task.result
True
示例 2:带参数的任务
# tasks.py
@shared_task
def send_email(user_id, subject, message):
from users.models import User
user = User.objects.get(id=user_id)
# 发送邮件逻辑
return f"邮件已发送给 {user.email}"
# views.py
send_email.delay(
user_id=request.user.id,
subject="欢迎注册",
message="感谢您的注册!"
)
示例 3:定时任务
# celery.py
app.conf.beat_schedule = {
# 每天凌晨3点清理已删除视频
'cleanup-deleted-videos': {
'task': 'videos.tasks.cleanup_deleted_videos',
'schedule': crontab(hour=3, minute=0),
},
# 每小时统计一次数据
'hourly-stats': {
'task': 'videos.tasks.calculate_stats',
'schedule': crontab(minute=0), # 每小时的第0分钟
},
# 每5分钟检查一次
'check-processing-videos': {
'task': 'videos.tasks.check_processing_status',
'schedule': 300.0, # 300秒 = 5分钟
},
}
示例 4:任务链(Task Chain)
from celery import chain
# 按顺序执行多个任务
result = chain(
process_video.s(video_id), # 1. 处理视频
generate_thumbnail.s(), # 2. 生成缩略图
send_notification.s(user_id) # 3. 发送通知
).apply_async()
示例 5:任务组(Task Group)
from celery import group
# 并行执行多个任务
job = group([
process_video.s(1),
process_video.s(2),
process_video.s(3),
])
result = job.apply_async()
常用命令
Celery 命令
# 启动 Worker(处理异步任务)
celery -A video worker -l info
# 启动 Worker(Windows)
celery -A video worker -l info --pool=solo
# 启动 Beat(处理定时任务)
celery -A video beat -l info
# 同时启动 Worker 和 Beat
celery -A video worker -B -l info
# 查看注册的任务
celery -A video inspect registered
# 查看活跃的任务
celery -A video inspect active
# 查看统计信息
celery -A video inspect stats
# 撤销任务
celery -A video control revoke <task_id>
# 清空队列
celery -A video purge
Redis 命令
# 连接 Redis
redis-cli
# 查看所有键
keys *
# 查看 Celery 相关的键
keys celery*
keys _kombu*
# 查看队列长度
LLEN celery
# 查看队列中的任务(不删除)
LRANGE celery 0 -1
# 查看任务结果
GET celery-task-meta-<task_id>
# 删除任务结果
DEL celery-task-meta-<task_id>
# 清空所有 Celery 数据
FLUSHDB
# 监控 Redis 实时命令
MONITOR
Django 命令
# 测试 Celery 任务
python manage.py shell
>>> from videos.tasks import process_video
>>> process_video.delay(12)
# 查看数据库中的视频
python manage.py shell
>>> from videos.models import Video
>>> Video.objects.get(id=12).duration
47.018333
常见问题
Q1: 任务发送了但没有执行?
检查清单:
-
Celery Worker 是否启动?
celery -A video worker -l info -
Redis 是否运行?
redis-cli ping # 应该返回 PONG -
查看队列中是否有任务?
redis-cli LLEN celery
Q2: 如何查看任务执行日志?
# 启动 Worker 时设置日志级别
celery -A video worker -l debug
# 或在 Django settings.py 中配置日志
LOGGING = {
'loggers': {
'celery': {
'handlers': ['console'],
'level': 'INFO',
},
},
}
Q3: 任务执行失败如何重试?
@shared_task(bind=True, max_retries=3)
def process_video(self, video_id):
try:
# 处理视频
pass
except Exception as exc:
# 5分钟后重试
raise self.retry(exc=exc, countdown=300)
Q4: 如何限制任务并发数?
# settings.py
CELERY_WORKER_CONCURRENCY = 4 # 同时执行4个任务
# 或启动时指定
celery -A video worker -l info --concurrency=4
总结
核心流程
1. Django 视图调用 task.delay()
↓
2. 任务序列化为 JSON 并发送到 Redis
↓
3. Celery Worker 从 Redis 取出任务
↓
4. 执行任务函数
↓
5. 将结果保存到 Redis(可选)
关键点
- ✅ 异步执行:用户不需要等待耗时操作
- ✅ 解耦:Web 应用和任务处理分离
- ✅ 可靠性:任务存储在 Redis,Worker 崩溃后可重新执行
- ✅ 可扩展:可以启动多个 Worker 并行处理
- ✅ 定时任务:支持 cron 表达式的定时任务
最佳实践
- 任务要幂等:同一个任务执行多次结果一致
- 任务要短小:避免单个任务执行时间过长
- 合理设置超时:防止任务卡死
- 监控任务状态:及时发现失败的任务
- 日志记录:方便排查问题
作者: Kiro AI
日期: 2026-01-06
版本: 1.0
评论区