最近在工作中遇到了一个场景是这样的:

每到月初我们需要向上个月考勤有异常的同学的企业微信推送异常考勤提醒,让有异常的同学及时处理:补卡或者提交对应的请假申请等等。之前的做法是直接循环数据库,查处有异常的同学的考勤数据,然后推送给到相关的同学。一次推送的数据量在1500左右。

这种方法存在的问题,因为是通过同步的定时任务的方式,会因为数据量太大导致定时任务执行超时,导致事务回滚,数据库中未创建对应消息的记录,所以导致消息无法确认。

在发现这个问题之后,我们的第一反应是把定时任务做成异步的,修改的实现方式是,添加一个异步的装饰器,在定时任务触发之后,在后台另外开启一个线程执行查找异常考勤和发送异常消息。

当然这种可以解决问题,但不是最优解,所以在新版本的时候,我们就考虑使用消息队列MQ的方式来解决消息的推送问题,最开始我的设想是使用RabbitMQ的发布订阅模式来解决,但是在和公司运维了解情况后发现,公司使用的腾讯云对RabbitMQ的支持并不是很好,和运维沟通后决定使用Redis来实现消息队列。

由于之前也没有基于Redis的实践,所以我其实是在网上找了一些资料,才把问题解决。

先说结论,基于Redis的实现其实是相当简单的,网上也有现成的代码可供参考,我在post出相关代码之后,会列出我在实现的过程中遇到哪些坑。

代码实现:

文件1:先写一个获取Redis配置的类:

import redis
import logging
logger = logging.getLogger(__name__)class RedisHelper(object):"""host: redis ipport: redis portchannel: 发送接送消息的频道"""def __init__(self, host, port, db, channel, password=None):self.host = hostself.port = portself.db = dbself.password = passwordself.channel = channelself.__conn = redis.Redis(self.host,self.port,self.db,self.password,decode_responses=True)def ping(self):try:self.__conn.ping()return Trueexcept Exception as e:logger.exception(f"连接Redis失败,失败原因为:{e}")return False# 发送消息def public(self,msg):if self.ping():self.__conn.publish(self.channel,msg)return Trueelse:return False# 订阅def subscribe(self):if self.ping():pub = self.__conn.pubsub()pub.subscribe(self.channel)pub.parse_response()return pubelse:return False

文件2: 业务文件:发布方

config = self.env.ref('sf_conference_management.wxagent_config_detail_conference_default').sudo()
obj = RedisHelper(host=config.redis_host, port=config.redis_port, db=14, channel='channel:1', password=config.redis_password)
for user_id, message in message_dict.items():info = {'title': f'{first.month}月考勤确认','description': message,'url': url,'user_id': user_id.id,'task_id': str(int(time.time() * 1000)) + ''.join([random.choice('0123456789') for _ in range(5)]),'btn': [{'key': 'confirm', 'name': '确认无误'}]}_logger.info(f'写入信息为:{info}')obj.public(msg=json.dumps(info))

把每一条信息写入到redis的消息队列中

接收方:

@api.model
def sub_info(self):"""消费消息"""config = self.env.ref('sf_conference_management.wxagent_config_detail_conference_default').sudo()obj = RedisHelper(host=config.redis_host, port=config.redis_port, db=14, channel='channel:1',password=config.redis_password)sub_obj = obj.subscribe()while True:if sub_obj:msg = sub_obj.parse_response()info_dict = json.loads(msg[2])user_id = info_dict.pop('user_id')user_obj = self.env['res.users'].browse(user_id)_logger.info(f"发送考勤异常消息给{user_obj.name}.....")self.send_qywx_msg(user_obj, msgtype='interactive_taskcard', parameter={'interactive_taskcard': info_dict})self.env.cr.commit()time.sleep(0.1)else:break

这个地方,接受方我做了个装饰器,是为了可以在定时任务里面调用该函数。

