一. 事件循环

1.注:

实现搭配:事件循环+回调(驱动生成器【协程】)+epoll(IO多路复用),asyncio是Python用于解决异步编程的一整套解决方案;

基于asynico:tornado,gevent,twisted(Scrapy,django channels),tornado(实现了web服务器,可以直接部署,真正部署还是要加nginx),django,flask(uwsgi,gunicorn+nginx部署)

1 importasyncio2 importtime3 async defget_html(url):4 print('start get url')5 #不能直接使用time.sleep,这是阻塞的函数,如果使用time在并发的情况有多少个就有多少个2秒

6 await asyncio.sleep(2)7 print('end get url')8 if __name__=='__main__':9 start_time=time.time()10 loop=asyncio.get_event_loop()11 task=[get_html('www.baidu.com') for i in range(10)]12 loop.run_until_complete(asyncio.wait(task))13 print(time.time()-start_time)

View Code

2.如何获取协程的返回值(和线程池类似):

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8 return "HAHA"

9 #需要接收task,如果要接收其他的参数就需要用到partial(偏函数),参数需要放到前面

10 defcallback(url,future):11 print(url+'success')12 print('send email')13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 task=loop.create_task(get_html('www.baidu.com'))16 #原理还是获取event_loop,然后调用create_task方法,一个线程只有一个loop

17 #get_future=asyncio.ensure_future(get_html('www.baidu.com'))也可以

18 #loop.run_until_complete(get_future)

19 #run_until_complete可以接收future类型,task类型(是future类型的一个子类),也可以接收可迭代类型

20 task.add_done_callback(partial(callback,'www.baidu.com'))21 loop.run_until_complete(task)22 print(task.result())

View Code

3.wait和gather的区别:

3.1wait简单使用:

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8

9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #wait和线程的wait相似

13 loop.run_until_complete(asyncio.wait(tasks))

View Code

协程的wait和线程的wait相似,也有timeout,return_when(什么时候返回)等参数

3.2gather简单使用:

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8

9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #gather注意加*,这样就会变成参数

13 loop.run_until_complete(asyncio.gather(*tasks))

View Code

3.3gather和wait的区别:(定制性不强时可以优先考虑gather)

gather更加高层,可以将tasks分组;还可以成批的取消任务

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8

9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 groups1=[get_html('www.baidu.com') for i in range(10)]12 groups2=[get_html('www.baidu.com') for i in range(10)]13 #gather注意加*,这样就会变成参数

14 loop.run_until_complete(asyncio.gather(*groups1,*groups2))15 #这种方式也可以

16 #groups1 = [get_html('www.baidu.com') for i in range(10)]

17 #groups2 = [get_html('www.baidu.com') for i in range(10)]

18 #groups1=asyncio.gather(*groups1)

19 #groups2=asyncio.gather(*groups2)

20 #取消任务

21 #groups2.cancel()

22 #loop.run_until_complete(asyncio.gather(groups1,groups2))

View Code

二. 协程嵌套

1.run_util_complete()源码:和run_forever()区别并不大,只是可以在运行完指定的协程后可以把loop停止掉,而run_forever()不会停止

2.loop会被放在future里面,future又会放在loop中

3.取消future(task):

3.1子协程调用原理:

官网例子:

解释: await相当于yield from,loop运行协程print_sum(),print_sum又会去调用另一个协程compute,run_util_complete会把协程print_sum注册到loop中。

1.event_loop会为print_sum创建一个task,通过驱动task执行print_sum(task首先会进入pending【等待】的状态);

2.print_sum直接进入字协程的调度,这个时候转向执行另一个协程(compute,所以print_sum变为suspended【暂停】状态);

3.compute这个协程首先打印,然后去调用asyncio的sleep(此时compute进入suspende的状态【暂停】),直接把返回值返回给Task(没有经过print_sum,相当于yield from,直接在调用方和子生成器通信,是由委托方print_sum建立的通道);

4.Task会告诉Event_loop暂停,Event_loop等待一秒后,通过Task唤醒(越过print_sum和compute建立一个通道);

5.compute继续执行,变为状态done【执行完成】,然后抛一个StopIteration的异常,会被await语句捕捉到,然后提取出1+2=3的值,进入print_sum,print_sum也被激活(因为抛出了StopIteration的异常被print_sum捕捉),print_sum执行完也会被标记为done的状态,同时抛出StopIteration会被Task接收

三. call_soon、call_later、call_at、call_soon_threadsafe

1.call_soon:可以直接接收函数,而不用协程

1 importasyncio2 #函数

3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通过该函数暂停

6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 #可以直接传递函数,而不用协程,call_soon其实就是调用的call_later,时间为0秒

