线程是执行线程的缩写。程序员可以将他的工作拆分到线程中,这些线程同时运行并共享同一内存上下文。
多线程应该在多处理器或者多核机器上执行,将每个CPU核上并行化每个线程执行,从而使程序更快。
竞争冒险:
如果两个线程更新相同的没有任何保护的数据,则会发生竟态条件。
而锁机制有助于保护数据,在多线程编程中总要确保以安全的方式访问资源。
死锁:
两个线程锁一个资源,并尝试获取另外一个线程锁定的资源,他们讲永远彼此等待的情况叫死锁。

python多线程

多线程应用案例:假设需要使用多个查询从一些web服务器获取数据,这些查询无法通过单个大型的HTTP请求批量处理
使用API的地理编码端点解析。
单线程:

import time
api = {"Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
PLACES = ('Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice','NewYork'
)
def fetch_place(place):gecode = api[place]print("place:",place,"gecode:",gecode)def main():for place in PLACES:fetch_place(place)if __name__ == '__main__':main()

多线程:加入线程池。
启动一些预定义数量的线程,从队列中消费工作项,直到完成。当没有其他工作要做,线程将返回。

import time
from queue import Queue,Empty
from threading import Thread
THREAD_POOL_SIZE=4def worker(work_queue):while not work_queue.empty():try:item = work_queue.get(block=False)except Empty:breakelse:fetch_place(item)work_queue.task_done()api = {"Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
PLACES = ('Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice','NewYork'
)
def fetch_place(place):gecode = api[place]print("place:",place,"gecode:",gecode,"\n")def main():work_queue = Queue()for place in PLACES:work_queue.put(place)threads = [Thread(target=worker,args=(work_queue,))for _ in  range(THREAD_POOL_SIZE)]for thread in threads:thread.start()work_queue.join()while threads:threads.pop().join()if __name__ == '__main__':main()


从打印结果可以看出输出有点问题,更好的方案是启动另外的线程打印,而不是在主线程中进行。我们可以通过一种队列来实现,这个队列主要负责从我们的工作线程收集结果。

import time
from queue import Queue,Empty
from threading import Thread
THREAD_POOL_SIZE=4def worker(work_queue,result_queue):while not work_queue.empty():try:item = work_queue.get(block=False)except Empty:breakelse:result_queue.put(fetch_place(item))work_queue.task_done()api = {"Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
PLACES = ('Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice','NewYork'
)
def fetch_place(place):geo_dict = []gecode = api[place]geo_dict.append((place,gecode))return geo_dictdef present_result(geocode):print(geocode)def main():work_queue = Queue()result_queue = Queue()for place in PLACES:work_queue.put(place)threads = [Thread(target=worker,args=(work_queue,result_queue))for _ in  range(THREAD_POOL_SIZE)]for thread in threads:thread.start()work_queue.join()while threads:threads.pop().join()while not result_queue.empty():present_result(result_queue.get())if __name__ == '__main__':main()

处理错误与速率限制

有时候会遇到外部服务器提供商施加的速率限制。当客户端超过请求速率会抛出异常,但是这个异常是单独引起的,不会导致整个程序崩溃,工作线程会立即退出,但是主线程将等待work_queue上存储的所有任务完成(使用work_queue.join()调用)。我们的线程应该尽可能的处理异常,并确保队列中的所有项目都会被处理。如果不做进一步处理改进,以上的案例一旦遇到异常,一些工作线程将崩溃,并且不会退出。
针对以上问题,可以在result_queue队列中设置一个错误实例,并将当前任务标记已完成,与没有错误时一样。这样,我们确保主线程在work_queue.join()中等待时间不会无限期地锁定。主线程然后可以检查结果并重新提出在结果队列中发现的任何异常。

import time
from queue import Queue,Empty
from threading import Thread
THREAD_POOL_SIZE=4def worker(work_queue,results_queue):while True:try:item = work_queue.get(block=False)except Empty:breakelse:try:result = fetch_place(item)except Exception as err:results_queue.put(err)else:results_queue.put(result)finally:work_queue.task_done()api = {"Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}PLACES = ('Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice','NewYork'
)def fetch_place(place):geo_dict = []gecode = api[place]geo_dict.append((place,gecode))return geo_dictdef present_result(geocode):print(geocode)def main():work_queue = Queue()results_queue = Queue()for place in PLACES:work_queue.put(place)threads = [Thread(target=worker,args=(work_queue,results_queue))for _ in  range(THREAD_POOL_SIZE)]for thread in threads:thread.start()work_queue.join()while threads:threads.pop().join()while not results_queue.empty():result = results_queue.get()if isinstance(result,Exception):raise resultpresent_result(result)if __name__ == '__main__':main()

对工作速度的限制成为节流。
令牌桶算法:

  • 存在具有预定量的令牌桶
  • 每个令牌响应单个权限以处理一项工作
  • 每次工作者要求一个或者多个令牌权限时
    – 我们测量从上次重新装满桶所花费的shijan
    – 如果时间差允许,用这个时间差响应的令牌量重新填充桶
    – 如果存储的令牌的数量大于或者等于请求的数量,我们减少存储的令牌的数量并且返回那个值;
    – 如果存储的令牌的数量小于请求的数量,我们返回零
import time
from queue import Queue,Empty
from threading import Thread
from threading import Lock
THREAD_POOL_SIZE=4class Throttle(object):def __init__(self,rate):self._consume_lock = Lock()self.rate = rateself.tokens = 0self.last = 0def consume(self,amount = 1):with self._consume_lock:now = time.time()# 时间测量在第一个令牌请求上初始化以避免初始突变if self.last == 0:self.last = nowelapsed = now - self.last# 请确认传递时间的量足够大一添加新的令牌if int(elapsed*self.rate):self.tokens+=int(elapsed*self.rate)self.last = now# 不要过度装满桶self.tokens = (self.rateif self.tokens>self.rateelse self.tokens)# 如果可用最终分派令牌if self.tokens>=amount:self.tokens-=amountelse:amount=0return amountdef worker(work_queue,results_queue,throttle):while True:try:item = work_queue.get(block=False)except Empty:breakelse:while not throttle.consume():passtry:result = fetch_place(item)except Exception as err:results_queue.put(err)else:results_queue.put(result)finally:work_queue.task_done()api = {"Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}PLACES = ('Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice','NewYork'
)def fetch_place(place):geo_dict = []gecode = api[place]geo_dict.append((place,gecode))return geo_dictdef present_result(geocode):print(geocode)def main():work_queue = Queue()results_queue = Queue()throttle = Throttle(10)for place in PLACES:work_queue.put(place)threads = [Thread(target=worker,args=(work_queue,results_queue,throttle))for _ in  range(THREAD_POOL_SIZE)]for thread in threads:thread.start()work_queue.join()while threads:threads.pop().join()while not results_queue.empty():result = results_queue.get()if isinstance(result,Exception):raise resultpresent_result(result)if __name__ == '__main__':main()

[并发编程专题]多线程相关推荐

  1. 并发编程专题——第一章(深入理解java内存模型)

    说到并发编程,其实有时候觉得,开发中真遇到这些所谓的并发编程,场景多吗,这应该是很多互联网的在职人员,一直在考虑的事情,也一直很想问,但是又不敢问,想学习的同时,网上这些讲的又是乱七八糟,那么本章开始 ...

  2. c+++11并发编程语言,C++11并发编程:多线程std:thread

    原标题:C++11并发编程:多线程std:thread 一:概述 C++11引入了thread类,大大降低了多线程使用的复杂度,原先使用多线程只能用系统的API,无法解决跨平台问题,一套代码平台移植, ...

  3. week6 day4 并发编程之多线程 理论

    week6 day4 并发编程之多线程 理论 一.什么是线程 二.线程的创建开销小 三.线程和进程的区别 四.为何要用多线程 五.多线程的应用举例 六.经典的线程模型(了解) 七.POSIX线程(了解 ...

  4. Java面试系列之并发编程专题-Java线程池灵魂拷问

    金三银四跳槽季即将来临,想必有些猿友已经蠢蠢欲动在做相关的准备了!在接下来的日子里,笔者将坚持写作.分享Java工程师在面试求职期间的方方面面,包括简历制作.面试场景复现.面试题解答.谈薪技巧 以及 ...

  5. 安琪拉教百里守约学并发编程之多线程基础

    <安琪拉与面试官二三事>系列文章 一个HashMap能跟面试官扯上半个小时 一个synchronized跟面试官扯了半个小时 <安琪拉教鲁班学算法>系列文章 安琪拉教鲁班学算法 ...

  6. Java 多线程与并发编程专题

    Java 线程基础 Java 多线程开发 线程安全与同步 并发控制 非阻塞套接字(NIO) Java 5 中的并发 JDK 7 中的 Fork/Join 模式 相关书评 Java 平台提供了一套广泛而 ...

  7. 【Java 并发编程】多线程、线程同步、死锁、线程间通信(生产者消费者模型)、可重入锁、线程池

    并发编程(Concurrent Programming) 进程(Process).线程(Thread).线程的串行 多线程 多线程的原理 多线程的优缺点 Java并发编程 默认线程 开启新线程 `Ru ...

  8. 并发编程(一)多线程基础和原理

    多线程基础 最近,准备回顾下多线程相关的知识体系,顺便在这里做个记录. 并发的发展历史 最早的计算机只能解决简单的数学运算问题,比如正弦. 余弦等.运行方式:程序员首先把程序写到纸上,然后穿 孔成卡片 ...

  9. Java并发编程进阶——多线程的安全与同步

    多线程的安全与同步 多线程的操作原则 多线程 AVO 原则 A:即 Atomic,原子性操作原则.对基本数据类型变量的读和写是保证原子性的,要么都成功,要么都失败,这些操作不可中断. V:即 vola ...

最新文章

  1. C语言局部搜索算法(爬山法,模拟退火法,遗传算法)求解八皇后问题
  2. 微信应用号开发知识贮备之altjs官方实例初探
  3. ASP.Net 2.0 发送邮件的代码
  4. 给力!一行代码躺赚普通程序员10年薪资!
  5. python使用imbalanced-learn的SVMSMOTE方法进行上采样处理数据不平衡问题
  6. 初学python-字符串中引号的使用、input简介、强制类型转换、加减乘除简写、条件判断...
  7. WildFly 报错 java.lang.NoClassDefFoundError
  8. 强迫症慎入:一大票让人看哭的音量键设计即将袭来
  9. python元组支持双向索引吗_2、Python列表和元组
  10. 基于FTP的Nordic nRF51822 OTA 教程一之修改boot区大小
  11. 【完整代码】Scala akka入门示例
  12. PHP的postman的bulk edit小功能:可以直接复制浏览器query string parameters的数据至postman的body的form-data 很方便 不用手写了
  13. Python day 9(6) 调试
  14. 利用Serverless Kubernetes和Kaniko快速自动化构建容器镜像
  15. sparkSql使用hive数据源
  16. C语言2019软件,c语言模拟编程学习软件v2019 最新版
  17. 网络安全实验室CTF—选择题解析 writeup
  18. Markdown的书写方法
  19. 从Python看C语言(列表字典篇)
  20. Python | 判定IP地址合法性的四种方法

热门文章

  1. Docker搭建私服
  2. javascript的异步执行顺序---管中窥豹
  3. 仿慕课网视频播放界面协调布局
  4. java date与sql.date转换
  5. Excel导入sql语句模板,解决转换时间戳问题
  6. 合并excel不同工作簿同名工作表
  7. 太强大了 Python居然能用来做这么多游戏,附赠所有游戏源码文件
  8. [编程神域 C语言浮游塔 第⑤期]内存地址——指针
  9. 文本对比工具推荐 winMerge
  10. html5 collapse,关于css中的border-collapse