前言

使用redis队列存放消息时,我们通常用rpop,lpop,或者brpop取出队列中存放的数据。

同步阻塞模型

同步阻塞模型也就是,代码从上到下按顺序执行,遇到函数调用,则调用函数,阻塞等待结果返回,然后
继续循环调用。

该方法有哪些缺点呢?

  1. 对cpu的利用率低下,也就是消费速度低于生成速度,容易造成队列堵塞,从而造成
    消息丢失等一系列问题。

python实现代码如下

from redis import Redis
import timedef analyse(data):"""数据分析器"""time.sleep(2) #模拟解析数据所需时间return datadef run():cli = Redis(host='127.0.0.1', db=0)while True:# 方法一# 同步阻塞程序item = cli.brpop('test', 0)result = analyse(bytes.decode(item[1]))

同步非阻塞模型

为了解决第一种模型带来的性能低下问题,我们可以实例化多个消费者,并发去消费信息,或者通过多线程,
实现主线程取消息,子线程消费消息。这也是目前最容易被接纳的方案。

该方法有哪些优缺点呢?

优点:

  1. 充分利用cpu的性能,在java等其他语言上,甚至能利用多核cpu的优势,在同一时间同时并发执行,但在
    python上,由于全文解析器锁(GIL)锁对象的存在,cpu每刻只能保持一个线程在运行。

缺点:

  1. 线程切换也是需要时间的
  2. 每个线程都占用内存

因此,对于内存占用不太关心的服务,可以采用这种方案。

from concurrent.futures import ThreadPoolExecutor
import time
from redis import Redis
cli = Redis(host='127.0.0.1', db=0)def callback(future):# 模拟持久化到数据库time.sleep(1)print(future.result())def analyse(data):"""数据分析器"""time.sleep(2) #模拟解析数据所需时间return datadef run():pool = ThreadPoolExecutor(max_workers=4) #线程池,最多四个线程while True:# 方法二try:# 多线程处理# 当前肯定是个阻塞操作,不过没关系,主线程负责取数据,子线程负责解析数据,添加回调完成写入数据库item = cli.brpop('test', 0)future = pool.submit(analyse, bytes.decode(item[1]))future.add_done_callback(callback)except:pool.shutdown(wait=True)

异步非阻塞模型

上面介绍的第二种办法,已经可以让消费速度跟上生产速度了。为了将cpu的性能压缩到极致,同时,节省内存,下面,我们可以尝试去
编写一个异步非阻塞的模型,实现对消息的消费。

开始之前,我们先了解下异步io
所谓异步io,即是进行io操作的时候,不会等待结果方法,继续执行其他的事情,结果返回了,可以通过回调提醒应用程序处理。

在操作系统上,实现异步io,有以下三大底层模块可供选择

  1. select 文件描述符fd,存放在数组中,最大可存储1024个文件描述符,通过循环遍历的方式,取出就绪的可读或者可写的消息
  2. poll 文件描述符fd,存放在链表中,没有存储限制,和select一样,通过循环遍历的方式,取出就绪的可读或者可写的消息
  3. epoll 文件描述符fd,没有存储限制,通过回调的方法,取出可读或者可写的消息,相比于select和poll,性能最高

我们看下实例,使用select和socket,编写一个简单的异步非阻塞的通信程序

server端程序

import socket
import selectsk1 = socket.socket()
sk1.bind(('0.0.0.0', 8001))
sk1.listen()sk2 = socket.socket()
sk2.bind((('0.0.0.0', 8002)))
sk2.listen()sk3 = socket.socket()
sk3.bind(('0.0.0.0', 8003))
sk3.listen()inputs  =[sk1, sk2, sk3]while True:r_list, w_list, e_list = select.select(inputs, [], inputs, 1)for sk in r_list:print(sk)conn, address = sk.accept()conn.sendall(bytes('hello', encoding='utf-8'))conn.close()for sk in e_list:inputs.remove(sk)

client端程序

import socketobj = socket.socket()
obj.connect(('127.0.0.1', 8081))while True:inp = input('>>>')print(inp)obj.sendall(bytes(inp, encoding='utf-8'))ret = str(obj.recv(1024),encoding='utf-8')print(ret)obj.close()

回到我们的redis,要先实现同一线程中,我们一边取出消息,另一边,在解析信息的信息,不会发生堵塞。我们需要了解到协程知识

所谓协程,也就是他能记住代码上一次的运行位置,在进行io操作的时候,可以不必等待,跳到另一个方法执行,当结果返回时,可以跳回到当前方法,得到结果继续往下运行

import asyncio
from redis import Redis
cli = Redis(host='127.0.0.1', db=0)async def analyse(data):#time.sleep(2)await asyncio.sleep(2) # 模拟解析所需时间return data# 方法三
# 只开一个线程完成异步操作,效果相当于多线程的同步操作async def task():loop = asyncio._get_running_loop()while True:fut = loop.create_future() #创建future任务item = cli.brpop('test', 0)loop.create_task(set_after(fut, bytes.decode(item[1]))) #通过yield返回#fut = loop.run_in_executor(None, analyse,bytes.decode(item[1]) ) # 底层实际是通过多线程处理yield futasync def set_after(future, data):result = await analyse(data)future.set_result(result)async def main():tasks = []async for data in task():tasks.append(data)if len(tasks) >= 5: #通过设置5个任务,同时执行,本来一个任务执行需要2秒,5个任务同时执行,理论上也需要2秒result = await asyncio.gather(*tasks)print(result)tasks.clear()# print(data.result())if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main())