11 loop.call_soon(callback,2)12 loop.call_soon(stoploop,loop)13 #不能用run_util_complete(因为不是协程),run_forever找到call_soon一直运行

14 loop.run_forever()

View Code

2.call_later:可以指定多长时间后启动(实际调用call_at,时间不是传统的时间,而是loop内部的时间)

1 importasyncio2 #函数

3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通过该函数暂停

6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 loop.call_later(3,callback,1)11 loop.call_later(1, callback, 2)12 loop.call_later(1, callback, 2)13 loop.call_later(1, callback, 2)14 loop.call_soon(callback,4)15 #loop.call_soon(stoploop,loop)

16 #不能用run_util_complete(因为不是协程),run_forever找到call_soon一直运行

17 loop.run_forever()

View Code

3.call_at:指定某个时间执行

1 importasyncio2 #函数

3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通过该函数暂停

6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 now=loop.time()11 print(now)12 loop.call_at(now+3,callback,1)13 loop.call_at(now+1, callback, 0.5)14 loop.call_at(now+1, callback, 2)15 loop.call_at(now+1, callback, 2)16 #loop.call_soon(stoploop,loop)

17 #不能用run_util_complete(因为不是协程),run_forever找到call_soon一直运行

18 loop.run_forever()

View Code

4.call_soon_threadsafe:

线程安全的方法,不仅能解决协程,也能解决线程,进程,和call_soon几乎一致,多了self._write_to_self(),和call_soon用法一致

四. ThreadPoolExecutor+asyncio(线程池和协程结合)

1.使用run_in_executor:就是把阻塞的代码放进线程池运行,性能并不是特别高,和多线程差不多

1 #使用多线程,在协程中集成阻塞io

2 importasyncio3 importsocket4 from urllib.parse importurlparse5 from concurrent.futures importThreadPoolExecutor6 importtime7 defget_url(url):8 #通过socket请求html

9 url=urlparse(url)10 host=url.netloc11 path=url.path12 if path=="":13 path="/"

14 #建立socket连接

15 client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)16 client.connect((host,80))17 #向服务器发送数据

18 client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))19 #将数据读取完

20 data=b""

21 whileTrue:22 d=client.recv(1024)23 ifd:24 data+=d25 else:26 break

27 #会将header信息作为返回字符串

28 data=data.decode('utf8')29 print(data.split('\r\n\r\n')[1])30 client.close()31

32 if __name__=='__main__':33 start_time=time.time()34 loop=asyncio.get_event_loop()35 excutor=ThreadPoolExecutor()36 tasks=[]37 for i in range(20):38 task=loop.run_in_executor(excutor,get_url,'http://www.baidu.com')39 tasks.append(task)40 loop.run_until_complete(asyncio.wait(tasks))41 print(time.time()-start_time)

View Code

五. asyncio模拟http请求

注:asyncio目前没有提供http协议的接口

1 #asyncio目前没有提供http协议的接口

2 importasyncio3 from urllib.parse importurlparse4 importtime5

6

7 async defget_url(url):8 #通过socket请求html

9 url =urlparse(url)10 host =url.netloc11 path =url.path12 if path == "":13 path = "/"

14 #建立socket连接(比较耗时),非阻塞需要注册,都在open_connection中实现了

15 reader, writer = await asyncio.open_connection(host, 80)16 #向服务器发送数据,unregister和register都实现了

17 writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))18 #读取数据

19 all_lines =[]20 #源码实现较复杂,有__anext__的魔法函数(协程)

21 async for line inreader:22 data = line.decode('utf8')23 all_lines.append(data)24 html = '\n'.join(all_lines)25 returnhtml26

27

28 async defmain():29 tasks =[]30 for i in range(20):31 url = "http://www.baidu.com/"

32 tasks.append(asyncio.ensure_future(get_url(url)))33 for task inasyncio.as_completed(tasks):34 result =await task35 print(result)36

37

38 if __name__ == '__main__':39 start_time =time.time()40 loop =asyncio.get_event_loop()41 #tasks=[get_url('http://www.baidu.com') for i in range(10)]

42 #在外部获取结果,保存为future对象

43 #tasks = [asyncio.ensure_future(get_url('http://www.baidu.com')) for i in range(10)]

44 #loop.run_until_complete(asyncio.wait(tasks))

45 #for task in tasks:

46 #print(task.result())

47 #执行完一个打印一个

48 loop.run_until_complete(main())49 print(time.time() - start_time)

View Code

六. future和task

1.future:协程中的future和线程池中的future相似

future中的方法,都和线程池中的相似

set_result方法

不像线程池中运行完直接运行代码(这是单线程的,会调用call_soon方法)

