目 录CONTENT

文章目录

Django-Celery-Redis异步任务完整流程

~梓
2026-01-06 / 0 评论 / 0 点赞 / 5 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Django + Celery + Redis 异步任务完整流程详解

📚 目录


核心概念

三大组件

组件 作用 比喻
Django Web 应用,发送任务 老板(下达任务)
Redis 消息队列,存储任务 任务板(贴便签)
Celery Worker 异步执行任务 员工(执行任务)

为什么需要异步任务?

# ❌ 同步方式(用户等待很久)
def upload_video(request):
    video = save_video(request.FILES['video'])
    process_video(video.id)  # 转码需要 5 分钟!用户一直等待...
    return Response({"status": "ok"})

# ✅ 异步方式(用户立即得到响应)
def upload_video(request):
    video = save_video(request.FILES['video'])
    process_video.delay(video.id)  # 发送到后台处理,立即返回
    return Response({"status": "processing"})

架构图

整体架构

┌─────────────┐           ┌─────────────┐         ┌─────────────┐
│   Django    │           │    Redis    │         │   Celery    │
│  (Web App)  │────────▶│  (Broker)   │────────▶│   Worker    │
│             │  发送任务 │             │  取出任务 │             │
└─────────────┘           └─────────────┘         └─────────────┘
     │                        │                        │
     │ 1. 用户上传视频        │ 2. 存储任务队列        │ 3. 执行转码
     │ process_video.delay()  │ LPUSH celery {...}     │ 处理视频
     │                        │                        │
     └────────────────────────┴────────────────────────┘
                    4. 返回结果(可选)
                    SETEX celery-task-meta-xxx {...}

数据流向

用户上传视频
    ↓
前端: POST /api/videos/merge-chunks/
    ↓
Django 视图: MergeChunksView.post()
    ↓
创建视频记录: Video.objects.create(...)
    ↓
发送异步任务: process_video.delay(video.id)
    ↓
┌─────────────────────────────────────────────┐
│  Redis 存储任务                              │
│  LPUSH celery '{"task": "videos.tasks.     │
│  process_video", "args": [12], ...}'        │
└─────────────────────────────────────────────┘
    ↓
Celery Worker 监听队列
    ↓
BRPOP celery (从队列取出任务)
    ↓
执行任务: process_video(12)
    ├─ 获取视频信息 (ffprobe)
    ├─ 生成缩略图 (ffmpeg)
    ├─ 转码多分辨率 (ffmpeg)
    └─ 更新数据库
    ↓
更新 Redis 结果: SETEX celery-task-meta-xxx 
    '{"status": "SUCCESS", "result": true, ...}'
    ↓
任务完成!

配置详解

1. Django Settings (settings.py)

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'        # Redis 作为消息队列
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'    # Redis 存储任务结果
CELERY_ACCEPT_CONTENT = ['json']                       # 接受 JSON 格式
CELERY_TASK_SERIALIZER = 'json'                        # 任务序列化为 JSON
CELERY_RESULT_SERIALIZER = 'json'                      # 结果序列化为 JSON
CELERY_TIMEZONE = 'Asia/Shanghai'                      # 时区设置
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True      # 启动时重试连接

配置说明:

  • BROKER_URL: 任务队列地址(任务发送到这里)
  • RESULT_BACKEND: 结果存储地址(任务结果保存到这里)
  • TASK_SERIALIZER: 任务如何序列化(JSON 格式)

2. Celery 应用配置 (video/celery.py)

import os
from celery import Celery
from celery.schedules import crontab

# 设置 Django 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'video.settings')

# 创建 Celery 实例
app = Celery('video')

# 从 Django settings 加载配置(所有以 CELERY_ 开头的配置)
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现所有 app 中的 tasks.py 文件
app.autodiscover_tasks()

# 配置定时任务(可选)
app.conf.beat_schedule = {
    'cleanup-deleted-videos-daily': {
        'task': 'videos.tasks.cleanup_deleted_videos',
        'schedule': crontab(hour=3, minute=0),  # 每天凌晨3点执行
    },
}

3. Django 项目初始化 (video/init.py)

# 确保 Django 启动时加载 Celery
from .celery import app as celery_app

__all__ = ('celery_app',)

4. 定义异步任务 (videos/tasks.py)

from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task  # 装饰器标记这是一个 Celery 任务
def process_video(video_id):
    """处理视频:转码、生成缩略图等"""
    from .models import Video
  
    try:
        video = Video.objects.get(id=video_id)
        logger.info(f"开始处理视频 {video_id}")
    
        # 1. 获取视频信息
        # 2. 生成缩略图
        # 3. 转码多分辨率
        # 4. 更新数据库
    
        logger.info(f"视频 {video_id} 处理完成")
        return True
    except Exception as e:
        logger.error(f"处理视频失败: {str(e)}")
        return False

5. 视图中调用任务 (videos/views.py)

