目录

  • 队列、生产者消费者模型、初识线程

    • 一、用进程锁来优化抢票小程序

      • 1.1 进程锁
      • 1.2 优化抢票小程序
    • 二、队列
      • 2.1 队列的介绍
      • 2.2 创建队列的类
      • 2.3 使用队列的案例
    • 三、生产者消费者模型
      • 3.1 用队列Queue实现生产者消费者模型
      • 3.2 用队列JoinableQueue实现生产者消费者模型

队列、生产者消费者模型、初识线程

一、用进程锁来优化抢票小程序

1.1 进程锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没有问题的。而共享带来的是竞争,竞争带来的结果就是错乱,那就需要加锁处理来控制。

多个进程共享同一打印终端的时候,并发运行的话虽然效率高,但是竞争同一打印终端,带来了打印错乱;而由并发变成串行,虽然牺牲了运行效率,但是避免了竞争。

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度是慢了,但是能够保证数据安全。

进程锁的使用需要from multiprocessing import Process ,Lock

1.2 优化抢票小程序

from multiprocessing import Process,Lock
import json,time,osdef search():time.sleep(1)  # 模拟网络iowith open('db.txt',mode='rt',encoding='utf-8') as fr:res = json.load(fr)print(f"还剩{res['count']}张票")def get():with open('db.txt',mode='rt',encoding='utf-8') as fr:res = json.load(fr)time.sleep(1)if res['count'] > 0:res['count'] -= 1with open('db.txt',mode='wt',encoding='utf-8') as fw:json.dump(res,fw)print(f"进程{os.getpid()}抢票成功")time.sleep(1.5) # 模拟网络ioelse:print('票已售罄!!!')def task(lock):search()lock.acquire()  # 用进程锁锁住get()lock.release()  # 释放锁头if __name__ == '__main__':lock = Lock() # 写在主进程是为了让子进程拿到同一把锁for i in range(15):  # 同时有15个人抢票p = Process(target=task,args=(lock,))p.start()# 进程锁和join用法的区别# 进程锁:是把锁住的代码变成了串行# join:是把所有的子进程变成了串行# 为了保证数据的安全,只能串行,牺牲掉效率

二、队列

2.1 队列的介绍

对于上面的进程锁的例子,我们可以寻找一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。

这就是multiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道

ipc机制:进程通讯

管道:pipe 基于共享的内存空间

队列:Queue = pipe+锁

队列和管道都是将数据存放于内存中的,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

2.2 创建队列的类

底层就是以管道和锁定的方式实现

Queue([maxsize]):  创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。    

==要注意的是:Queue不适合传大文件,通常传一些信息==