redis队列消费者性能提升优化相关推荐

  1. 阿里云Redis多线程性能提升思路解析

    摘要: Redis做为高性能的K-V数据库,由于其高性能,丰富的数据结构支持,易用等特性,而得到广泛的应用.但是由于redis单进程单线程的模型限制,单Redis Server QPS最高只能达到10 ...

  2. 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的

    大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...

  3. 初探性能优化--2个月到4小时的性能提升!

    作者:闪客sun  |  博客园 https://www.cnblogs.com/flashsun 一直不知道性能优化都要做些什么,从哪方面思考,直到最近接手了一个公司的小项目,可谓麻雀虽小五脏俱全. ...

  4. 初探性能优化——2个月到4小时的性能提升

    一直不知道性能优化都要做些什么,从哪方面思考,直到最近接手了一个公司的小项目,可谓麻雀虽小五脏俱全.让我这个编程小白学到了很多性能优化的知识,或者说一些思考方式.真的感受到任何一点效率的损失放大一定倍 ...

  5. Hologres揭秘:优化COPY,批量导入性能提升5倍+

    简介:揭秘Hologres优化COPY的技术原理,实现批量导入性能提升5倍+ Hologres(中文名交互式分析)是阿里云自研的一站式实时数仓,这个云原生系统融合了实时服务和分析大数据的场景,全面兼容 ...

  6. Redis 6.0 如何实现大幅度的性能提升?

    导读: Redis可以轻松支撑100k+ QPS,离不开基于Reactor模型的I/O Multiplexing,In-memory操作,以及单线程执行命令避免竞态消耗.尽管性能已经能满足大多数应用场 ...

  7. OPPO百万级高并发MongoDB集群性能数十倍提升优化实践

    点击蓝色"架构文摘"关注我哟 加个"星标",每天上午 09:25,干货推送! 1. 背景 线上某集群峰值TPS超过100万/秒左右(主要为写流量,读流量很低), ...

  8. GEMM性能提升200倍,AutoKernel算子优化工具正式开源

    作者 | OPEN AI LAB 研究员 吕春莹 出品 | AI科技大本营 头图 | CSDN下载自视觉中国 随着AI技术的快速发展,深度学习在各个领域得到了广泛应用.深度学习模型能否成功在终端落地应 ...

  9. 30分钟3300%性能提升—python+memcached网页优化小记

    前言 本来我一直不知道怎么来更好地优化网页的性能,然后最近做python和php同类网页渲染速度比较时,意外地发现一个好方法:直接像某些php应用比如Discuz论坛那样,在生成的网页中打印出&quo ...

最新文章

  1. 在生产集群上运行topology
  2. python怎么写文件-python头文件怎么写
  3. 跨域、ContentType组件
  4. android 原理 组合控件_Android自定义控件进阶01-自定义控件开发套路与流程
  5. 高级用户 java_java高级-基本
  6. 使用 Linux 系统调用的内核命令
  7. Understanding Generative Adversarial Networks (GAN)
  8. dell的1501和640m,买哪个好呢?
  9. CPU/ABI显示No system images installed for this target的解决方案
  10. c++ class struct同名_C/C++面向对象编程之封装
  11. retrofit 2.0 android 教程,初识Retrofit2.0
  12. has been blocked by CORS policy: No ‘Access-Control-Allow-Origin‘ header is present---nginx工作笔记006
  13. RabbitMQ的工作模式Topics  通配符,test测试代
  14. HSQL转换成MapReduce过程
  15. Navicate 如何导出数据库中的存储过程、事件、视图等?
  16. pandas中read_csv的缺失值处理
  17. 有什么计算机应用基础的app,计算机应用基础软件
  18. 多台机器同步Windows上的文件 -cwRsync 同步问题
  19. 中科大计算机本科毕业论文范文,中科大本科生毕业论文格式.pdf
  20. DFA(deterministic finite automaton )有限状态机概念

热门文章

  1. [Vulhub] Nginx漏洞
  2. 开发版无法发送短信的问题
  3. iPhoneX官网停售了,价格会暴跌吗?
  4. 泰克Tektronix MDO3022混合域示波器
  5. Spring原理/SpringMVC原理/IOC/AOP原理
  6. 带有界面的12306!无限自动查询并购票的脚本!年关买票了吗
  7. fegin接口下载文件
  8. C# winform框架 音乐播放器开发 联网下载音乐功能的实现原理及源码(纯原创--)
  9. python批量命名教程_《自拍教程69》Python 批量重命名音频文件,AV专家必备!
  10. Elasticsearch集群安装部署