from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import process_video
from .models import Video

class MergeChunksView(APIView):
    def post(self, request):
        # 1. 合并文件分片
        # 2. 创建视频记录
        video = Video.objects.create(
            title=request.data.get('file_name'),
            user=request.user,
            video_file='videos/uploads/2025/01/06/abc123.mp4'
        )
    
        # 3. 🔥 发送异步任务(关键!)
        process_video.delay(video.id)
        #              ↑
        #         .delay() 方法将任务发送到 Redis
    
        # 4. 立即返回响应(不等待任务完成)
        return Response({
            "detail": "文件合并成功",
            "video": {"id": video.id, "status": "processing"}
        })

完整流程演示

场景:用户上传视频

步骤 1:前端发起请求

// 前端代码
const response = await axios.post('/api/videos/merge-chunks/', {
  file_name: 'my-video.mp4',
  file_md5: 'abc123...',
  chunks_total: 10
});

console.log(response.data);
// { "detail": "文件合并成功", "video": { "id": 12, "status": "processing" } }

步骤 2:Django 处理请求

# views.py
def post(self, request):
    # 创建视频记录
    video = Video.objects.create(...)
  
    # 发送异步任务
    task = process_video.delay(video.id)
    print(f"任务ID: {task.id}")  # 4d94e1c1-de02-4772-9fea-2fad4872b8b0
  
    return Response({"video": {"id": video.id}})

步骤 3:Redis 存储任务

此时在 Redis 中执行:

# 查看队列长度
redis-cli LLEN celery
# (integer) 1  ← 有一个任务在队列中

# 查看任务内容
redis-cli LRANGE celery 0 -1
# {
#   "task": "videos.tasks.process_video",
#   "id": "4d94e1c1-de02-4772-9fea-2fad4872b8b0",
#   "args": [12],
#   "kwargs": {},
#   "retries": 0,
#   ...
# }

步骤 4:Celery Worker 执行任务

# Celery Worker 日志
[2026-01-06 21:52:20,762] INFO: 开始处理视频 12
[2026-01-06 21:52:20,762] INFO: 从 format.duration 获取到时长: 47.018333秒
[2026-01-06 21:52:20,762] INFO: 检测到视频分辨率: 1280x720
[2026-01-06 21:52:23,631] INFO: [1/3] 完成 720p 分辨率转码
[2026-01-06 21:52:27,228] INFO: [2/3] 完成 480p 分辨率转码
[2026-01-06 21:52:30,039] INFO: [3/3] 完成 360p 分辨率转码
[2026-01-06 21:52:30,049] INFO: 视频 12 处理完成

步骤 5:查看任务结果

# 查看任务结果
redis-cli GET "celery-task-meta-4d94e1c1-de02-4772-9fea-2fad4872b8b0"
# {
#   "status": "SUCCESS",
#   "result": true,
#   "traceback": null,
#   "date_done": "2026-01-06T14:55:40.890936",
#   "task_id": "4d94e1c1-de02-4772-9fea-2fad4872b8b0"
# }

Redis 数据结构

1. 任务队列

# 键名
celery

# 数据类型
List (列表)

# 操作
LPUSH celery '{"task": "...", ...}'  # Django 发送任务(左侧推入)
BRPOP celery 1                        # Celery Worker 取任务(右侧弹出)

2. 任务结果

# 键名格式
celery-task-meta-<task_id>

# 数据类型
String (字符串)

# 操作
SETEX celery-task-meta-xxx 86400 '{"status": "SUCCESS", ...}'  # 设置结果(24小时过期)
GET celery-task-meta-xxx                                         # 获取结果

3. 队列绑定(元数据)

# Celery 自动创建的键
_kombu.binding.celery          # 默认任务队列绑定
_kombu.binding.celery.pidbox   # 控制命令队列(revoke、terminate等)
_kombu.binding.celeryev        # 事件监控队列

4. 实时监控 Redis

# 监控所有 Redis 命令
redis-cli MONITOR

# 输出示例:
# 1. 任务被推入队列
"LPUSH" "celery" "{\"task\":\"videos.tasks.process_video\",\"args\":[12],...}"

# 2. Worker 取出任务
"BRPOP" "celery" "1"

# 3. 保存任务结果
"SETEX" "celery-task-meta-abc-123" "86400" "{\"status\":\"SUCCESS\",...}"

实战示例

示例 1:发送简单任务

# 在 Django shell 中测试
python manage.py shell

>>> from videos.tasks import process_video
>>> task = process_video.delay(12)
>>> print(f"任务ID: {task.id}")
任务ID: 4d94e1c1-de02-4772-9fea-2fad4872b8b0

>>> # 检查任务状态
>>> task.status
'SUCCESS'

>>> # 获取任务结果
>>> task.result
True

示例 2:带参数的任务

