继上节使用原生多进程并行运行,基于Redis作为消息队列完成了圆周率的计算,本节我们使用原生操作系统消息队列来替换Redis。

文件

使用文件进行通信是最简单的一种通信方式,子进程将结果输出到临时文件,父进程从文件中读出来。文件名使用子进程的进程id来命名。进程随时都可以通过 os.getpid() 来获取自己的进程id。

# coding: utf-8import os
import sys
import mathdef slice(mink, maxk):s = 0.0for k in range(mink, maxk):s += 1.0/(2*k+1)/(2*k+1)return sdef pi(n):pids = []unit = n / 10for i in range(10):  # 分10个子进程mink = unit * imaxk = mink + unitpid = os.fork()if pid > 0:pids.append(pid)else:s = slice(mink, maxk)  # 子进程开始计算with open("%d" % os.getpid(), "w") as f:f.write(str(s))sys.exit(0)  # 子进程结束sums = []for pid in pids:os.waitpid(pid, 0)  # 等待子进程结束with open("%d" % pid, "r") as f:sums.append(float(f.read()))os.remove("%d" % pid)  # 删除通信的文件return math.sqrt(sum(sums) * 8)print pi(10000000)

输出

3.14159262176

管道pipe

管道是Unix进程间通信最常用的方法之一,它通过在父子进程之间开通读写通道来进行双工交流。我们通过os.read()和os.write()来对文件描述符进行读写操作,使用os.close()关闭描述符。

上图为单进程的管道

上图为父子进程分离后的管道

# coding: utf-8import os
import sys
import mathdef slice(mink, maxk):s = 0.0for k in range(mink, maxk):s += 1.0/(2*k+1)/(2*k+1)return sdef pi(n):childs = {}unit = n / 10for i in range(10):  # 分10个子进程mink = unit * imaxk = mink + unitr, w = os.pipe()pid = os.fork()if pid > 0:childs[pid] = r  # 将子进程的pid和读描述符存起来os.close(w)  # 父进程关闭写描述符,只读else:os.close(r)  # 子进程关闭读描述符,只写s = slice(mink, maxk)  # 子进程开始计算os.write(w, str(s))os.close(w)  # 写完了,关闭写描述符sys.exit(0)  # 子进程结束sums = []for pid, r in childs.items():sums.append(float(os.read(r, 1024)))os.close(r)  # 读完了,关闭读描述符os.waitpid(pid, 0)  # 等待子进程结束return math.sqrt(sum(sums) * 8)print pi(10000000)

输出

3.14159262176

无名套接字socketpair

我们知道跨网络通信免不了要通过套接字进行通信,但是本例的多进程是在同一个机器上,用不着跨网络,使用普通套接字进行通信有点浪费。

上图为单进程的socketpair

上图为父子进程分离后的socketpair

为了解决这个问题,Unix系统提供了无名套接字socketpair,不需要端口也可以创建套接字,父子进程通过socketpair来进行全双工通信。

socketpair返回两个套接字对象,一个用于读一个用于写,它有点类似于pipe,只不过pipe返回的是两个文件描述符,都是整数。所以写起代码形式上跟pipe几乎没有什么区别。

我们使用sock.send()和sock.recv()来对套接字进行读写,通过sock.close()来关闭套接字对象。

# coding: utf-8import os
import sys
import math
import socketdef slice(mink, maxk):s = 0.0for k in range(mink, maxk):s += 1.0/(2*k+1)/(2*k+1)return sdef pi(n):childs = {}unit = n / 10for i in range(10):  # 分10个子进程mink = unit * imaxk = mink + unitrsock, wsock = socket.socketpair()pid = os.fork()if pid > 0:childs[pid] = rsockwsock.close()else:rsock.close()s = slice(mink, maxk)  # 子进程开始计算wsock.send(str(s))wsock.close()sys.exit(0)  # 子进程结束sums = []for pid, rsock in childs.items():sums.append(float(rsock.recv(1024)))rsock.close()os.waitpid(pid, 0)  # 等待子进程结束return math.sqrt(sum(sums) * 8)print pi(10000000)

输出

3.14159262176

OS消息队列

操作系统也提供了跨进程的消息队列对象可以让我们直接使用,只不过python没有默认提供包装好的api来直接使用。我们必须使用第三方扩展来完成OS消息队列通信。第三方扩展是通过使用Python包装的C实现来完成的。

OS消息队列有两种形式,一种是posix消息队列,另一种是systemv消息队列,有些操作系统两者都支持,有些只支持其中的一个,比如macos仅支持systemv消息队列,我本地的python的docker镜像是debian linux,它仅支持posix消息队列。

