1. Introduction to the Queue Module
The queue module provides thread-safe queue implementations for exchanging data between threads in multi-threaded programs. It supports first-in-first-out (FIFO), last-in-first-out (LIFO), and priority ordering.
Queue types:
- Queue: first-in-first-out (FIFO) queue
- LifoQueue: last-in-first-out (LIFO) queue
- PriorityQueue: queue that returns items in priority order
- SimpleQueue: a simpler unbounded FIFO queue (Python 3.7+)
2. Core Methods
- put(item): add an item to the queue (blocks if the queue is full)
- get(): remove and return an item (blocks if the queue is empty)
- task_done(): mark a previously fetched task as done
- join(): block until all items have been processed
- empty(): return True if the queue is empty
- full(): return True if the queue is full
- qsize(): return the approximate size of the queue
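As a quick illustration of how these methods fit together, here is a minimal single-threaded sketch (the maxsize of 2 and the sample strings are arbitrary choices, not from the original examples):
import queue

q = queue.Queue(maxsize=2)
q.put("a")          # add items; blocks if the queue is full
q.put("b")
print(q.full())     # True: maxsize reached
print(q.get())      # "a" (FIFO order)
q.task_done()       # mark the fetched item as processed
print(q.get())      # "b"
q.task_done()
q.join()            # returns immediately: every put() has a matching task_done()
print(q.empty())    # True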
3. Practical Examples
Example 1: Producer-Consumer Model (Basic)
import threading
import queue
import time

# Create a queue that holds at most 5 items
q = queue.Queue(maxsize=5)

def producer():
    for i in range(10):
        item = f"task-{i}"
        q.put(item)  # blocks until a slot is free
        print(f"[producer] published {item}, queue size: {q.qsize()}")
        time.sleep(0.1)

def consumer():
    while True:
        item = q.get()  # blocks until data is available
        if item is None:  # termination signal
            break
        print(f"[consumer] processed {item}, remaining: {q.qsize()}")
        q.task_done()

# Start the consumer thread
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()

# Start the producer thread
producer_thread = threading.Thread(target=producer)
producer_thread.start()

# Wait for the producer to finish
producer_thread.join()

# Send the termination signal
q.put(None)
consumer_thread.join()
print("All tasks finished")
Example 2: Priority Queue
from queue import PriorityQueue

pq = PriorityQueue()
# Add tasks (the lower the number, the higher the priority)
pq.put((3, "low-priority task"))
pq.put((1, "high-priority task"))
pq.put((2, "medium-priority task"))

while not pq.empty():
    priority, task = pq.get()
    print(f"Processing task: {task} (priority: {priority})")
# Output:
# Processing task: high-priority task (priority: 1)
# Processing task: medium-priority task (priority: 2)
# Processing task: low-priority task (priority: 3)
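One caveat worth knowing: when two entries share the same priority, PriorityQueue falls back to comparing the payloads, which raises TypeError for objects that do not support ordering. A minimal sketch of the usual workaround, inserting (priority, insertion order, item) tuples with an itertools.count() tie-breaker (the Job class here is an illustrative stand-in):
import itertools
from queue import PriorityQueue

pq = PriorityQueue()
counter = itertools.count()  # monotonically increasing tie-breaker

class Job:                   # payload without comparison methods
    def __init__(self, name):
        self.name = name

# (priority, insertion order, payload): ties never reach the payload comparison
pq.put((1, next(counter), Job("first urgent job")))
pq.put((1, next(counter), Job("second urgent job")))

while not pq.empty():
    priority, _, job = pq.get()
    print(priority, job.name)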
Example 3: Limiting Thread Pool Concurrency
import queue
import threading
import time

class ThreadPool:
    def __init__(self, max_threads):
        self.task_queue = queue.Queue()
        for _ in range(max_threads):
            worker = threading.Thread(target=self._worker)
            worker.daemon = True  # background thread
            worker.start()

    def _worker(self):
        while True:
            func, args = self.task_queue.get()
            try:
                func(*args)
            finally:
                self.task_queue.task_done()

    def submit(self, func, args=()):
        self.task_queue.put((func, args))

# Using the thread pool
def demo_task(num):
    print(f"Task {num} started")
    time.sleep(1)
    print(f"Task {num} finished")

pool = ThreadPool(max_threads=2)
for i in range(5):
    pool.submit(demo_task, (i,))

pool.task_queue.join()  # wait for all tasks to finish
print("All thread tasks finished")
Example 4: Reverse-Order Processing with LifoQueue
from queue import LifoQueue

lq = LifoQueue()
lq.put("task 1")
lq.put("task 2")
lq.put("task 3")

while not lq.empty():
    print(lq.get())
# Output:
# task 3
# task 2
# task 1
4. Advanced Techniques
- Timeout control:
  try:
      item = q.get(timeout=2)  # wait at most 2 seconds
  except queue.Empty:
      print("Queue is empty")
- Poison-pill shutdown: send a special sentinel object (such as None) to tell consumers to stop (see the sketch after this list).
- Dynamic priority adjustment:
  # re-insert the task with its new priority
  pq.put((new_priority, task))
- Queue size monitoring:
  while True:
      print(f"Current queue size: {q.qsize()}")
      time.sleep(1)
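Example 1 already uses a single None sentinel for one consumer; with several consumers, the usual rule is one sentinel per consumer so that every worker thread receives its own stop signal. A minimal sketch (the worker count and payloads are arbitrary):
import queue
import threading

q = queue.Queue()
NUM_CONSUMERS = 3

def worker():
    while True:
        item = q.get()
        if item is None:   # poison pill: this worker stops
            q.task_done()
            break
        print(f"processing {item}")
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_CONSUMERS)]
for t in threads:
    t.start()

for i in range(10):
    q.put(i)
for _ in range(NUM_CONSUMERS):  # one sentinel per consumer
    q.put(None)

for t in threads:
    t.join()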
5. Caveats
- qsize() only returns an approximate size; in a multi-threaded program it may already be stale by the time you act on it (and multiprocessing.Queue.qsize() is not implemented on some Unix platforms such as macOS).
- In multi-process environments, use multiprocessing.Queue instead.
- SimpleQueue has no task_done() or join() methods.
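To make the last point concrete, here is a minimal sketch of SimpleQueue: an unbounded FIFO queue offering only put(), get(), empty(), and qsize(), with no way to wait for task completion:
from queue import SimpleQueue

sq = SimpleQueue()   # unbounded; no maxsize argument
sq.put("a")
sq.put("b")
print(sq.qsize())    # 2
print(sq.get())      # "a"
print(sq.empty())    # False
# sq.task_done() / sq.join() do not exist; use queue.Queue if you need them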
6. Extended Use Cases
1. Using Queues with Coroutines and asyncio
import asyncio
import random

async def async_producer(queue):
    for i in range(5):
        item = f"async-task-{i}"
        await queue.put(item)
        print(f"[async producer] published {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def async_consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()  # account for the sentinel so join() can finish
            break
        print(f"[async consumer] processed {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)
    producer_task = asyncio.create_task(async_producer(queue))
    consumer_task = asyncio.create_task(async_consumer(queue))
    await producer_task
    await queue.put(None)  # send the termination signal
    await consumer_task
    await queue.join()

asyncio.run(main())
2. Inter-Process Communication (Cross-Process Queue)
from multiprocessing import Process, Queue
import time

def process_producer(q):
    for i in range(3):
        q.put(f"process-task-{i}")
        time.sleep(0.5)

def process_consumer(q):
    while True:
        item = q.get()
        if item == "END":
            break
        print(f"Received process message: {item}")

if __name__ == "__main__":
    mp_queue = Queue()
    p1 = Process(target=process_producer, args=(mp_queue,))
    p2 = Process(target=process_consumer, args=(mp_queue,))
    p1.start()
    p2.start()
    p1.join()
    mp_queue.put("END")  # termination signal
    p2.join()
7. Advanced Design Patterns
1. Dynamic Priority Adjustment (E-Commerce Order Processing)
from queue import PriorityQueue
import threading

class OrderSystem:
    def __init__(self):
        self.orders = PriorityQueue()
        self.lock = threading.Lock()

    def add_order(self, order_id, priority):
        with self.lock:
            self.orders.put((priority, order_id))

    def upgrade_order(self, order_id, new_priority):
        # a real system would need a more efficient data structure
        with self.lock:
            temp = []
            while not self.orders.empty():
                p, oid = self.orders.get()
                if oid == order_id:
                    temp.append((new_priority, oid))
                else:
                    temp.append((p, oid))
                self.orders.task_done()  # balance this get(); the re-put below counts as a new task
            for item in temp:
                self.orders.put(item)

    def process_orders(self):
        while True:
            priority, order_id = self.orders.get()
            print(f"Processing order {order_id} (priority: {priority})")
            self.orders.task_done()

# Usage example
system = OrderSystem()
system.add_order("A1001", 3)
system.add_order("A1002", 1)
system.upgrade_order("A1001", 0)  # urgently upgrade an order

worker = threading.Thread(target=system.process_orders, daemon=True)
worker.start()
system.orders.join()
2. Multi-Queue Routing System (Log Level Handling)
from queue import Queue
import threading

class LogDispatcher:
    def __init__(self):
        self.queues = {
            'DEBUG': Queue(),
            'INFO': Queue(),
            'ERROR': Queue()
        }

    def dispatch(self, level, message):
        self.queues[level].put(message)

    def start_workers(self):
        for level in self.queues:
            t = threading.Thread(target=self._process_log, args=(level,))
            t.daemon = True
            t.start()

    def _process_log(self, level):
        while True:
            msg = self.queues[level].get()
            # simulate per-level handling logic
            if level == 'ERROR':
                print(f"!! Urgent handling: {msg}")
            else:
                print(f"{level}: {msg}")
            self.queues[level].task_done()

# Usage example
dispatcher = LogDispatcher()
dispatcher.start_workers()
dispatcher.dispatch('INFO', 'System started')
dispatcher.dispatch('ERROR', 'Database connection failed')
dispatcher.dispatch('DEBUG', 'variable x=42')
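Because the worker threads are daemons, the main thread may exit before the dispatched messages are processed. A small addition to the usage example waits for every per-level queue to drain first:
# wait until every dispatched message has been handled
for q in dispatcher.queues.values():
    q.join()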
8. Performance Optimization Techniques
1. Batch Processing to Improve Throughput
import queue
import threading

class BatchQueue(queue.Queue):
    def put_batch(self, items):
        with self.mutex:
            for item in items:
                self._put(item)
            self.unfinished_tasks += len(items)  # keep task_done()/join() accounting correct
            self.not_empty.notify_all()

    def get_batch(self, max_size):
        with self.mutex:
            items = []
            # use _qsize(): empty() would try to re-acquire the mutex and deadlock
            while len(items) < max_size and self._qsize() > 0:
                items.append(self._get())
            return items

# Usage example
bq = BatchQueue()
bq.put_batch([f"task-{i}" for i in range(100)])

def batch_worker():
    while True:
        batch = bq.get_batch(10)  # take up to 10 items at a time
        if not batch:
            break
        print(f"Processing batch: {len(batch)} items")
        for _ in batch:
            bq.task_done()  # one task_done() per item actually processed

threading.Thread(target=batch_worker).start()
2. Queue Monitoring and Dynamic Resizing
import queue

class SmartQueue(queue.Queue):
    def __init__(self, maxsize=0, scale_factor=2):
        super().__init__(maxsize)
        self.scale_factor = scale_factor

    def dynamic_put(self, item):
        try:
            self.put(item, block=False)
        except queue.Full:
            new_size = self.maxsize * self.scale_factor
            print(f"Resizing queue: {self.maxsize} -> {new_size}")
            with self.mutex:  # maxsize is read under the mutex in put()
                self.maxsize = new_size
            self.put(item)

# Usage example
sq = SmartQueue(maxsize=2)
for i in range(5):
    sq.dynamic_put(i)
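The subsection title also mentions monitoring; a minimal sketch of a background watcher thread for the sq instance above (the 1-second interval and the 80% threshold are arbitrary choices):
import threading
import time

def monitor(q, interval=1.0):
    # periodically report queue occupancy; remember that qsize() is only approximate
    while True:
        size = q.qsize()
        if q.maxsize and size >= 0.8 * q.maxsize:
            print(f"WARNING: queue almost full ({size}/{q.maxsize})")
        else:
            print(f"queue size: {size}")
        time.sleep(interval)

threading.Thread(target=monitor, args=(sq,), daemon=True).start()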
9. Debugging and Troubleshooting
1. Deadlock Detection Tool
import queue
import sys
import traceback

class DebugQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.active_tasks = 0  # placeholder counter for tracking in-flight tasks

    def put(self, item, block=True, timeout=None):
        print("PUT call stack:")
        traceback.print_stack(limit=3, file=sys.stdout)
        super().put(item, block, timeout)

    def get(self, block=True, timeout=None):
        print("GET call stack:")
        traceback.print_stack(limit=3, file=sys.stdout)
        return super().get(block, timeout)
2. Queue Performance Profiling Decorator
import time
from functools import wraps
from queue import PriorityQueue

def queue_perf_monitor(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

# Patch the key queue operations (this affects every PriorityQueue instance)
PriorityQueue.put = queue_perf_monitor(PriorityQueue.put)
PriorityQueue.get = queue_perf_monitor(PriorityQueue.get)
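After the patch, every PriorityQueue operation reports its own timing; a quick usage check:
pq = PriorityQueue()
pq.put((1, "job"))   # prints something like: put took 0.0001s
pq.get()             # prints something like: get took 0.0000s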
10. Integration with Other Technology Stacks
1. Distributed Queue with Redis
import queue
import redis

class RedisHybridQueue:
    def __init__(self, name, maxsize=0):
        self.local = queue.Queue(maxsize)
        self.redis = redis.Redis()
        self.queue_name = name

    def put(self, item):
        try:
            self.local.put_nowait(item)
        except queue.Full:
            # overflow to Redis when the local queue is full
            self.redis.lpush(self.queue_name, item)

    def get(self):
        try:
            return self.local.get_nowait()
        except queue.Empty:
            item = self.redis.rpop(self.queue_name)
            return item.decode() if item else None

# Usage example
hybrid_q = RedisHybridQueue('global_task')
hybrid_q.put("local task")
hybrid_q.put("remote task")
print(hybrid_q.get())  # local task
print(hybrid_q.get())  # remote task
2. Integration with SQLAlchemy (Persistent Queue)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()
engine = create_engine('sqlite:///:memory:')

class PersistentQueue(Base):
    __tablename__ = 'queue'
    id = Column(Integer, primary_key=True)
    priority = Column(Integer)
    data = Column(String)

Base.metadata.create_all(engine)

class DBAwareQueue:
    def __init__(self):
        self.Session = sessionmaker(bind=engine)

    def put(self, data, priority=0):
        session = self.Session()
        session.add(PersistentQueue(priority=priority, data=data))
        session.commit()

    def get(self):
        session = self.Session()
        item = session.query(PersistentQueue).order_by(
            PersistentQueue.priority
        ).first()
        if item:
            data = item.data  # read the value before the row is deleted
            session.delete(item)
            session.commit()
            return data
        return None
十一、最佳实践总结
-
容量规划:
- 根据内存限制设置合理
maxsize
- 使用监控工具观察队列水位线
- 采用动态扩容策略应对突发流量
- 根据内存限制设置合理
-
异常处理:
try: item = q.get(timeout=5) except queue.Empty: logger.warning("队列获取超时") except Exception as e: logger.error(f"队列错误: {str(e)}")
-
资源清理:
def graceful_shutdown(q): q.put(None) # 发送终止信号 q.join() # 等待剩余任务完成 print("队列已安全关闭")
-
性能权衡:
- 简单队列 vs 线程安全队列
- 内存队列 vs 持久化队列
- 同步操作 vs 异步操作