go实现的redis消息队列
go实现的消息队列 redis_queue
- 支持并发队列
- 支持Topic,Group注册队列监听消息
- 支持消息异常监听重启队列
贴相关核心代码,最后附上git链接
//消息载体
type QueuePayload struct {ID string `json:"id"`IsFast bool `json:"is_fast"`Topic string `json:"topic"`Group string `json:"group"`Body interface{} `json:"body"`
}var instanceQueueManager *QueueManager
var onceQueueManager sync.Once//队列管理器
type QueueManager struct {db *redis.ClientMaxRetry int //重试的最大次数RecoverCh chan RecoverData //队列恢复监听通道Handlers map[string]interface{} //注册队列Map
}//初始化队列管理器
func NewQueueManager() *QueueManager {onceQueueManager.Do(func() {instanceQueueManager = &QueueManager{}instanceQueueManager.MaxRetry = 3instanceQueueManager.Handlers = make(map[string]interface{})})return instanceQueueManager
}func (r *QueueManager) GetQueueName(topic string, group string) string {var name stringif len(group) > 0 {name = fmt.Sprintf("Queue_%s::%s", topic, group)} else {name = fmt.Sprintf("Queue_%s", topic)}return name
}//注册队列
func (r *QueueManager) RegisterQueue(topic string, group string, handler interface{}) error{name := r.GetQueueName(topic, group)if _, ok := r.Handlers[name]; ok {return errors.New("is exits")}else{r.Handlers[name] = handlergo r.QueueConsume(topic, group)}return nil
}//生产者执行入队列
func (r *QueueManager) QueuePublish(payload *QueuePayload) error {if len(payload.Topic) <= 0 {return errors.New("TopicId can not be empty")}id, err := uuid.NewUUID()if err != nil {return err}payload.ID = id.String()payloadStr, _ := json.Marshal(payload)r.db.LPush(r.GetQueueName(payload.Topic, payload.Group), payloadStr)return nil
}//消费者执行出队列
func (r *QueueManager) QueueConsume(topic string, group string) {defer func() {if err := recover(); err != nil {var stacktrace stringfor i := 1; ; i++ {_, f, l, got := runtime.Caller(i)if !got {break}stacktrace += fmt.Sprintf("%s:%d\n", f, l)}// when stack finisheslogMessage := fmt.Sprintf("Trace: %s\n", err)logMessage += fmt.Sprintf("\n%s", stacktrace)log.Println(logMessage)//执行恢复函数r.handleRecover(topic, group)}}()for {//消费者执行出列var payload QueuePayloadresult := r.db.BRPop(0, r.GetQueueName(topic, group))if (len(result.Val()) > 0) {vals := result.Val()[1]err := json.Unmarshal([]byte(vals), &payload)if err != nil {log.Println("BRPOP json.Unmarshal Error:", err)continue}//执行回调函数r.handleCallBack(&payload)}}
}//执行恢复函数
func (r *QueueManager) handleRecover(topic string, group string) {handleName := r.GetQueueName(topic, group)handler, ok := r.Handlers[handleName]if r.RecoverCh != nil && ok{r.RecoverCh <- RecoverData{topic, group, handler}}
}//执行回调函数
func (r *QueueManager) handleCallBack (payload *QueuePayload){handleName := r.GetQueueName(payload.Topic, payload.Group)it := r.Handlers[handleName]if it != nil {if ob, ok := it.(Queueable); ok {//同步执行Max次,保证队列顺序,失败则丢弃消息,for i:=0; i< r.MaxRetry; i++ {rs := ob.Execute(payload)if rs.State{break}}}else{log.Println("no ExecuteFunc,pop:", payload)}}
}
上述完整代码链接: git链接
参考的代码:https://github.com/bennya8/go_redis_queue_manager
go实现的redis消息队列相关推荐
- Redis消息队列发展历程
简介:Redis是目前最受欢迎的kv类数据库,当然它的功能越来越多,早已不限定在kv场景,消息队列就是Redis中一个重要的功能.Redis从2010年发布1.0版本就具备一个消息队列的雏形,随着10 ...
- Redis 消息队列的三种方案(List、Streams、Pub/Sub)
现如今的互联网应用大都是采用 分布式系统架构 设计的,所以 消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段,它具有 低耦合.可靠投递.广播.流量控制.最终一致性 等一系列功能. 当前使用较多 ...
- 如何保证 Redis 消息队列中的数据不丢失?
Redis 最常见的业务场景就是缓存读取与存储,而随着时间的推移,有人开始将它作为消息队列来使用了,并且随着 Redis 版本的发展,在 Redis.2.0.0 中新增了发布订阅模式(Pub/Sub) ...
- 【Redis消息队列实现异步秒杀】--Redis学习笔记08
前言 秒杀业务的优化思路: 先利用Redis完成库存余量.一人一单判断,完成抢单业务 再将下单业务放入队列中(阻塞队列,消息队列),利用独立线程异步下单 基于阻塞队列的异步秒杀存在哪些问题? 内存限制 ...
- PHP借用Redis消息队列实现高并发下发送邮件功能
参考: 我目前的做法是,借用redis的队列,把要发送的消息,全部放到里面,然后就不管了 有一个后台发送进程,来处理队列里面的数据 1.如果需要重发,则把发送失败的消息放到一个备份的队列里,每次循环开 ...
- springboot:整合redis消息队列
整合redis消息队列 项目依赖 <!-- RedisTemplate --><dependency><groupId>org.springframework.bo ...
- php mysql redis mq_PHP基于Redis消息队列实现发布微博的方法
本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址 图形化管理界面 git clone [url]http ...
- php 频繁插库处理队列,PHP+Redis 消息队列 实现高并发下注册人数统计的实例
前言 现在越来越多的网站开始注重统计和用户行为分析,作为网站经常使用的功能,如何让统计性能更加高,这也是我们需要考虑的事情.本篇通过Redis来优化统计功能(以注册人数统计为例). 传统的统计功能都是 ...
- redis消息队列,你还不敢用?
文章目录 前言 一.关于消息队列 1.应用场景 2.如何设计消息队列 二.Redis 消息队列解决方案 1.基于 List 的消息队列解决方案 2.基于 zset 的消息队列解决方案 3.基于 Str ...
- Redis消息队列三种方案
什么是消息队列: 消息(Message)是指在应用间传输的数据,消息可以包括简答的文本字符串,也可以有嵌入对象等,消息队列(Message Queue)是一种应用间的通信方式,用来监视消息是否发送成功 ...
最新文章
- 阿里三面被挂,幸获内推,历经5轮终于拿到口碑offer
- 一起谈.NET技术,微软PDC10:大牛谈ASP.NET和C#技术走向
- cmd oracle sys登录_Oracle 忘记了sys和system密码和用户名的解决方法
- e-cology在保险行业——泛微保险行业解决方案
- 实现quartz定时器及quartz定时器原理介绍
- S5PC100基于I2C子系统的lm75驱动流程图
- c语言中图形驱动程序功能_C / C ++中的图形:一些更有趣的功能
- #js#简单的在线计算器
- 【编程语言之Python】之plt画图尺寸、去白边
- android textview电话号码,Android应用开发之Android EditTextView 实现带空格分隔的输入(电话号码,银行卡)...
- windows环境中JDK环境变量配置
- 如何在Mac上批量转换和调整图像大小
- AI技术如何做工程?
- 字节游戏测试开发面试题
- 5万成员丨CSDN 大数据领域网红社区!
- 超级计算机预测未来,超级计算机预测未来
- 用HTML标签设置字体颜色,html中li标签设置字体颜色
- 介绍中国传统节日的网页html,介绍中国传统节日的作文4篇
- 微信开放平台开发 微信登录
- 【JavaScript】JS中的预解析