Python 生产者消费者模型
1. 简介
生产者和消费者问题是线程模型中的经典问题:
Python 的内置模块 queue 提供了对生产者和消费者模型的支持,模块 queue 定义了类 Queue,类 Queue 表示一个被生产者和消费者共享的队列,类 Queue 提供如下常用方法:
方法 | 功能 |
---|---|
get() | 从队列中取走数据,如果队列为空,则阻塞 |
put(item) | 向队列中放置数据,如果队列为慢,则阻塞 |
join() | 如果队列不为空,则等待队列变为空 |
task_done() | 消费者从队列中取走一项数据,当队列变为空时,唤醒调用 join() 的线程 |
2. 实现生产者消费者模型
创建生产者线程和消费者线程,使用一个共享队列连接这两个线程,代码如下:
import threadingimport queue q = queue.Queue()
导入 threading 模块和 queue 模块
创建共享队列 q
def produce():for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:q.put(item)print('produce %s' % item)
def consume():for i in range():item = q.get()print(' consume %s' % item)
producer = threading.Thread(target=produce, args=())consumer = threading.Thread(target=consume, args=())producer.start()consumer.start()producer.join()consumer.join()
创建生产者线程 producer,线程入口为 produce
创建消费者线程 consumer,线程入口为 consume
启动生产者线程和消费者线程,并等待它们结束
运行程序,输出结果如下:
produce a produce b consume a produce c consume b consume c produce d consume d produce e consume e produce f consume f produce g consume g produce h consume h
生产者生产了 8 个数据:a、b、c、d、e、f、g、h
消费者取走了 8 个数据:a、b、c、d、e、f、g、h
3. 实现生产者、计算者、消费者模型
创建生产者、计算者、消费者线程:
import threadingimport queue q0 = queue.Queue()q1 = queue.Queue()
导入模块 threading 和模块 queue
使用两个共享队列连接这三个线程
共享队列 q0 连接生产者和计算者
共享队列 q1 连接计算者和消费者
def produce():for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:q0.put(item)print('produce %s' % item)
def compute():for i in range():item = q0.get()item = item.upper() q1.put(item)
def consume():for i in range():item = q1.get()print(' consume %s' % item)
producer = threading.Thread(target=produce, args=())computer = threading.Thread(target=compute, args=())consumer = threading.Thread(target=consume, args=())producer.start()computer.start()consumer.start()producer.join()computer.join()consumer.join()
创建生产者线程 producer,线程入口为 produce
创建消费者线程 consumer,线程入口为 consume
启动生产者线程、计算者线程、消费者线程,并等待它们结束
运行程序,输出结果如下:
produce a produce b produce c consume A produce d produce e consume B produce f consume C produce g consume D produce h consume E consume F consume G consume H
生产者生产了 8 个数据:a、b、c、d、e、f、g、h
计算者将数据加工为:A、B、C、D、E、F、G、H
消费者取走了 8 个数据:A、B、C、D、E、F、G、H
4. 同步生产者与消费者的推进速度
在生产者、消费者模型中,可能会存在两者推进速度不匹配的问题:生产者生产数据的速度较快,但是,消费者取走数据的速度较慢。
可以使用 queue 的 task_done() 方法和 join() 方法同步生产者与消费者的推进速度:
import threadingimport queue q = queue.Queue()
导入 threading 模块和 queue 模块
创建共享队列 q
def produce():for item in ['A', 'B', 'C', 'D']:q.put(item)print('produce %s' % item)q.join()print('------------ q is empty')for item in ['E', 'F', 'G', 'H']:q.put(item) print('produce %s' % item)q.join() print('------------ q is empty')
创建生产者线程的入口函数 produce
首先,生产 4 个数据:A、B、C、D
然后,生产 4 个数据:E、F、G、G
def consume():for i in range():item = q.get()print(' consume %s' % item)q.task_done()
producer = threading.Thread(target=produce, args=())consumer = threading.Thread(target=consume, args=())producer.start()consumer.start()
创建生产者线程 producer,线程入口为 produce
创建消费者线程 consumer,线程入口为 consume
启动生产者线程和消费者线程,并等待它们结束
运行程序,输出结果如下:
produce A produce B consume A consume B produce C consume C produce D consume D ------------ q is empty produce E consume E produce F consume F produce G produce H consume G consume H ------------ q is empty
生产者生产第一批数据 A、B、C、D,消费者将其取走
当第一批数据完全被消费者取走后,生产者才开始生产第二批数据
生产者生产第二批数据 E、F、G、H,消费者将其取走