目 录CONTENT

文章目录

Redis应用01_分布式任务锁机制

~梓
2026-02-25 / 0 评论 / 0 点赞 / 1 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
# Redis 分布式任务锁机制 ## 业务场景 视频平台中,用户上传视频后需要进行转码处理。转码是一个耗时的异步任务,由 Celery Worker 执行。 \*\*问题\*\*:如果有多个 Worker 同时处理同一个视频,会导致: - 重复转码,浪费服务器资源 - 生成多份相同的文件,占用磁盘空间 - 可能产生文件冲突,导致转码失败 \*\*解决方案\*\*:使用 Redis 实现分布式锁,确保同一时刻只有一个 Worker 能处理某个视频。 ## 核心原理 ### Redis SETNX 命令 SETNX = SET if Not eXists(如果不存在则设置) \`\`\`redis SETNX key value \`\`\` - 如果 key 不存在,设置成功,返回 1 - 如果 key 已存在,设置失败,返回 0 这是一个\*\*原子操作\*\*,多个客户端同时执行时,只有一个能成功。 ### Django Cache 封装 Django 的 \`cache.add()\` 方法底层使用 Redis 的 SETNX: \`\`\`python from django.core.cache import cache # 尝试获取锁 success = cache.add('lock_key', 'locked', timeout=3600) # success = True 表示获取锁成功(key 原本不存在) # success = False 表示锁已被占用(key 已经存在) \`\`\` \*\*\`cache.add\` vs \`cache.set\`\*\*: - \`cache.set(key, value)\`:\*\*覆盖写\*\*。无论 key 是否存在,都写入新值。 - \`cache.add(key, value)\`:\*\*仅在不存在时写入\*\*。这是实现"锁"互斥性的关键。 ## 代码实现 ### 1. 锁的 Key 设计 \`\`\`python def get_video_lock_key(video_id): """生成视频处理锁的 key""" return f"video_processing_lock:{video_id}" \`\`\` \*\*设计要点\*\*: - 使用前缀 \`video_processing_lock:\` 便于识别和管理 - 使用 \`video_id\` 作为唯一标识,不同视频的锁互不影响 ### 2. 获取锁 \`\`\`python def acquire_video_lock(video_id, timeout=3600): """ 获取视频处理锁(使用 Redis) timeout: 锁的超时时间(秒),默认1小时 返回: True 如果获取成功,False 如果已被锁定 """ lock_key = get_video_lock_key(video_id) # 使用 Redis 的 SETNX 原子操作 # add() 只在 key 不存在时设置,返回 True;如果 key 已存在,返回 False return cache.add(lock_key, "locked", timeout) \`\`\` \*\*关键点\*\*: - \`timeout\` 参数设置锁的过期时间,防止死锁 - 如果 Worker 崩溃,锁会在超时后自动释放 - 返回布尔值,方便判断是否获取成功 ### 3. 释放锁 \`\`\`python def release_video_lock(video_id): """释放视频处理锁""" lock_key = get_video_lock_key(video_id) cache.delete(lock_key) \`\`\` \*\*注意\*\*: - 处理完成后必须释放锁,否则其他 Worker 无法处理 - 使用 \`try...finally\` 确保锁一定会被释放 ### 4. 检查锁状态 \`\`\`python def is_video_locked(video_id): """检查视频是否正在被处理""" lock_key = get_video_lock_key(video_id) return cache.get(lock_key) is not None \`\`\` \*\*用途\*\*: - 在触发任务前检查是否已有任务在处理 - 避免重复提交任务到队列 ## 在 Celery 任务中使用 ### 完整流程 \`\`\`python @shared_task(bind=True, max_retries=3, default_retry_delay=60) def process_video(self, video_id): """处理视频文件,生成HLS格式、缩略图等""" # 第一层防护:Redis 分布式锁 if not acquire_video_lock(video_id, timeout=7200): # 2小时超时 logger.warning(f"视频 {video_id} 正在被其他任务处理(Redis锁),跳过") return {"status": "skipped", "reason": "already_processing"} try: # 第二层防护:数据库状态检查 + 原子更新 updated = Video.objects.filter( id=video_id, status__in=\['uploading', 'processing', 'transcoding'\] ).exclude( hls_file__isnull=False # 排除已处理完成的 ).update(status='processing') if updated == 0: logger.warning(f"视频 {video_id} 不符合处理条件,跳过") return {"status": "skipped", "reason": "invalid_state"} # 执行转码处理... video = Video.objects.get(id=video_id) # ... 转码逻辑 ... logger.info(f"视频 {video_id} 处理完成") return {"status": "success", "video_id": video_id} except Exception as e: logger.error(f"处理视频 {video_id} 失败: {str(e)}") # 如果还有重试次数,抛出异常让 Celery 重试 if self.request.retries \< self.max_retries: raise self.retry(exc=e) return {"status": "error", "reason": str(e)} finally: # 无论成功失败,都释放锁 release_video_lock(video_id) logger.info(f"释放视频 {video_id} 的处理锁") \`\`\` ### 防护机制分析 \*\*为什么需要两层防护?\*\* 1. \*\*Redis 锁(第一层)\*\*: - 防止多个 Worker 同时处理 - 跨进程、跨服务器生效 - 响应速度快 2. \*\*数据库状态检查(第二层)\*\*: - 防止重复提交任务 - 确保业务状态正确 - 持久化保证 \*\*双重保险的好处\*\*: - Redis 锁可能因网络问题失效 - 数据库状态是最终的真实状态 - 两层结合,确保万无一失 ## 实际执行流程 ### 场景 1:正常处理 \`\`\` Worker A: 尝试获取锁 video_id=123 Redis: key 不存在,设置成功 ✅ Worker A: 开始处理视频 123 Worker A: 转码中... Worker A: 处理完成,释放锁 Redis: 删除 key ✅ \`\`\` ### 场景 2:并发冲突 \`\`\` Worker A: 尝试获取锁 video_id=123 Redis: key 不存在,设置成功 ✅ Worker A: 开始处理视频 123 Worker B: 尝试获取锁 video_id=123 Redis: key 已存在,设置失败 ❌ Worker B: 跳过处理,返回 "already_processing" Worker A: 处理完成,释放锁 Redis: 删除 key ✅ \`\`\` ### 场景 3:Worker 崩溃 \`\`\` Worker A: 尝试获取锁 video_id=123 Redis: key 不存在,设置成功 ✅ Worker A: 开始处理视频 123 Worker A: 崩溃 💥(未释放锁) ... 2小时后 ... Redis: key 自动过期 ⏰ Worker B: 尝试获取锁 video_id=123 Redis: key 不存在,设置成功 ✅ Worker B: 开始处理视频 123(重新处理) \`\`\` ## 配置说明 ### Redis 配置 \`\`\`python # settings.py CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.redis.RedisCache', 'LOCATION': 'redis://localhost:6379/1', # 使用 DB 1 'KEY_PREFIX': 'video_web', 'TIMEOUT': 3600, } } \`\`\` ### 锁超时时间选择 \`\`\`python # 视频处理锁:2小时 acquire_video_lock(video_id, timeout=7200) \`\`\` \*\*考虑因素\*\*: - 视频转码通常需要几分钟到几十分钟 - 设置 2 小时是为了应对超大视频 - 如果 Worker 崩溃,最多等待 2 小时后锁自动释放 ## 优势与局限 ### 优势 1. \*\*高性能\*\*:Redis 内存操作,响应速度快 2. \*\*分布式\*\*:支持多服务器部署 3. \*\*自动过期\*\*:防止死锁 4. \*\*原子操作\*\*:SETNX 保证并发安全 ### 局限 1. \*\*依赖 Redis\*\*:Redis 故障会影响锁机制 2. \*\*时钟依赖\*\*:超时依赖服务器时钟同步 3. \*\*不可重入\*\*:同一个 Worker 不能重复获取锁 ### 改进方案 如果需要更强的锁机制,可以使用 Redlock 算法: - 使用多个 Redis 实例 - 需要在大多数实例上获取锁才算成功 - 提高可靠性,但增加复杂度 ## 调试技巧 ### 查看 Redis 中的锁 \`\`\`bash # 连接 Redis redis-cli -n 1 # 查看所有锁 KEYS video_processing_lock:\* # 查看特定视频的锁 GET video_processing_lock:123 # 查看锁的剩余时间(秒) TTL video_processing_lock:123 # 手动删除锁(调试用) DEL video_processing_lock:123 \`\`\` ### 日志分析 \`\`\`python logger.info(f"\[Task {task_id}\] 收到视频处理任务: video_id={video_id}") logger.warning(f"\[Task {task_id}\] 视频 {video_id} 正在被其他任务处理(Redis锁),跳过") logger.info(f"\[Task {task_id}\] 释放视频 {video_id} 的处理锁") \`\`\` 通过日志可以追踪: - 任务何时开始 - 是否因为锁而跳过 - 锁何时释放 ## 最佳实践与注意事项 ### 1. 超时时间 (TTL) 的选择 - \*\*原则\*\*:\`timeout\` 必须大于任务预估的最长执行时间。 - \*\*风险\*\*:如果 \`timeout\` 过短,任务还没执行完锁就释放了,会导致第二个 Worker 介入,产生并发冲突。 - \*\*建议\*\*:对于视频转码,建议设置为预估时间的 2 倍以上。 ### 2. 释放锁的安全性 释放锁时使用 \`cache.delete(lock_key)\` 是最简单的方式。但在极其严苛的场景下,需要考虑"只释放自己加的锁",防止因任务超时导致误删他人的锁。 ### 3. 可重入性 目前的实现是\*\*不可重入\*\*的。即同一个进程在没释放锁的情况下,再次调用 \`acquire_video_lock\` 会失败。在简单的 Celery 任务中这通常是安全的,但在复杂递归逻辑中需要注意。 ## 总结 Redis 分布式锁通过 \`SETNX\` 原子操作,实现了跨进程、跨服务器的互斥访问控制。在视频转码场景中,配合数据库状态检查,形成双重防护机制。 \*\*核心三要素\*\*: 1. \*\*互斥性\*\*:利用 \`cache.add\` 确保只有一个抢到锁。 2. \*\*安全性\*\*:利用 \`timeout\` 防止 Worker 崩溃导致的死锁。 3. \*\*释放性\*\*:利用 \`try...finally\` 确保任务结束后资源被回收。
0

评论区