代码写好之后,我发现先执行发送消息的定时任务,然后再点击获取信息的定时任务发现,并没有把消息实际发送出去,这让我一度怀疑是不是代码写错了,直到第二天我无意间先点击接收方,然后再点击发送方之后,有意思的事情发生了,消息开始一条条的发出来,所以这个地方的逻辑其实是如下图这样的。

 必须要在先开启接收方之后,再发送之后才能及时的都发送出去。大家切记。

 

使用Redis搭建消息队列(python版)相关推荐

  1. rocketmq python 某个队列不消费_消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?...

    关于 消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?的搜索结果 回答 2021一月拼团已有400余人拼团成功最低一折 点击进入:一月新人专场 服务器配置时间价格1核2G1年84元1核 ...

  2. ​redis实现消息队列

    redis是一个开源的key-value存储系统.与Memcached类似,Redis将大部分数据存储在内存中,支持的数据类型包括:字符串.哈希表.链表.集合.有序集合以及基于这些数据类型的相关操作. ...

  3. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  4. 使用Redis 实现消息队列

    一 .为什么要用Redis实现轻量级MQ? MQ的主要作用: 应用解耦 异步化消息 流量削峰填谷 目前使用比较多的是ActiveMQ . RabbitMQ . ZeroMQ . Kafka . Met ...

  5. posix自己搭建消息队列_蘑菇街消息系统上云实践

    小编又来啦-本周要推荐给大家的是一篇跟中间件上云相关的技术文章,这里面详细的记录了,蘑菇街自研消息系统上云的全过程,也是市面上开放出来为数不多的企业自研组件上云实践.有相关需求的同学可以好好学习下. ...

  6. 基于NODE.JS与KUE搭建消息队列[转]

    转自http://blog.xiamingxing.com/archives/262 基于node.js与kue搭建消息队列 xiamingxingnodejs0 Comment 背景 在计算机科学中 ...

  7. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  8. 【springboot】【redis】springboot+redis实现发布订阅功能,实现redis的消息队列的功能...

    springboot+redis实现发布订阅功能,实现redis的消息队列的功能 参考:https://www.cnblogs.com/cx987514451/p/9529611.html 思考一个问 ...

  9. PHP + Redis 实现消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者 ...

最新文章

  1. 【Qt】通过QtCreator源码学习Qt(三):linux平台的信号、程序崩溃处理
  2. GridView复合多层表头(不限级)!!! (转)
  3. sharp扫地机器人讲话_扫地机机器人,智能扫地机器人推荐
  4. iostat 命令查看linux磁盘I/O
  5. MySQL5.5加主键锁读问题【转】
  6. 【R】语言第一课-----安装
  7. js中==和===的区别
  8. 6 PP配置-生产主数据-工作中心相关-工作中心标准值参数
  9. BZOJ 1228: [SDOI2009]ED(SG定理)
  10. HTML5---offline application(application cache)
  11. matlab实现sift,SIFT算法的Matlab实现
  12. 如何实现USB自动挂载?
  13. PS抠发丝简单详细方法
  14. PS驴头案例,熟悉形状工具
  15. 解决Android图片剪切返回崩溃问题
  16. php unlink 无法删除,php unlink()删除文件实例讲解
  17. Excise_day03
  18. 飞行汽车能顺利上天吗?
  19. 深度Linux安装火狐,deepin或Ubuntu安装最新版Firefox,并设置去掉标题栏
  20. 土方量方lisp_时隔3年,再做双倍超立方数的题目,这次用Lisp

热门文章

  1. 如何使用origin间隔提取数据
  2. linux下常用压缩命令
  3. 求安卓OTG链接尼康相机获取照片解决方案
  4. SGL系列 LED调光电路设计 ANBOZ
  5. python socketio_Python socketio.Server方法代码示例
  6. excel2019锁定保护指定单元格的方法步骤
  7. mysql检索过程_mysql索引原理详解
  8. Windows 7下阻止系统关机
  9. 微信小程序,密码重置
  10. js物体运动-图标向上运动再从下方出现运动到原位置