一前言 
   使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code!

二使用
2.1 初识异同
下面的程序显示threading和multiprocessing的在使用方面的异同,相近的函数join(),start(),append() 等,并做同一件事情打印自己的进程pid

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import os
  4. import threading
  5. import multiprocessing
  6. def printer(msg):
  7. print(msg, os.getpid())
  8. print('Main begin:', os.getpid())
  9. # threading
  10. record = []
  11. for i in range(5):
  12. thread = threading.Thread(target=printer, args=('threading',))
  13. thread.start()
  14. record.append(thread)
  15. for thread in record:
  16. thread.join()
  17. # multi-process
  18. record = []
  19. for i in range(5):
  20. process = multiprocessing.Process(target=printer, args=('multiprocessing',))
  21. process.start()
  22. record.append(process)
  23. for process in record:
  24. process.join()
  25. print('Main end:', os.getpid())

输出结果

点击(此处)折叠或打开

  1. Main begin: 9524
  2. threading 9524
  3. threading 9524
  4. threading 9524
  5. threading 9524
  6. threading 9524
  7. multiprocessing 9539
  8. multiprocessing 9540
  9. multiprocessing 9541
  10. multiprocessing 9542
  11. multiprocessing 9543
  12. Main end: 9524

从例子的结果可以看出多线程threading的进程id和主进程(父进程)pid一样 ,同为9524; 多进程打印的pid每个都不一样,for循环中每创建一个process对象都年开一个进程。其他相关的方法基本类似。

2.2 用法
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示调用对象,
args表示调用对象的位置参数元组。
kwargs表示调用对象的字典。
name为进程的别名。
group实质上不使用,为None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程,并自动调用run方法.
属性:authkey、daemon(要通过start()设置,必须设置在方法start之前)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

2.3 创建单进程
单线程比较简单,创建一个 Process的实例对象就好,传入参数 target 为已经定义好的方法worker以及worker需要的参数

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午6:45
  6. func:
  7. """
  8. import multiprocessing
  9. import datetime, time
  10. def worker(interval):
  11. print("process start: {0}".format(datetime.datetime.today()));
  12. time.sleep(interval)
  13. print("process end: {0}".format(datetime.datetime.today()));
  14. if __name__ == "__main__":
  15. p = multiprocessing.Process(target=worker, args=(5,))
  16. p.start()
  17. p.join()
  18. print "end!"

2.4 创建多进程

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. import multiprocessing
  9. def worker(num):
  10. print "worker %d" %num
  11. if __name__ == "__main__":
  12. print("The number of CPU is:" + str(multiprocessing.cpu_count()))
  13. proc = []
  14. for i in xrange(5):
  15. p = multiprocessing.Process(target=worker, args=(i,))
  16. proc.append(p)
  17. for p in proc:
  18. p.start()
  19. for p in proc:
  20. p.join()
  21. print "end ..."

输出

点击(此处)折叠或打开

  1. The number of CPU is:4
  2. worker 0
  3. worker 1
  4. worker 2
  5. worker 3
  6. worker 4
  7. main process end ...

2.5 线程池
multiprocessing提供进程池的类--Pool,它可以指定程序最大可以调用的进程数量,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes  : 使用的工作进程的数量,如果processes是None,默认使用os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

实例方法:
  apply(func[, args[, kwds]]):同步进程池
  apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
  close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
  terminate() : 结束工作进程,不在处理未完成的任务.
  join() : 等待工作线程的退出,在调用join()前必须调用close()或者 terminate(),因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. from multiprocessing import Pool
  9. import time
  10. def worker(num):
  11. print "worker %d" %num
  12. time.sleep(2)
  13. print "end worker %d" %num
  14. if __name__ == "__main__":
  15. proc_pool = Pool(2)
  16. for i in xrange(4):
  17. proc_pool.apply_async(worker, (i,)) #使用了异步调用,从输出结果可以看出来
  18. proc_pool.close()
  19. proc_pool.join()
  20. print "main process end ..."

输出结果

点击(此处)折叠或打开

  1. worker 0
  2. worker 1
  3. end worker 0
  4. end worker 1
  5. worker 2
  6. worker 3
  7. end worker 2
  8. end worker 3
  9. main process end ..

解释:创建一个进程池pool 对象proc_pool,并设定进程的数量为2,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为2,所以0、1会直接送到进程中执行,当其中的2个任务执行完之后才空出2进程处理对象2和3,所以会出现输出 worker 2 worker 3 出现在end worker 0 end worker 1之后。思考一下如果调用  proc_pool.apply(worker, (i,)) 的输出结果会是什么样的?

2.6 使用queue
multiprocessing提供队列类,可以通过调用multiprocessing.Queue(maxsize) 初始化队列对象,maxsize表示队列里面最多的元素个数。
例子 创建了两个函数入队,出队,出队处理时使用了lock特性,串行化取数据。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午9:03
  6. func:
  7. """
  8. import time
  9. from multiprocessing import Process, current_process,Lock,Queue
  10. import datetime
  11. def inputQ(queue):
  12. time.sleep(1)
  13. info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
  14. queue.put(info)
  15. def outputQ(queue,lock):
  16. info = queue.get()
  17. lock.acquire()
  18. print ("proc_name: " + current_process().name + ' gets info :' + info)
  19. lock.release()
  20. if __name__ == '__main__':
  21. record1 = [] # store input processes
  22. record2 = [] # store output processes
  23. lock = Lock() # To prevent messy print
  24. queue = Queue(3)
  25. for i in range(10):
  26. process = Process(target=inputQ, args=(queue,))
  27. process.start()
  28. record1.append(process)
  29. for i in range(10):
  30. process = Process(target=outputQ, args=(queue,lock))
  31. process.start()
  32. record2.append(process)
  33. for p in record1:
  34. p.join()
  35. queue.close() # No more object will come, close the queue
  36. for p in record2:
  37. p.join()

