目 录CONTENT

文章目录

Python-Queue库教程

~梓
2025-02-23 / 0 评论 / 0 点赞 / 17 阅读 / 0 字
温馨提示:
本文最后更新于2025-02-23,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

一、Queue 模块简介

queue 模块提供线程安全的队列实现,适用于多线程编程中的数据交换。支持先进先出(FIFO)、后进先出(LIFO)和优先级队列。

队列类型:

  1. Queue:先进先出队列(FIFO)
  2. LifoQueue:后进先出队列(LIFO)
  3. PriorityQueue:按优先级排序的队列
  4. SimpleQueue:更简单的 FIFO 队列(Python 3.7+)

二、核心方法

  • put(item):添加元素到队列(阻塞)
  • get():获取并移除元素(阻塞)
  • task_done():标记任务完成
  • join():阻塞直到所有任务完成
  • empty():判断队列是否为空
  • full():判断队列是否满
  • qsize():返回队列大小

三、实际案例

案例 1:生产者-消费者模型(基础)

import threading
import queue
import time

# 创建队列,最多容纳5个元素
q = queue.Queue(maxsize=5)

def producer():
    for i in range(10):
        item = f"任务-{i}"
        q.put(item)  # 阻塞直到有空位
        print(f"[生产者] 发布 {item},队列大小: {q.qsize()}")
        time.sleep(0.1)

def consumer():
    while True:
        item = q.get()  # 阻塞直到有数据
        if item is None:  # 终止信号
            break
        print(f"[消费者] 处理 {item},剩余任务: {q.qsize()}")
        q.task_done()

# 启动消费者线程
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()

# 启动生产者线程
producer_thread = threading.Thread(target=producer)
producer_thread.start()

# 等待生产者完成
producer_thread.join()

# 发送终止信号
q.put(None)
consumer_thread.join()
print("所有任务完成")

案例 2:优先级队列

from queue import PriorityQueue

pq = PriorityQueue()

# 添加任务(优先级数字越小越优先)
pq.put((3, "低优先级任务"))
pq.put((1, "高优先级任务"))
pq.put((2, "中优先级任务"))

while not pq.empty():
    priority, task = pq.get()
    print(f"处理任务: {task} (优先级: {priority})")

# 输出:
# 处理任务: 高优先级任务 (优先级: 1)
# 处理任务: 中优先级任务 (优先级: 2)
# 处理任务: 低优先级任务 (优先级: 3)

案例 3:限制线程池并发数

import queue
import threading
import time

class ThreadPool:
    def __init__(self, max_threads):
        self.task_queue = queue.Queue()
        for _ in range(max_threads):
            worker = threading.Thread(target=self._worker)
            worker.daemon = True  # 后台线程
            worker.start()

    def _worker(self):
        while True:
            func, args = self.task_queue.get()
            try:
                func(*args)
            finally:
                self.task_queue.task_done()

    def submit(self, func, args=()):
        self.task_queue.put((func, args))

# 使用线程池
def demo_task(num):
    print(f"任务 {num} 开始")
    time.sleep(1)
    print(f"任务 {num} 完成")

pool = ThreadPool(max_threads=2)
for i in range(5):
    pool.submit(demo_task, (i,))

pool.task_queue.join()  # 等待所有任务完成
print("所有线程任务完成")

案例 4:LifoQueue 实现逆序处理

from queue import LifoQueue

lq = LifoQueue()
lq.put("第1个任务")
lq.put("第2个任务")
lq.put("第3个任务")

while not lq.empty():
    print(lq.get())

# 输出:
# 第3个任务
# 第2个任务
# 第1个任务

四、高级技巧

  1. 超时控制

    try:
        item = q.get(timeout=2)  # 最多等待2秒
    except queue.Empty:
        print("队列为空")
    
  2. 毒丸终止法:发送特殊对象(如 None)通知消费者停止

  3. 动态调整优先级

    # 重新插入修改优先级的任务
    pq.put((new_priority, task))
    
  4. 队列大小监控

    while True:
        print(f"当前队列大小: {q.qsize()}")
        time.sleep(1)
    