2.task:是future的子类,是future和协程之间的桥梁

会首先启动_step方法

该方法会首先启动协程,把返回值(StopIteration的值)做处理,用于解决协程和线程不一致的地方

七. asyncio同步和通信

1.单线程协程不需要锁:

1 importasyncio2 total=03 async defadd():4 globaltotal5 for i in range(1000000):6 total+=1

7

8

9 async defdecs():10 globaltotal11 for i in range(1000000):12 total-=1

13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 tasks=[add(),decs()]16 loop.run_until_complete(asyncio.wait(tasks))17 print(total)

View Code

2.某种情况需要锁:

asyncio中的锁(同步机制)

1 importasyncio,aiohttp2 #这是并没有调用系统的锁,只是简单的自己实现(注意是非阻塞的),Queue也是非阻塞的,都用了yield from,不用用到condition【单线程】】

3 #Queue还可以限流,如果只需要通信还可以直接使用全局变量否则可以

4 from asyncio importLock,Queue5 catche={}6 lock=Lock()7 async defget_stuff():8 #实现了__enter__和__exit__两个魔法函数,可以用with

9 #with await lock:

10 #更明确的语法__aenter__和__await__

11 async with lock:12 #注意加await,是一个协程

13 #await lock.acquire()

14 for url incatche:15 returncatche[url]16 #异步的接收

17 stauff=aiohttp.request('Get',url)18 catche[url]=stauff19 returnstauff20 #release是一个简单的函数

21 #lock.release()

22

23 async defparse_stuff():24 stuff=await get_stuff()25

26 async defuse_stuff():27 stuff=await get_stuff()28 #如果没有同步机制,就会发起两次请求(这里就可以加一个同步机制)

29 tasks=[parse_stuff(),use_stuff()]30 loop=asyncio.get_event_loop()31 loop.run_until_complete(asyncio.wait(tasks))

View Code

八. aiohttp实现高并发爬虫

1 #asyncio去重url,入库(异步的驱动aiomysql)

2 importaiohttp3 importasyncio4 importre5 importaiomysql6 from pyquery importpyquery7

8 start_url = 'http://www.jobbole.com/'

9 waiting_urls =[]10 seen_urls =[]11 stopping =False12 #限制并发数

13 sem=asyncio.Semaphore(3)14

15

16 async deffetch(url, session):17 async with sem:18 await asyncio.sleep(1)19 try:20 async with session.get(url) as resp:21 print('url_status:{}'.format(resp.status))22 if resp.status in [200, 201]:23 data =await resp.text()24 returndata25 exceptException as e:26 print(e)27

28

29 defextract_urls(html):30 '''

31 解析无io操作32 '''

33 urls =[]34 pq =pyquery(html)35 for link in pq.items('a'):36 url = link.attr('href')37 if url and url.startwith('http') and url not inurls:38 urls.append(url)39 waiting_urls.append(url)40 returnurls41

42

43 async definit_urls(url, session):44 html =await fetch(url, session)45 seen_urls.add(url)46 extract_urls(html)47

48

49 async defhandle_article(url, session, pool):50 '''

51 处理文章52 '''

53 html =await fetch(url, session)54 seen_urls.append(url)55 extract_urls(html)56 pq =pyquery(html)57 title = pq('title').text()58 async with pool.acquire() as conn:59 async with conn.cursor() as cur:60 insert_sql = "insert into Test(title) values('{}')".format(title)61 await cur.execute(insert_sql)62

63

64 async defconsumer(pool):65 with aiohttp.CLientSession() as session:66 while notstopping:67 if len(waiting_urls) ==0:68 await asyncio.sleep(0.5)69 continue

70 url =waiting_urls.pop()71 print('start url:' + 'url')72 if re.match('http://.*?jobble.com/\d+/', url):73 if url not inseen_urls:74 asyncio.ensure_future(handle_article(url, session, pool))75 await asyncio.sleep(30)76 else:77 if url not inseen_urls:78 asyncio.ensure_future(init_urls(url, session))79

80

81 async defmain():82 #等待mysql连接好

83 pool = aiomysql.connect(host='localhost', port=3306, user='root',84 password='112358', db='my_aio', loop=loop, charset='utf8', autocommit=True)85 async with aiohttp.CLientSession() as session:86 html =await fetch(start_url, session)87 seen_urls.add(start_url)88 extract_urls(html)89 asyncio.ensure_future(consumer(pool))90

91 if __name__ == '__main__':92 loop =asyncio.get_event_loop()93 asyncio.ensure_future(loop)94 loop.run_forever(main(loop))

View Code