2.7 使用pipe 
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用法 multiprocessing.Pipe([duplex])
该类返回一组对象实例(conn1, conn2),分别代表发送和接受消息的两端。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午8:01
  6. func:
  7. """
  8. from multiprocessing import Process, Pipe
  9. def p1(conn, name):
  10. conn.send('hello ,{name}'.format(name=name))
  11. print "p1 receive :", conn.recv()
  12. conn.close()
  13. def p2(conn, name):
  14. conn.send('hello ,{name}'.format(name=name))
  15. print "p2 receive :", conn.recv()
  16. conn.close()
  17. if __name__ == '__main__':
  18. parent_conn, child_conn = Pipe()
  19. proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
  20. proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
  21. proc1.start()
  22. proc2.start()
  23. proc1.join()
  24. proc2.join()

输出:

点击(此处)折叠或打开

  1. p1 receive : hello ,child_conn
  2. p2 receive : hello ,parent_conn

该例子中 p1 p2 通过pipe 给彼此相互发送信息,p1 发送"parent_conn" 给 p2 ,p2 发送"child_conn" 给p1.
2.8 daemon程序对比结果

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4. print("process start: {0}".format(datetime.datetime.today()));
  5. time.sleep(interval)
  6. print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target=worker, args=(5,))
  9. p.start()
  10. print "end!"

输出:

点击(此处)折叠或打开

  1. end!
  2. process start: 2017-07-02 18:47:30.656244
  3. process end: 2017-07-02 18:47:35.657464

设置 daemon = True,程序随着主程序结束而不等待子进程。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4. print("process start: {0}".format(datetime.datetime.today()));
  5. time.sleep(interval)
  6. print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target=worker, args=(5,))
  9. p.daemon = True
  10. p.start()
  11. print "end!"

输出:
end!
因为子进程设置了daemon属性,主进程结束,multiprocessing创建的进程对象就随着结束了。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4. print("process start: {0}".format(datetime.datetime.today()));
  5. time.sleep(interval)
  6. print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target=worker, args=(5,))
  9. p.daemon = True  #
  10. p.start()
  11. p.join() #进程执行完毕后再关闭
  12. print "end!"

输出:

点击(此处)折叠或打开

  1. process start: 2017-07-02 18:48:20.953754
  2. process end: 2017-07-02 18:48:25.954736

2.9 Lock()
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子:
多个进程使用同一个std_out ,使用lock机制确保同一个时刻有一个一个进程获取输出。

  1. #!/usr/bin/env python
    # encoding: utf-8
    """
    author: yangyi@youzan.com
    time: 2017/7/2 下午9:28
    func: 
    """
    from multiprocessing import Process, Lock
    def func_with_lock(l, i):
        l.acquire()
        print 'hello world', i
        l.release()

    def func_without_lock(i):
        print 'hello world', i

    if __name__ == '__main__':
        lock = Lock()
        print "func_with_lock :"
        for num in range(10):
            Process(target=func_with_lock, args=(lock, num)).start()

输出:

点击(此处)折叠或打开

  1. func_with_lock :
  2. hello world 0
  3. hello world 1
  4. hello world 2
  5. hello world 3
  6. hello world 4
  7. hello world 5
  8. hello world 6
  9. hello world 7
  10. hello world 8
  11. hello world 9

三 小结
 本文参考官方资料以及其他资源,对multiprocesssing 的使用方式做了总结,还有很多知识需要详细阅读官方文档。纸上来得终觉浅,绝知此事要躬行。参考资料
[1]官方文档 
[2]Python标准库10 多进程初步 (multiprocessing包)

【Python】浅谈 multiprocessing相关推荐

  1. 【python 笔记/小白快速入门python】python浅谈(一)犹抱琵琶半遮面

    python浅谈(一)犹抱琵琶半遮面 继浅谈(零)初识庐山真面目[https://blog.csdn.net/HarryOtter/article/details/90519877 ] 之后,终于窥得 ...

  2. Python | 浅谈并发锁与死锁问题

    今天是 Python专题 的第24篇文章,我们一起来聊聊多线程场景当中不可或缺的另外一个部分-- 锁 . 很多人学习python,不知道从何学起. 很多人学习python,掌握了基本语法过后,不知道在 ...

  3. Python - 浅谈Python的编译与反编译

    1 - Python编译过程涉及的文件 py 源代码文件,由python.exe解释,可在控制台下运行,可用文本编辑器进行编辑: pyc 源代码文件经过编译后生成的二进制文件,无法用文本编辑器进行编辑 ...

  4. Python |浅谈爬虫的由来

    本文概要 本篇文章主要介绍Python爬虫的由来以及过程,适合刚入门爬虫的同学,文中描述和代码示例很详细,干货满满,感兴趣的小伙伴快来一起学习吧!

  5. Python浅谈gevent实现协程

    每日一怼:垃圾桶都收到花了,你却没有 认识gevent之前我们先来认识一下协程,什么是协程? 在笔者的看来就是在A程序与B程序的反复调用执行. 当A程序执行结束后调用执行B程序,B程序结束后调用A程序 ...

  6. 浅谈Python3函数命名空间与作用域

    日期:2020年1月23日 作者:Commas 注释:前一章节讲述了命名空间和作用域的知识,现在我们来谈一谈Python3函数的命名空间吧. 如果您想了解更多有关Python的知识,那么请点<我 ...

  7. python 共享内存变量_浅谈python多进程共享变量Value的使用tips

    前言: 在使用tornado的多进程时,需要多个进程共享一个状态变量,于是考虑使用multiprocessing.Value(对于该变量的具体细节请查阅相关资料).在根据网上资料使用Value时,由于 ...

  8. js打印线程id_浅谈python中的多线程和多进程(二)

    原创:hxj7 本文继续分享一个关于python多线程和多进程区别的例子 前文<浅谈python中的多线程和多进程>中我们分享过一个例子,就是分别利用python中的多线程和多进程来解决高 ...

  9. 获得进程id_浅谈python中的多线程和多进程(二)

    原创:hxj7 本文继续分享一个关于python多线程和多进程区别的例子 前文<浅谈python中的多线程和多进程>中我们分享过一个例子,就是分别利用python中的多线程和多进程来解决高 ...

最新文章

  1. Apache 基金会宣布 Apache Pulsar 毕业成为顶级项目
  2. 《偷梁换柱》全世界最最简单对付SMSS。EXE病毒的方法,可能也是对付某类流氓程序的好方法...
  3. 查询最近一千条数据mysql_保留mysql数据库中的最新1000条记录
  4. 五大主流深度学习框架Google趋势比较
  5. csrf攻击 java_java使用jsp servlet来防止csrf 攻击的实现方法
  6. 可变对象 不可变对象区别_对象应该是不可变的
  7. Java虚拟机运行时数据区
  8. Python基础学习:svn导出差异文件脚本
  9. 一头盔一电极,MIT机器人读心完毕
  10. java web流量阀值_Javaweb应用使用限流处理大量的并发请求详解
  11. serialize和unserialize函数
  12. 香肠派对电脑版_6款好玩的吃鸡小游戏,和平精英、香肠派对、迷你攻势、、、...
  13. 几何公差基础知识之圆柱度
  14. 小辣椒安卓机器人无命令_雅奇小辣椒(安卓手机app开发软件)V1.0.1 中文版
  15. 数字图像处理(MATLAB版
  16. ncode毛刺探测glyph自动识别及清除毛刺 流程图
  17. lisp 焊缝标注_基于ObjectARX的焊接符号标注系统开发
  18. Linux下shell脚本的编写
  19. 【知识蒸馏】Deep Mutual Learning
  20. 非监督分类ecognition_资管新规学习03资管产品的范围和分类

热门文章

  1. android TextView设置字体颜色
  2. android eclipse的环境配置
  3. Python常用编程模块汇总
  4. mac上搭建vue环境及webstorm新建vue项目
  5. Django框架(三)—— orm增删改查、Django生命周期
  6. C#打印0到100的素数
  7. 转:“401 - 未授权:由于凭据无效,访问被拒绝”在iis的解决办法
  8. 针对“来用”团队项目之NABC分析
  9. 国王的游戏(华东师范大学-信息学竞赛学教案)
  10. C#静态构造函数总结