五、注意事项

  1. qsize() 在 Unix 系统上可能不准确
  2. 多进程环境应使用 multiprocessing.Queue
  3. 使用 SimpleQueue 时没有 task_done()join() 方法

六、扩展应用场景

1. 结合协程与 asyncio 使用队列

import asyncio
import random

async def async_producer(queue):
    for i in range(5):
        item = f"Async任务-{i}"
        await queue.put(item)
        print(f"[协程生产者] 发布 {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def async_consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"[协程消费者] 处理 {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)
    producer_task = asyncio.create_task(async_producer(queue))
    consumer_task = asyncio.create_task(async_consumer(queue))
  
    await producer_task
    await queue.put(None)  # 发送终止信号
    await consumer_task
    await queue.join()

asyncio.run(main())

2. 多进程间通信(跨进程队列)

from multiprocessing import Process, Queue
import time

def process_producer(q):
    for i in range(3):
        q.put(f"进程任务-{i}")
        time.sleep(0.5)

def process_consumer(q):
    while True:
        item = q.get()
        if item == "END":
            break
        print(f"收到进程消息: {item}")

if __name__ == "__main__":
    mp_queue = Queue()
  
    p1 = Process(target=process_producer, args=(mp_queue,))
    p2 = Process(target=process_consumer, args=(mp_queue,))
  
    p1.start()
    p2.start()
  
    p1.join()
    mp_queue.put("END")  # 终止信号
    p2.join()

七、高级设计模式

1. 动态优先级调整(电商订单处理)

from queue import PriorityQueue
import threading

class OrderSystem:
    def __init__(self):
        self.orders = PriorityQueue()
        self.lock = threading.Lock()
  
    def add_order(self, order_id, priority):
        with self.lock:
            self.orders.put((priority, order_id))
  
    def upgrade_order(self, order_id, new_priority):
        # 实际应用中需要更高效的数据结构
        with self.lock:
            temp = []
            while not self.orders.empty():
                p, oid = self.orders.get()
                if oid == order_id:
                    temp.append((new_priority, oid))
                else:
                    temp.append((p, oid))
            for item in temp:
                self.orders.put(item)

    def process_orders(self):
        while True:
            priority, order_id = self.orders.get()
            print(f"处理订单 {order_id} (优先级: {priority})")
            self.orders.task_done()

# 使用示例
system = OrderSystem()
system.add_order("A1001", 3)
system.add_order("A1002", 1)
system.upgrade_order("A1001", 0)  # 紧急升级订单

worker = threading.Thread(target=system.process_orders, daemon=True)
worker.start()
system.orders.join()

2. 多队列路由系统(日志分级处理)

from queue import Queue
import logging

class LogDispatcher:
    def __init__(self):
        self.queues = {
            'DEBUG': Queue(),
            'INFO': Queue(),
            'ERROR': Queue()
        }
  
    def dispatch(self, level, message):
        self.queues[level].put(message)
  
    def start_workers(self):
        for level in self.queues:
            t = threading.Thread(target=self._process_log, args=(level,))
            t.daemon = True
            t.start()
  
    def _process_log(self, level):
        while True:
            msg = self.queues[level].get()
            # 模拟不同级别的处理逻辑
            if level == 'ERROR':
                print(f"!! 紧急处理: {msg}")
            else:
                print(f"{level}: {msg}")
            self.queues[level].task_done()

# 使用示例
dispatcher = LogDispatcher()
dispatcher.start_workers()

dispatcher.dispatch('INFO', '系统启动')
dispatcher.dispatch('ERROR', '数据库连接失败')
dispatcher.dispatch('DEBUG', '变量x=42')

八、性能优化技巧

1. 批量处理优化吞吐量

class BatchQueue(queue.Queue):
    def put_batch(self, items):
        with self.mutex:
            for item in items:
                self._put(item)
            self.not_empty.notify_all()
  
    def get_batch(self, max_size):
        with self.mutex:
            items = []
            while len(items) < max_size and not self.empty():
                items.append(self._get())
            return items

# 使用示例
bq = BatchQueue()
bq.put_batch([f"任务-{i}" for i in range(100)])

def batch_worker():
    while True:
        batch = bq.get_batch(10)  # 每次取10个
        if not batch:
            break
        print(f"处理批量任务: {len(batch)}条")
        bq.task_done()  # 注意:需要根据实际处理次数调用

threading.Thread(target=batch_worker).start()

2. 队列监控与动态扩容

class SmartQueue(queue.Queue):
    def __init__(self, maxsize=0, scale_factor=2):
        super().__init__(maxsize)
        self.scale_factor = scale_factor
  
    def dynamic_put(self, item):
        try:
            self.put(item, block=False)
        except queue.Full:
            new_size = self.maxsize * self.scale_factor
            print(f"队列扩容: {self.maxsize} → {new_size}")
            self.maxsize = new_size
            self.put(item)

# 使用示例
sq = SmartQueue(maxsize=2)
for i in range(5):
    sq.dynamic_put(i)

九、调试与故障排除

1. 死锁检测工具

import traceback
import sys

class DebugQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.active_tasks = 0
  
    def put(self, item, block=True, timeout=None):
        print(f"PUT 调用栈:")
        traceback.print_stack(limit=3, file=sys.stdout)
        super().put(item, block, timeout)
  
    def get(self, block=True, timeout=None):
        print(f"GET 调用栈:")
        traceback.print_stack(limit=3, file=sys.stdout)
        return super().get(block, timeout)

2. 队列性能分析装饰器

import time
from functools import wraps

def queue_perf_monitor(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 耗时: {elapsed:.4f}秒")
        return result
    return wrapper

# 装饰关键队列操作
PriorityQueue.put = queue_perf_monitor(PriorityQueue.put)
PriorityQueue.get = queue_perf_monitor(PriorityQueue.get)

十、与其他技术栈集成

1. 与 Redis 实现分布式队列

import redis
from queue import Queue

class RedisHybridQueue:
    def __init__(self, name, maxsize=0):
        self.local = Queue(maxsize)
        self.redis = redis.Redis()
        self.queue_name = name
  
    def put(self, item):
        try:
            self.local.put_nowait(item)
        except queue.Full:
            self.redis.lpush(self.queue_name, item)
  
    def get(self):
        try:
            return self.local.get_nowait()
        except queue.Empty:
            item = self.redis.rpop(self.queue_name)
            return item.decode() if item else None

# 使用示例
hybrid_q = RedisHybridQueue('global_task')
hybrid_q.put("本地任务")
hybrid_q.put("远程任务")
print(hybrid_q.get())  # 本地任务
print(hybrid_q.get())  # 远程任务

2. 与 SQLAlchemy 集成(持久化队列)

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()
engine = create_engine('sqlite:///:memory:')

class PersistentQueue(Base):
    __tablename__ = 'queue'
    id = Column(Integer, primary_key=True)
    priority = Column(Integer)
    data = Column(String)

Base.metadata.create_all(engine)

class DBAwareQueue:
    def __init__(self):
        self.Session = sessionmaker(bind=engine)
  
    def put(self, data, priority=0):
        session = self.Session()
        session.add(PersistentQueue(priority=priority, data=data))
        session.commit()
  
    def get(self):
        session = self.Session()
        item = session.query(PersistentQueue).order_by(
            PersistentQueue.priority
        ).first()
        if item:
            session.delete(item)
            session.commit()
            return item.data
        return None

十一、最佳实践总结

  1. 容量规划

    • 根据内存限制设置合理 maxsize
    • 使用监控工具观察队列水位线
    • 采用动态扩容策略应对突发流量
  2. 异常处理

    try:
        item = q.get(timeout=5)
    except queue.Empty:
        logger.warning("队列获取超时")
    except Exception as e:
        logger.error(f"队列错误: {str(e)}")
    
  3. 资源清理

    def graceful_shutdown(q):
        q.put(None)  # 发送终止信号
        q.join()     # 等待剩余任务完成
        print("队列已安全关闭")
    
  4. 性能权衡

    • 简单队列 vs 线程安全队列
    • 内存队列 vs 持久化队列
    • 同步操作 vs 异步操作
0

评论区