类中方法介绍:**

  • 主要方法:

    1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
    3
    4 q.get_nowait():同q.get(False)
    5 q.put_nowait():同q.put(False)
    6
    7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
  • 其他方法(了解)

    1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
    2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
    '''

2.3 使用队列的案例

案例一

from multiprocessing import Process,Queueq = Queue()
q.put('鸭屁屁')  # 插入数据到队列中
q.put([1,2,4])
q.put(2)
print(q.get())
print(q.get())
print(q.get())
#q.put(5)  # 往队列中再放入一个值,下面的代码就不会阻塞
print(q.get())  # 前三个已经把值拿掉了,这里就会默认一直等着拿值,就阻塞住了

案例二

from multiprocessing import Process,Queue
q = Queue(4) # 括号里的参数是指定队列里值得最大个数
q.put('鸭屁屁')
q.put([1,2,3])
q.put([2,3,4])
q.put(5)
q.put(6)  # 队列满了的情况再放值,会阻塞

案例三(以下几个案例了解即可)

from multiprocessing import Process,Queue
q = Queue(3)
q.put('zhao',block=True,timeout=2) # block=True:默认会阻塞,timeout:指定阻塞的时间
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=2)q.put('zhao',block=True,timeout=5) # put里的 block=True 如果满了会等待,timeout最多等待n秒,如果n秒队列还是满的就报错了

案例四

from multiprocessing import Process,Queue
q = Queue()
q.put('yyyy')
q.get()
q.get(block=True,timeout=5) #get里的 block=True 阻塞等待,timeout最多等5s, 5s后还是取不到就报错

案例五

from multiprocessing import Process,Queue
q = Queue(3)
q.put('qwe')
q.put('qwe')
q.put('qwe')q.put('qwe',block=False) # 对于put来说 block=False 如果队列满了就直接报错q = Queue(3)
q.put('qwe')
q.get()q.get(block=False)  # 对于get来说 block = Flase 拿不到不阻塞,直接报错

案例六

from multiprocessing import Process,Queue
q = Queue(1)
q.put('123')
# q.get()
q.put_nowait('666')  # put_nowait方法相当于put里的block = Flase
# q.get_nowait()   # get_nowait方法相当于get里的block = Flase

三、生产者消费者模型

3.1 用队列Queue实现生产者消费者模型

生产者: 生产数据的任务

消费者: 处理数据的任务

生产者--队列(盆)-->消费者

生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.

生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.

补充: queue不适合传大文件,通产传一些消息.

from multiprocessing import Process,Queue
import time,randomdef producer(q,name,food):'''生产者进程'''for i in range(3):print(f"{name}生产了{food}{i}")time.sleep(random.randint(1,3))res = f"{food}{i}"q.put(res)def consumer(q,name):'''消费者进程'''while True:res = q.get(timeout=5)if res is None:break  # 收到空信号就结束time.sleep(random.randint(1,3))print(f"{name}吃了{res}")if __name__ == '__main__':q = Queue()# 生产者对象p1 = Process(target=producer,args=(q,'rocky','生蚝'))p2 = Process(target=producer,args=(q,'nick','韭菜'))p3 = Process(target=producer,args=(q,'tank','扇贝'))# 消费者对象c1 = Process(target=consumer,args=(q,'蓬妹'))c2 = Process(target=consumer,args=(q,'山鸡'))# 生产者开始生产,消费者开始吃p1.start()p2.start()p3.start()c1.start()c2.start()# 必须保证生产者生产完才能发送结束的信号,用到Process的join方法p1.join()  # 感知子进程的结束p2.join()p3.join()  # 到这里生产者生产完毕q.put(None)  # 有几个消费者就put几次q.put(None)

3.2 用队列JoinableQueue实现生产者消费者模型

JoinableQueue:这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。

from multiprocessing import Process,JoinableQueue
import time
import randomdef producer(name,food,q):'''生产者进程'''for i in range(2):time.sleep(random.randint(1,3))res = f"{food}{i}"q.put(res)print(f'\033[44m{name} 生产了 {res}\033[0m')q.join()  #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。def consumer(name,q):'''消费者进程'''while True:res = q.get()time.sleep(random.randint(1,3))print(f"{name}吃了{res}")q.task_done()  #向q.join()发送一次信号,证明一个数据已经被取走了if __name__ == '__main__':q = JoinableQueue()#生产者p1 = Process(target=producer,args=('yjy','酱大骨',q))p2 = Process(target=producer,args=('wwb','酸菜鱼',q))p3 = Process(target=producer,args=('hhh','卤猪蹄',q))#消费者c1 = Process(target=consumer,args = ('xx',q,))c2 = Process(target=consumer,args = ('yy',q,))c1.daemon = True  #将他设置为守护进程c2.daemon = True  #将他设置为守护进程#开始生产,开始吃l = [p1,p2,p3,c1,c2]for i in l :i.start()#必须保证生产者生产完才能发送结束的信号,运用到.joinp1.join()  #感知子进程的结束p2.join()p3.join()print("主进程结束了")'''
主进程等p1,p2,p3
P1,P2,p3等c,c2
p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
#因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
'''

转载于:https://www.cnblogs.com/zhuangyl23/p/11529792.html

队列、生产者消费者模型相关推荐

  1. python 全栈开发,Day39(进程同步控制(锁,信号量,事件),进程间通信(队列,生产者消费者模型))...

    昨日内容回顾 python中启动子进程 并发编程 并发 :多段程序看起来是同时运行的 ftp 网盘 不支持并发 socketserver 多进程 并发 异步 两个进程 分别做不同的事情 创建新进程 j ...

  2. 阻塞队列——生产者消费者模型

    目录

  3. 生产者消费者模型【新版】

    目录 啥是生产者消费者模型? 生产者消费者模型存在问题??如何进行解决呢?? 生产者消费者模型导致的问题 什么是阻塞队列 生产者消费者模型优点 生产者消费者模型实现 Message MessageQu ...

  4. 进程 互斥锁、队列与管道、生产者消费者模型

    目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重 ...

  5. 11.python并发入门(part8 基于线程队列实现生产者消费者模型)

    一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据 ...

  6. Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...

    一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例 ...

  7. python queue 生产者 消费者_【python】-- 队列(Queue)、生产者消费者模型

    队列(Queue) 在多个线程之间安全的交换数据信息,队列在多线程编程中特别有用 队列的好处: 提高双方的效率,你只需要把数据放到队列中,中间去干别的事情. 完成了程序的解耦性,两者关系依赖性没有不大 ...

  8. 分布与并行计算—生产者消费者模型队列(Java)

    在生产者-消费者模型中,在原有代码基础上,把队列独立为1个类实现,通过公布接口,由生产者和消费者调用. public class Consumer implements Runnable {int n ...

  9. 进程锁、事件、进程队列、进程间共享数据、生产者消费者模型

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 进程锁(Lock) 锁的基本概念 锁的基本用法 模拟12306抢票软件 信号量:Semaphone 概念 Semaphore ...

  10. [Linux]生产者消费者模型(基于BlockQueue的生产者消费者模型 | 基于环形队列的生产者消费者模型 | 信号量 )

    文章目录 生产者消费者模型 函数调用角度理解生产者消费者模型 生活角度理解生产者消费者模型 为什么要使用生产者消费者模型 生产者消费者模型优点 321原则 基于BlockingQueue的生产者消费者 ...

最新文章

  1. HDOJ_ACM_数塔
  2. IntelliJ IDEA 的 .idea 目录加入.gitignore无效的解决方法
  3. ANSYS FLEXlm License Manager的卸载方法
  4. 为什么用redis?
  5. 把虚拟机装到内存里(打开PScs3只需要2秒)
  6. shell、ftp、mysql如何连接笔记
  7. Python 装饰器笔记
  8. 数据仓库专题19-数据建模语言Information Engineering - IE模型(转载)
  9. 基于卷积神经网络的大豆病害识别
  10. 【CF-gym101964:B】Broken Watch(找规律+推导+自然溢出/java大数)
  11. 带滤镜拍照的app_这8款拍照修图APP,简直就是逼格神器!
  12. android 网页取词,有道词典屏幕取词怎么用?,你知道吗?在浏览网页
  13. JavaWebServlet学生教师信息管理系统【JavaWeb】Servlet+Mysql+Jsp+Tomcat
  14. Windows如何设置夜间模式(护眼模式)超简单
  15. Clion解决c++源文件多个编译运行
  16. 微信小程序——章节自测七
  17. 使用JBE(Java Bytecode Editor)修改Java字节码
  18. 幼儿园班级信息管理系统
  19. 十、如何给标识符命名
  20. win8.1怎样打开计算机名,Win8怎么打开cmd命令窗口_Win8.1打开命令提示符的方法-192路由网...

热门文章

  1. JAVA当中令人蛋疼的projected
  2. 众说区块链:区块链在食品安全溯源领域中的应用
  3. JS验证电话号码格式
  4. mobaxterm macro 导入 导出 清空 乱码
  5. [GXYCTF 2019]BabySqli
  6. js改变原数组的方法和不改变原数组
  7. 漫画 | 产品经理频繁更改需求,我没忍住把他给砍了!
  8. 苹果终端date命令_mac date命令详解
  9. 小白的红米ac2100刷breed记录
  10. Linux批量域名查询IP