go实现的消息队列 redis_queue

  • 支持并发队列
  • 支持Topic,Group注册队列监听消息
  • 支持消息异常监听重启队列

贴相关核心代码,最后附上git链接

//消息载体
type QueuePayload struct {ID       string      `json:"id"`IsFast   bool        `json:"is_fast"`Topic    string      `json:"topic"`Group    string      `json:"group"`Body     interface{} `json:"body"`
}var instanceQueueManager *QueueManager
var onceQueueManager sync.Once//队列管理器
type QueueManager struct {db             *redis.ClientMaxRetry       int                        //重试的最大次数RecoverCh      chan RecoverData            //队列恢复监听通道Handlers       map[string]interface{} //注册队列Map
}//初始化队列管理器
func NewQueueManager() *QueueManager {onceQueueManager.Do(func() {instanceQueueManager = &QueueManager{}instanceQueueManager.MaxRetry = 3instanceQueueManager.Handlers = make(map[string]interface{})})return instanceQueueManager
}func (r *QueueManager) GetQueueName(topic string, group string) string {var name stringif len(group) > 0 {name = fmt.Sprintf("Queue_%s::%s", topic, group)} else {name = fmt.Sprintf("Queue_%s", topic)}return name
}//注册队列
func (r *QueueManager) RegisterQueue(topic string, group string, handler interface{}) error{name := r.GetQueueName(topic, group)if _, ok := r.Handlers[name]; ok {return errors.New("is exits")}else{r.Handlers[name] = handlergo r.QueueConsume(topic, group)}return nil
}//生产者执行入队列
func (r *QueueManager) QueuePublish(payload *QueuePayload) error {if len(payload.Topic) <= 0 {return errors.New("TopicId can not be empty")}id, err := uuid.NewUUID()if err != nil {return err}payload.ID = id.String()payloadStr, _ := json.Marshal(payload)r.db.LPush(r.GetQueueName(payload.Topic, payload.Group), payloadStr)return nil
}//消费者执行出队列
func (r *QueueManager) QueueConsume(topic string, group string) {defer func() {if err := recover(); err != nil {var stacktrace stringfor i := 1; ; i++ {_, f, l, got := runtime.Caller(i)if !got {break}stacktrace += fmt.Sprintf("%s:%d\n", f, l)}// when stack finisheslogMessage := fmt.Sprintf("Trace: %s\n", err)logMessage += fmt.Sprintf("\n%s", stacktrace)log.Println(logMessage)//执行恢复函数r.handleRecover(topic, group)}}()for {//消费者执行出列var payload QueuePayloadresult := r.db.BRPop(0, r.GetQueueName(topic, group))if (len(result.Val()) > 0) {vals := result.Val()[1]err := json.Unmarshal([]byte(vals), &payload)if err != nil {log.Println("BRPOP json.Unmarshal Error:", err)continue}//执行回调函数r.handleCallBack(&payload)}}
}//执行恢复函数
func (r *QueueManager) handleRecover(topic string, group string) {handleName := r.GetQueueName(topic, group)handler, ok := r.Handlers[handleName]if r.RecoverCh != nil && ok{r.RecoverCh <- RecoverData{topic, group, handler}}
}//执行回调函数
func (r *QueueManager) handleCallBack (payload *QueuePayload){handleName := r.GetQueueName(payload.Topic, payload.Group)it := r.Handlers[handleName]if it != nil {if ob, ok := it.(Queueable); ok {//同步执行Max次,保证队列顺序,失败则丢弃消息,for i:=0; i< r.MaxRetry; i++ {rs := ob.Execute(payload)if rs.State{break}}}else{log.Println("no ExecuteFunc,pop:", payload)}}
}

上述完整代码链接: git链接

参考的代码:https://github.com/bennya8/go_redis_queue_manager

go实现的redis消息队列相关推荐

  1. Redis消息队列发展历程

    简介:Redis是目前最受欢迎的kv类数据库,当然它的功能越来越多,早已不限定在kv场景,消息队列就是Redis中一个重要的功能.Redis从2010年发布1.0版本就具备一个消息队列的雏形,随着10 ...

  2. Redis 消息队列的三种方案(List、Streams、Pub/Sub)

    现如今的互联网应用大都是采用 分布式系统架构 设计的,所以 消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段,它具有 低耦合.可靠投递.广播.流量控制.最终一致性 等一系列功能. 当前使用较多 ...

  3. 如何保证 Redis 消息队列中的数据不丢失?

    Redis 最常见的业务场景就是缓存读取与存储,而随着时间的推移,有人开始将它作为消息队列来使用了,并且随着 Redis 版本的发展,在 Redis.2.0.0 中新增了发布订阅模式(Pub/Sub) ...

  4. 【Redis消息队列实现异步秒杀】--Redis学习笔记08

    前言 秒杀业务的优化思路: 先利用Redis完成库存余量.一人一单判断,完成抢单业务 再将下单业务放入队列中(阻塞队列,消息队列),利用独立线程异步下单 基于阻塞队列的异步秒杀存在哪些问题? 内存限制 ...

  5. PHP借用Redis消息队列实现高并发下发送邮件功能

    参考: 我目前的做法是,借用redis的队列,把要发送的消息,全部放到里面,然后就不管了 有一个后台发送进程,来处理队列里面的数据 1.如果需要重发,则把发送失败的消息放到一个备份的队列里,每次循环开 ...

  6. springboot:整合redis消息队列

    整合redis消息队列 项目依赖 <!-- RedisTemplate --><dependency><groupId>org.springframework.bo ...

  7. php mysql redis mq_PHP基于Redis消息队列实现发布微博的方法

    本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址  图形化管理界面 git clone [url]http ...

  8. php 频繁插库处理队列,PHP+Redis 消息队列 实现高并发下注册人数统计的实例

    前言 现在越来越多的网站开始注重统计和用户行为分析,作为网站经常使用的功能,如何让统计性能更加高,这也是我们需要考虑的事情.本篇通过Redis来优化统计功能(以注册人数统计为例). 传统的统计功能都是 ...

  9. redis消息队列,你还不敢用?

    文章目录 前言 一.关于消息队列 1.应用场景 2.如何设计消息队列 二.Redis 消息队列解决方案 1.基于 List 的消息队列解决方案 2.基于 zset 的消息队列解决方案 3.基于 Str ...

  10. Redis消息队列三种方案

    什么是消息队列: 消息(Message)是指在应用间传输的数据,消息可以包括简答的文本字符串,也可以有嵌入对象等,消息队列(Message Queue)是一种应用间的通信方式,用来监视消息是否发送成功 ...

最新文章

  1. 阿里三面被挂,幸获内推,历经5轮终于拿到口碑offer
  2. 一起谈.NET技术,微软PDC10:大牛谈ASP.NET和C#技术走向
  3. cmd oracle sys登录_Oracle 忘记了sys和system密码和用户名的解决方法
  4. e-cology在保险行业——泛微保险行业解决方案
  5. 实现quartz定时器及quartz定时器原理介绍
  6. S5PC100基于I2C子系统的lm75驱动流程图
  7. c语言中图形驱动程序功能_C / C ++中的图形:一些更有趣的功能
  8. #js#简单的在线计算器
  9. 【编程语言之Python】之plt画图尺寸、去白边
  10. android textview电话号码,Android应用开发之Android EditTextView 实现带空格分隔的输入(电话号码,银行卡)...
  11. windows环境中JDK环境变量配置
  12. 如何在Mac上批量转换和调整图像大小
  13. AI技术如何做工程?
  14. 字节游戏测试开发面试题
  15. 5万成员丨CSDN 大数据领域网红社区!
  16. 超级计算机预测未来,超级计算机预测未来
  17. 用HTML标签设置字体颜色,html中li标签设置字体颜色
  18. 介绍中国传统节日的网页html,介绍中国传统节日的作文4篇
  19. 微信开放平台开发 微信登录
  20. 【JavaScript】JS中的预解析

热门文章

  1. XFS为例 讨论NVMe SSD测试注意事项
  2. Linux——进程间通信的常见方法(管道、信号、共享映射区、本地套接字)、管道的了解与简单用法
  3. java--18位身份证号最后一位(检验码)是怎么算出来的?
  4. Python小游戏(五)吃豆人小游戏
  5. SQL高级查询之分组查询
  6. 电话拨号界面 android,仿安卓手机拨号界面按键特效
  7. vue调用 手机拨号
  8. 将阴天欠曝发暗的照片后期调出小清新色调效果
  9. 2016-2021 年中国电动夹爪市场销量变化
  10. element ui 相关 -------星星评分