posix消息队列我们先使用posix消息队列来完成圆周率的计算,posix消息队列需要提供一个唯一的名称,它必须是 / 开头。close()方法仅仅是减少内核消息队列对象的引用,而不是彻底关闭它。unlink()方法才能彻底销毁它。O_CREAT选项表示如果不存在就创建。向队列里塞消息使用send方法,收取消息使用receive方法,receive方法返回一个tuple,tuple的第一个值是消息的内容,第二个值是消息的优先级。之所以有优先级,是因为posix消息队列支持消息的排序,在send方法的第二个参数可以提供优先级整数值,默认为0,越大优先级越高。

# coding: utf-8import os
import sys
import math
from posix_ipc import MessageQueue as Queuedef slice(mink, maxk):s = 0.0for k in range(mink, maxk):s += 1.0/(2*k+1)/(2*k+1)return sdef pi(n):pids = []unit = n / 10q = Queue("/pi", flags=os.O_CREAT)for i in range(10):  # 分10个子进程mink = unit * imaxk = mink + unitpid = os.fork()if pid > 0:pids.append(pid)else:s = slice(mink, maxk)  # 子进程开始计算q.send(str(s))q.close()sys.exit(0)  # 子进程结束sums = []for pid in pids:sums.append(float(q.receive()[0]))os.waitpid(pid, 0)  # 等待子进程结束q.close()q.unlink()  # 彻底销毁队列return math.sqrt(sum(sums) * 8)print pi(10000000)

输出

3.14159262176

systemv消息队列systemv消息队列和posix消息队列用起来有所不同。systemv的消息队列是以整数key作为名称,如果不指定,它就创建一个唯一的未占用的整数key。它还提供消息类型的整数参数,但是不支持消息优先级。

# coding: utf-8import os
import sys
import math
import sysv_ipc
from sysv_ipc import MessageQueue as Queuedef slice(mink, maxk):s = 0.0for k in range(mink, maxk):s += 1.0/(2*k+1)/(2*k+1)return sdef pi(n):pids = []unit = n / 10q = Queue(key=None, flags=sysv_ipc.IPC_CREX)for i in range(10):  # 分10个子进程mink = unit * imaxk = mink + unitpid = os.fork()if pid > 0:pids.append(pid)else:s = slice(mink, maxk)  # 子进程开始计算q.send(str(s))sys.exit(0)  # 子进程结束sums = []for pid in pids:sums.append(float(q.receive()[0]))os.waitpid(pid, 0)  # 等待子进程结束q.remove()  # 销毁消息队列return math.sqrt(sum(sums) * 8)print pi(10000000)

输出

3.14159262176

共享内存

共享内存也是非常常见的多进程通信方式,操作系统负责将同一份物理地址的内存映射到多个进程的不同的虚拟地址空间中。进而每个进程都可以操作这份内存。考虑到物理内存的唯一性,它属于临界区资源,需要在进程访问时搞好并发控制,比如使用信号量。我们通过一个信号量来控制所有子进程的顺序读写共享内存。我们分配一个8字节double类型的共享内存用来存储极限的和,每次从共享内存中读出来时,要使用struct进行反序列化(unpack),将新的值写进去之前也要使用struct进行序列化(pack)。每次读写操作都需要将读写指针移动到内存开头位置(lseek)。

# coding: utf-8import os
import sys
import math
import struct
import posix_ipc
from posix_ipc import Semaphore
from posix_ipc import SharedMemory as Memorydef slice(mink, maxk):s = 0.0for k in range(mink, maxk):s += 1.0/(2*k+1)/(2*k+1)return sdef pi(n):pids = []unit = n / 10sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1)  # 使用一个信号量控制多个进程互斥访问共享内存memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX)os.lseek(memory.fd, 0, os.SEEK_SET)  # 初始化和为0.0的double值os.write(memory.fd, struct.pack('d', 0.0))for i in range(10):  # 分10个子进程mink = unit * imaxk = mink + unitpid = os.fork()if pid > 0:pids.append(pid)else:s = slice(mink, maxk)  # 子进程开始计算sem_lock.acquire()try:os.lseek(memory.fd, 0, os.SEEK_SET)bs = os.read(memory.fd, 8)  # 从共享内存读出来当前值cur_val, = struct.unpack('d', bs)  # 反序列化,逗号不能少cur_val += s  # 加上当前进程的计算结果bs = struct.pack('d', cur_val) # 序列化os.lseek(memory.fd, 0, os.SEEK_SET)os.write(memory.fd, bs)  # 写进共享内存memory.close_fd()finally:sem_lock.release()sys.exit(0)  # 子进程结束sums = []for pid in pids:os.waitpid(pid, 0)  # 等待子进程结束os.lseek(memory.fd, 0, os.SEEK_SET)bs = os.read(memory.fd, 8)  # 读出最终这结果sums, = struct.unpack('d', bs)  # 反序列化memory.close_fd()  # 关闭共享内存memory.unlink()  # 销毁共享内存sem_lock.unlink()  #  销毁信号量return math.sqrt(sums * 8)print pi(10000000)