# tasks.py
@shared_task
def send_email(user_id, subject, message):
    from users.models import User
    user = User.objects.get(id=user_id)
    # 发送邮件逻辑
    return f"邮件已发送给 {user.email}"

# views.py
send_email.delay(
    user_id=request.user.id,
    subject="欢迎注册",
    message="感谢您的注册!"
)

示例 3:定时任务

# celery.py
app.conf.beat_schedule = {
    # 每天凌晨3点清理已删除视频
    'cleanup-deleted-videos': {
        'task': 'videos.tasks.cleanup_deleted_videos',
        'schedule': crontab(hour=3, minute=0),
    },
  
    # 每小时统计一次数据
    'hourly-stats': {
        'task': 'videos.tasks.calculate_stats',
        'schedule': crontab(minute=0),  # 每小时的第0分钟
    },
  
    # 每5分钟检查一次
    'check-processing-videos': {
        'task': 'videos.tasks.check_processing_status',
        'schedule': 300.0,  # 300秒 = 5分钟
    },
}

示例 4:任务链(Task Chain)

from celery import chain

# 按顺序执行多个任务
result = chain(
    process_video.s(video_id),           # 1. 处理视频
    generate_thumbnail.s(),              # 2. 生成缩略图
    send_notification.s(user_id)         # 3. 发送通知
).apply_async()

示例 5:任务组(Task Group)

from celery import group

# 并行执行多个任务
job = group([
    process_video.s(1),
    process_video.s(2),
    process_video.s(3),
])
result = job.apply_async()

常用命令

Celery 命令

# 启动 Worker(处理异步任务)
celery -A video worker -l info

# 启动 Worker(Windows)
celery -A video worker -l info --pool=solo

# 启动 Beat(处理定时任务)
celery -A video beat -l info

# 同时启动 Worker 和 Beat
celery -A video worker -B -l info

# 查看注册的任务
celery -A video inspect registered

# 查看活跃的任务
celery -A video inspect active

# 查看统计信息
celery -A video inspect stats

# 撤销任务
celery -A video control revoke <task_id>

# 清空队列
celery -A video purge

Redis 命令

# 连接 Redis
redis-cli

# 查看所有键
keys *

# 查看 Celery 相关的键
keys celery*
keys _kombu*

# 查看队列长度
LLEN celery

# 查看队列中的任务(不删除)
LRANGE celery 0 -1

# 查看任务结果
GET celery-task-meta-<task_id>

# 删除任务结果
DEL celery-task-meta-<task_id>

# 清空所有 Celery 数据
FLUSHDB

# 监控 Redis 实时命令
MONITOR

Django 命令

# 测试 Celery 任务
python manage.py shell
>>> from videos.tasks import process_video
>>> process_video.delay(12)

# 查看数据库中的视频
python manage.py shell
>>> from videos.models import Video
>>> Video.objects.get(id=12).duration
47.018333

常见问题

Q1: 任务发送了但没有执行?

检查清单:

  1. Celery Worker 是否启动?

    celery -A video worker -l info
    
  2. Redis 是否运行?

    redis-cli ping
    # 应该返回 PONG
    
  3. 查看队列中是否有任务?

    redis-cli LLEN celery
    

Q2: 如何查看任务执行日志?

# 启动 Worker 时设置日志级别
celery -A video worker -l debug

# 或在 Django settings.py 中配置日志
LOGGING = {
    'loggers': {
        'celery': {
            'handlers': ['console'],
            'level': 'INFO',
        },
    },
}

Q3: 任务执行失败如何重试?

@shared_task(bind=True, max_retries=3)
def process_video(self, video_id):
    try:
        # 处理视频
        pass
    except Exception as exc:
        # 5分钟后重试
        raise self.retry(exc=exc, countdown=300)

Q4: 如何限制任务并发数?

# settings.py
CELERY_WORKER_CONCURRENCY = 4  # 同时执行4个任务

# 或启动时指定
celery -A video worker -l info --concurrency=4

总结

核心流程

1. Django 视图调用 task.delay()
   ↓
2. 任务序列化为 JSON 并发送到 Redis
   ↓
3. Celery Worker 从 Redis 取出任务
   ↓
4. 执行任务函数
   ↓
5. 将结果保存到 Redis(可选)

关键点

  • 异步执行:用户不需要等待耗时操作
  • 解耦:Web 应用和任务处理分离
  • 可靠性:任务存储在 Redis,Worker 崩溃后可重新执行
  • 可扩展:可以启动多个 Worker 并行处理
  • 定时任务:支持 cron 表达式的定时任务

最佳实践

  1. 任务要幂等:同一个任务执行多次结果一致
  2. 任务要短小:避免单个任务执行时间过长
  3. 合理设置超时:防止任务卡死
  4. 监控任务状态:及时发现失败的任务
  5. 日志记录:方便排查问题

作者: Kiro AI
日期: 2026-01-06
版本: 1.0

0

评论区