图片来源于网络

版权声明

© 著作权归作者所有

允许自由转载,但请保持署名和原文链接。 不允许商业用途、盈利行为及衍生盈利行为。

什么是Queue?

Queue是Python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者()线程之间的信息传递。

为什么使用Queue,而不是list或者dict?

list或者dict是非线程安全的,Queue是线程安全的

也即意味着:如果使用list或者dict,我们必须把它放到lock程序块中(acquire和release),以防止发生竞态条件。

使用list或者dict,需要考虑线程同步的问题,即需要额外考虑wait和notify。

生成者不能向满队列添加数据,如果使用list或者dict,需要额外的代码实现。

Queue则封装了Condition行为,wait() notify() acquire() release() 满队列问题等等,无须额外考虑。

先来了解一些概念

生产者-消费者模式(Producer-Consumer)

Producer-Consumer模式是多线程编程中最常用的设计模式。生产者负责生产数据,并将数据存入队列,消费负责消费数据,不断从队列中取数据来使用。这里面有两个条件:

必须满足线程互斥条件:任何时候最多只允许一个线程访问数据,其他线程必须等待。这称为线程互斥。

必须满足线程同步条件:线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒。举个例子:在线程方式下,生产者和消费者各自是一个线程。生产者把数据写入队列,消费者从队列读出数据。当队列为空,消费者就阻塞等待(稍事休息);当队列满(达到最大长度),生产者就阻塞等待。

线程阻塞

线程阻塞通常是指一个线程在执行过程中暂停,以等待某个条件的触发。比如一个线程原子操作下,其他的线程都是阻塞状态的;比如input语句等待用户的输入,线程也是阻塞状态;比如当队列任务为空的时候,线程等待新的任务,这时候线程也是阻塞的。

线程安全

比如一个 ArrayList 类,在添加一个元素的时候,它可能会有两步来完成:

在 Items[Size] 的位置存放此元素;

增大 Size 的值。

在单线程运行的情况下,如果 Size = 0,添加一个元素后,此元素在位置 0,而且 Size=1;而如果是在多线程情况下,比如有两个线程,线程 A 先将元素1存放在位置 0。但是此时 CPU调度线程A暂停,线程B得到运行的机会。线程B向此 ArrayList 添加元素2,因为此时 Size 仍然等于0(注意,我们假设的是添加一个元素是要两个步骤,而线程A仅仅完成了步骤1),所以线程B也将元素存放在位置0。然后线程A和线程B都继续运行,都增加Size的值,结果Size等于2。 那好,我们来看看 ArrayList的情况,期望的元素应该有2个,而实际元素是在0位置,造成丢失元素,而且Size 等于 2。这就是“线程不安全”了。

原子操作

原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作"。如变量的赋值,不可能一个线程在赋值,到一半切到另外一个线程工作去了,这是原子操作。但是一些数据结构的操作,比如上述ArrayList的例子,添加元素是分成两个步骤的,所以必须要加锁。加锁后的操作就可以认为是原子的了。

举个小栗子来加深理解:

原来在银行办理业务是排队的形式的,虽然也是多线程(多个窗口),但是经常出现有些柜台人多、有些柜台人少,或者有些柜台办理完了,有些柜台还排着长队,这时候就需要人工来干预,很麻烦,效率不高。

现在的银行都是叫号系统,这是一个典型的生产者-消费者模式。

银行提供四排座椅(队列),每人手上拿一个号,先来先办,后来后办(先进先出 First in First out);

由叫号机(生产者)来打印一个号,来一个顾客打印一个号,完了塞到队尾;当然可以设置一个队列最大数,比如100人,超过100人在队列里,叫号机就不打印,直到队列有空闲位置。

银行开多个窗口(多个消费者线程)从队列里叫号,办完一个,再叫一个。办理的业务是原子性的(存或者取都在一个人手上完成,中间流程不可分割)。也不会同时有多个柜台操作你的帐户,所以是线程安全的。

如果都办理完了,叫号机和柜台都陷入了等待状态,打一会儿瞌睡,线程阻塞,直到有新的顾客来办理业务。

三种形式的Queue:

FIFO队列

图片来源于网络

class Queue.Queue(maxsize=0)

FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

LIFO队列

图片来源于网络

class Queue.LifoQueue(maxsize=0)

LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上

Priority队列

class Queue.PriorityQueue(maxsize=0)

构造一个优先队列。优先级级别越低的越先出来,maxsize用法同上。

常用方法

创建一个队列

# maxsize 表示队列长度,小于1表示队列长度无限。

import Queue

q = Queue.Queue(maxsize=10)

将一个值放入队列

# put(item[,block[,timeout]])

# 参数item为必选参数

# block 为可选参数,默认为True

# 如果队列为空且block=True,put()使得调用线程阻塞,直到空出一个数据单元。

# 如果队列为空且block=False,put()将抛出Full异常。

# 将10插入队尾

q.put(10)

将一个值从队列中取出

# get([block[,timeout]])

# 参数block为可选参数,默认为True

# 如果队列为空且block=True,get()使得调用线程阻塞,直到有新数据产生。

# 如果队列为空且block=False,get()将抛出Empty异常。

# 从对列头部取出一个数据

q.get()

获取队列的大小

q.qsize()

判断队列是否为空

# 队列为空返回True,反之返回False

q.empty()

判断队列是否已满

# 队列已满返回True,反之返回False

q.full()

task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

其他

q.put_nowait(item) == q.put(item,False)

q.get_nowait() == q.get(False)

一个最简单的生产者-消费者模式