输出

3.14159262176

更多Python视频、源码、资料加群725638078免费获取

深入Python进程间通信原理相关推荐

  1. 深入Python进程间通信原理--图文版

    继上节使用原生多进程并行运行,基于Redis作为消息队列完成了圆周率的计算,本节我们使用原生操作系统消息队列来替换Redis. 文件 使用文件进行通信是最简单的一种通信方式,子进程将结果输出到临时文件 ...

  2. python进程间通信--信号Signal

    信号signal 是python进程间通信多种机制中的其中一种机制.可以对操作系统进程的控制,当进程中发生某种原因而中断时,可以异步处理这个异常. 信号通过注册的方式'挂'在一个进程中,并且不会阻塞该 ...

  3. python array赋值_从踩坑学Python内部原理(5):执行时机的差异

    (给Python开发者加星标,提升Python技能) 英文:Satwik Kansal,翻译:暮晨 Python开发者整理自 GitHub [导读]:Python 是一个设计优美的解释型高级语言,它提 ...

  4. python爬虫原理-python学习之python爬虫原理

    原标题:python学习之python爬虫原理 今天我们要向大家详细解说python爬虫原理,什么是python爬虫,python爬虫工作的基本流程是什么等内容,希望对这正在进行python爬虫学习的 ...

  5. python爬虫原理-python爬虫原理详细讲解

    原标题:python爬虫原理详细讲解 一 .爬虫是什么 1.什么是互联网? 互联网是由网络设备(网线,路由器,交换机,防火墙等等)和一台台计算机连接而成,像一张网一样. 2.互联网建立的目的?互联网的 ...

  6. python epoll 并发_Python语言之python并发原理(阻塞、非阻塞、epoll)

    本文主要向大家介绍了Python语言之python并发原理(阻塞.非阻塞.epoll),通过具体的内容向大家展示,希望对大家学习Python语言有所帮助. 在Linux系统中 01 阻塞服务端 特征: ...

  7. Python多线程原理与实现

    Date: 2019-06-04 Author: Sun Python多线程原理与实战 目的: (1)了解python线程执行原理 (2)掌握多线程编程与线程同步 (3)了解线程池的使用 1 线程基本 ...

  8. Python 进程间通信 Queue / Pipe - Python零基础入门教程

    目录 一.前言 1.使用 Queue 线程间通信 2.使用 Queue 进程间通信,适用于多个进程之间通信 3.使用 Pipe 进程间通信,适用于两个进程之间通信(一对一) 二.python 进程间通 ...

  9. 可对Python运行原理进行可视化分析的工具

    Python Tutor Python Tutor是一个对Python运行原理进行可视化分析的工具,网址为: http://www.pythontutor.com 使用范例 参考链接 转载于:http ...

最新文章

  1. 分布式并行计算MapReduce
  2. sdh管理单元指针_C语言学习|指针的介绍
  3. jQuery_事件学习
  4. 前端框架 渐进式框架
  5. 怎样玩转千万级别的数据
  6. 二分排序java实现
  7. OpenGL ES API with no current context
  8. 复杂网络分析工具及其比较(转)
  9. visual foxpro c语言教程,VFP简单初级入门教程超好.pdf
  10. Oracle linux系统安装oracle11g
  11. zigbee协议栈 任务、事件与轮询机制
  12. 「音视频直播技术」Android下H264解码
  13. 收款码三合一生成系统源码
  14. 计算机相关国内外文献,计算机外文文献
  15. cad直线和圆弧倒角不相切_在cad绘制倒圆角的方法技巧步骤详解
  16. C语言理论小学案例试讲,【精选】C语言试讲 演示文稿C语言试讲 演示文稿.ppt
  17. oracle中sql的递归查询运用
  18. 工程师在创业团队的技术挑战
  19. uniapp 微信小程序地图,在高德app、腾讯app、百度app、苹果地图app 中打开
  20. 流行的框架Nohttp到来,让我们见证封装好的Nohttp详细使用步骤吧

热门文章

  1. 如何监控代理记账行业的客户信息安全?
  2. 电商扣减库存_二类电商:鲁班今日最新消息!
  3. Elastic Stack容器化部署拓展(Https、AD域集成)并收集Cisco设备的日志信息
  4. 【TS】1010- 细数 10 个 TypeScript 奇怪的符号
  5. 价值规律在现代社会基本失效
  6. 计算机视觉大型攻略 —— 特征与匹配(3)特征描述符
  7. 平台云基石-CoreOS之集群篇(无需互联网)
  8. Java 使用dcm4che的工具类findscu查询pacs数据
  9. c#中在一个窗体中触发另一个窗体的事件
  10. python面试题常考的超市购物车系统