python asyncio 并发编程_asyncio并发编程相关推荐

  1. asyncio并发数_asyncio并发编程

    ThreadPoolExecutor和asyncio完成阻塞IO请求 这个小节我们看下如何将线程池和asyncio结合起来. 在协程里面我们还是需要使用多线程的,那什么时候需要使用多线程呢? 我们知道 ...

  2. python asyncio tcp转发_asyncio不通过tcp发送整个图像数据

    我正在尝试使用带TCP协议的asyncio将映像从本地计算机发送到云中的计算机.有时我收到整个图像被发送,有时只发送图像的一部分.在 客户代码import os os.environ['PYTHONA ...

  3. python线程池模块_python并发编程之进程池,线程池,协程

    需要注意一下 不能无限的开进程,不能无限的开线程 最常用的就是开进程池,开线程池.其中回调函数非常重要 回调函数其实可以作为一种编程思想,谁好了谁就去掉 只要你用并发,就会有锁的问题,但是你不能一直去 ...

  4. python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))...

    需要注意一下 不能无限的开进程,不能无限的开线程 最常用的就是开进程池,开线程池.其中回调函数非常重要 回调函数其实可以作为一种编程思想,谁好了谁就去掉 只要你用并发,就会有锁的问题,但是你不能一直去 ...

  5. python协程 并发数量_Python-并发编程(协程)

    今天说说协程 一.引子 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态 cpu正在运行一个任务,会在两 ...

  6. Python升级之路( Lv14 ) 并发编程初识

    Python系列文章目录 第一章 Python 入门 第二章 Python基本概念 第三章 序列 第四章 控制语句 第五章 函数 第六章 面向对象基础 第七章 面向对象深入 第八章 异常机制 第九章 ...

  7. Python全栈开发之并发编程

    No.1 线程 什么是多任务 就是操作系统可以同时运行多个任务,就是可以一边用浏览器上网,同时又可以听歌,还能再撩个×××姐,这就是多任务,操作系统会轮流把系统调度到每个核心上去执行 并发和并行 并发 ...

  8. python异步编程视频_asyncio异步编程【含视频教程】

    Python Python开发 Python语言 asyncio异步编程[含视频教程] 不知道你是否发现,身边聊异步的人越来越多了,比如:FastAPI.Tornado.Sanic.Django 3. ...

  9. 网络编程和并发编程面试题

    网络编程和并发编程面试题 1.简述 OSI 七层协议. 一.应用层 与其它计算机进行通讯的一个应用,它是对应应用程序的通信服务的.例如,一个没有通信功能的字处理程序就不能执行通信的代码,从事字处理工作 ...

最新文章

  1. ida android sign加密,最右sign-v2签名算法追踪及逆向还原
  2. count 有条件 mysql_mysql 不同条件count ,多条件count()
  3. robot连接mysql_robot连接mysql - autocar - 51Testing软件测试网 51Testing软件测试网-软件测试人的精神家园...
  4. IOCP线程池的开发-(1)
  5. html 价格计算,HTML打折计算价格实现原理与脚本代码
  6. 跨境电商ERP系统有几大功能?
  7. aix ssh服务??
  8. 佳能80d有人脸识别吗_2020单反/微单相机推荐,单反和微单(无反)选购攻略,佳能、尼康、索尼、富士、松下相机...
  9. php+彩票中奖判断,彩票算法 – PHP – 数学似乎不错,但功能是否有效?
  10. Matlab Tricks(九)—— 矩阵行/列的增广(删除)
  11. 傲腾内存linux安装,装机不求人:Intel傲腾内存安装教程
  12. sql 2000 数据库置疑
  13. ui自动化html模板,UI自动化学习分享ppt模板
  14. 把java代码导成pdf_Java将Excel导出成pdf文件
  15. git-bash的alias别名设置
  16. .net famework 版本过低,请升级至4.6.2或更新版本
  17. Linux设置登录密码错误次数限制
  18. 最短路径问题——Dijkstra算法详解(单源最短路径)
  19. 删除流氓软件 Alibaba PC Safe Service
  20. 创龙科技全志T3国产评估板(4核ARM Cortex-A7)-性能及参数资料

热门文章

  1. HDU 6188 Duizi and Shunzi
  2. Hession矩阵与牛顿迭代法
  3. svn的使用(Mac)
  4. 错误:readline/readline.h:没有那个文件或目录解决方法
  5. Spark Streaming源码分析 – DStream
  6. linux 网卡驱动安装
  7. Linux 的mv命令
  8. 配置IPsec on GRE Tunnel with IOS Firewall and NAT
  9. vs设计窗口不见了_碳纤维的巅峰:VS沛纳海616V3
  10. Sqlserver备份存储过程