# coding: utf-8

# filename: queue.py

# author: caord@showwant.com

import time

import Queue

import threading

class Producer(threading.Thread):

def __init__(self, thread_name, queue):

threading.Thread.__init__(self, name=thread_name)

self.data = queue

def run(self):

for i in range(20):

print('%s:%s is producing %d to the queue!' % (time.ctime(), self.getName(), i))

self.data.put(i)

time.sleep(1)

print('%s: %s finished!' % (time.ctime(), self.getName()))

time.sleep(10)

for i in range(10):

self.data.put(i)

class Consumer(threading.Thread):

def __init__(self, thread_name, queue):

threading.Thread.__init__(self, name=thread_name)

self.data = queue

def run(self):

while 1:

try:

num = self.data.get()

print('%s: %s is consuming. %d in the queue is consumed!' % (time.ctime(), self.getName(), num))

except:

print('%s: %s finished!' % (time.ctime(), self.getName()))

#break

def main():

queue = Queue.Queue(maxsize=20)

producer = Producer('producer', queue)

consumer = Consumer('consumer', queue)

producer.start()

consumer.start()

producer.join()

consumer.join()

print('All thread finished')

if __name__ == '__main__':

main()

python queue 生产者 消费者_Queue: 应用于生产者-消费者模式的Python队列相关推荐

  1. python queue 生产者 消费者_【python】-- 队列(Queue)、生产者消费者模型

    队列(Queue) 在多个线程之间安全的交换数据信息,队列在多线程编程中特别有用 队列的好处: 提高双方的效率,你只需要把数据放到队列中,中间去干别的事情. 完成了程序的解耦性,两者关系依赖性没有不大 ...

  2. Python爬虫的经典多线程方式,生产者与消费者模型

    在之前的文章当中我们曾经说道,在多线程并发的场景当中,如果我们需要感知线程之间的状态,交换线程之间的信息是一件非常复杂和困难的事情.因为我们没有更高级的系统权限,也没有上帝视角,很难知道目前运行的状态 ...

  3. Python使用两个Event对象同步生产者消费者问题

    问题描述:如果缓冲区满则生产者等待,若空则生产者往缓冲区放置物品至缓冲区满:如果缓冲区空则消费者等待,若满则消费者从缓冲区获取物品进行消费直至缓冲区空. 参考代码: 1)首先导入相应的模块 2)编写生 ...

  4. python 全栈开发,Day39(进程同步控制(锁,信号量,事件),进程间通信(队列,生产者消费者模型))...

    昨日内容回顾 python中启动子进程 并发编程 并发 :多段程序看起来是同时运行的 ftp 网盘 不支持并发 socketserver 多进程 并发 异步 两个进程 分别做不同的事情 创建新进程 j ...

  5. Python之进程+线程+协程(生产者消费者模型)

    本篇主要总结一下非常有名的生成者消费者模型 概念引用 1.为什么要使用生产者和消费者模型? 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快, ...

  6. 生产者消费者 java实现_Java生产者消费者的三种实现

    Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下.在网上查到大概有5种生产者消费者的写法,分别如下. 用sync ...

  7. RabbitMQ:消费者ACK机制、生产者消息确认

    文章目录 基础案例环境搭建: 环境: 1. 生产者发送消息确认 1.1 confirm 确认模式 1.2 return 退回模式 源代码 1.1.3 小结 2. 消费者签收消息(ACK) 2.1 代码 ...

  8. RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;

    说明: (1)内容说明: ● 这儿我们会创建一个项目,演示RabbitMQ最基础的内容: 通过,这个最简单的例子,先了解:如何使用RabbitMQ,如何连接RabbitMQ,如何发送消息,如何接收消息 ...

  9. RabbitMQ中的生产者消费者与订阅发布者两种模式

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件).AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进 ...

最新文章

  1. 如何解决2012年7月1日增加闰秒后引起linux系统重启问题
  2. oracle选择语言设置,oracle本地语言变量设置
  3. 皮一皮:这样的消息我也想收...
  4. 每日一皮:所以重点到底是啥...
  5. 霍金的预言正在实现,我们已经离不开人工智能,而它们在脱离控制
  6. string转map集合_集合(下)
  7. linux curl
  8. sqlserver 更新 datetime 数据_SqlServer 关于 datetime 的更新引发的思考
  9. 新手网管升级之路(三)
  10. OpenStack 已死?
  11. -bash: unzip: 未找到命令_29 条运维工程师必会实用 Linux 命令
  12. python打包exe_Python | 用Pyinstaller打包发布exe应用
  13. 剑指offer -- 反转链表
  14. python下载数据集出现:Compressed file ended before the end-of-stream marker was reached
  15. Struts2.1.6 + Spring2.5+Hibernate3.2整合
  16. html5应用方法,9 个用来加速 HTML5 应用的方法
  17. MySQL命令行登录数据库
  18. Windows清理助手ARSWP
  19. 办公一定要有哪些基础软件?
  20. 投票系统(投票问题可多选、带柱状图)

热门文章

  1. Java项目:SSM实现的儿童摄影预约网站平台
  2. Qt设计用户界面的三种方法
  3. HRESULT详细说明
  4. Verilog 每日一题 (VL27 可置位计数器)
  5. centos 源码安装:iftop
  6. asp.net+C#房屋销售房产管理系统网站
  7. python进行excle表格脱敏批处理
  8. JAVA计算机毕业设计吃到撑零售微商城Mybatis+系统+数据库+调试部署
  9. Scanner 的知识与使用
  10. 机器学习损失函数之似然函数