golang gin 监听rabbitmq队列无限消费
golang gin 监听rabbitmq队列无限消费
连接rabbitmq
package databaseimport ("github.com/streadway/amqp""log""reflect""yy-data-processing/common/config" )var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channelfunc InitRabbitmq() {var err errorRabbitConn, err = amqp.Dial(config.Config.RabbitUrl)if err != nil {log.Println("连接RabbitMQ失败")panic(err)}RabbitChannel, err = RabbitConn.Channel()if err != nil {log.Println("获取RabbitMQ channel失败")panic(err)} }// 0表示channel未关闭,1表示channel已关闭 func CheckRabbitClosed(ch amqp.Channel) int64 {d := reflect.ValueOf(ch)i := d.FieldByName("closed").Int()return i }
创建生产者
package serviceimport ("encoding/json""github.com/streadway/amqp""log""yy-data-processing/common/config""yy-data-processing/common/database""yy-data-processing/model"
)func Producer() {// 声明队列,没有则创建// 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列)declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil)if err != nil {log.Printf("声明队列 %v 失败, error: %v", config.Config.HawkSaveQueueName, err)panic(err)}request := model.Request{}marshal, _ := json.Marshal(request )// exchange、routing key、mandatory、immediateerr = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{ContentType: "text/plain",Body: []byte(marshal),})if err != nil {log.Printf("生产者发送消息失败, error: %v", err)} else {log.Println("生产者发送消息成功")}
}
创建消费者
package serviceimport ("encoding/json""log""os""strings""sync""time""yy-data-processing/common/config""yy-data-processing/common/database""yy-data-processing/model" )func Consumer() {// 声明队列,没有则创建// 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列)_, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil)if err != nil {log.Printf("声明队列 %v 失败, error: %v", config.Config.QueueName, err)panic(err)}err = database.RabbitChannel.Qos(1, // prefetch count 服务器将在收到确认之前将那么多消息传递给消费者。0, // prefetch size 服务器将尝试在收到消费者的确认之前至少将那么多字节的交付保持刷新到网络false, // 当 global 为 true 时,这些 Qos 设置适用于同一连接上所有通道上的所有现有和未来消费者。当为 false 时,Channel.Qos 设置将应用于此频道上的所有现有和未来消费者)if err != nil {log.Printf("rabbitmq设置Qos失败, error: %v", err)}// 队列名称、consumer、auto-ack、是否独享// deliveries是一个管道,有消息到队列,就会消费,消费者的消息只需要从deliveries这个管道获取deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", false, false, false, false, nil)if err != nil {log.Printf("从队列 %v 获取数据失败, error: %v", config.Config.QueueName, err)} else {log.Println("从消费队列获取任务成功")}// 阻塞住for {select {case message := <-deliveries:closed := database.CheckRabbitClosed(*database.RabbitChannel)if closed == 1 { // channel 已关闭,重连一下database.InitRabbitmq()err = database.RabbitChannel.Qos(1, 0, false)if err != nil {log.Printf("rabbitmq重连后设置Qos失败, error: %v", err)}} else {msgData := string(message.Body)request := model.Request{}err := json.Unmarshal([]byte(msgData), &request)if err != nil {log.Printf("解析rabbitmq数据 %v 失败, error: %v", msgData, err)} else {// TODO...// 处理逻辑// 处理完毕手动ACKmessage.Ack(true)}}}} }
main方法协程调用
package mainimport ("log""yy-data-processing/common/config""yy-data-processing/common/database""yy-data-processing/router""yy-data-processing/service" )func main() {// 初始化路由routers := router.InitRouters()// 初始化RabbitMQdatabase.InitRabbitmq()go service.Producer()go service.Consumer()port := config.Config.Portif err := routers.Run(":" + port); err != nil {log.Printf("启动服务失败: ", err)}}
golang gin 监听rabbitmq队列无限消费相关推荐
- rabbitmq多个消费者监听一个队列_RabbitMQ的六种工作模式
一.基于erlang语言:是一种支持高并发的语言 RabbitMQ的六种工作模式: 1.1 simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息 ...
- java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费
摘选:https://my.oschina.net/u/3613230/blog/1457227 摘要: 最近在项目开发中,需要用到activemq,用的时候,发现在同一个项目中point-to-po ...
- 七十八、SpringBoot监听rabbitmq和创建交换器,队列
@Author:Runsen 来源:尚硅谷 下面建议读者学习尚硅谷的B站的SpringBoot视频,我是学雷丰阳视频入门的. 具体链接如下:B站尚硅谷SpringBoot教程 文章目录 AmqpAdm ...
- 基于Golang的监听读取配置文件的程序包开发——simpleConfig_v1
基于Golang的监听&读取配置文件的程序包开发--simpleConfig_v1 [阅读时间:约10分钟] 一.配置文件概述 二.系统环境&项目介绍 1.系统环境 2.项目的任务要求 ...
- IBMMQ监听消息队列
** IBMMQ发送和接收消息示例: pom.xml下载jar包: <dependency><groupId>com.ibm.mq</groupId><art ...
- Python监听RabbitMq ready数量
通过定时器每隔半小时监控一次.防止队列卡住. 说明: vhost = '%2F' 当你的vhost是 / 的时候 浏览器要通过 转义 所以 / = %2F #encoding: utf-8 #summ ...
- java onmessage监听消息队列_消息队列(MQ)功能场景
来自公众号:京东技术 消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法.应用程序通过写入和检索出入列队的数据(消息)来通信,而无需通过专用链接来连接它们.消息传递指的是程序之间通过在消息中发 ...
- 聊聊RabbitMq动态监听这点事
很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享.顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq ...
- java消费rabbitMQ队列消息
服务起来之后,会自动监听队列中的消息 import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.sprin ...
最新文章
- Android 抓取app进程 hprof 文件
- java中所有的类都继承于_Java中所有的类都是通过直接或间接地继承()类得到的...
- [git] 基础操作-02 分支和主支合并
- 第一次LeetCode周赛心得(力扣-cn周赛,使用python3)
- 【转载】MySQL innodb_table_stats表不存在的解决方法
- Atitit.atiInputMethod v2词库清理策略工具 q229
- 会声会影高清视频输出(小体积-大高清)
- 屏幕录制专家,如何上传到优酷的高清视频?
- 【渝粤教育】电大中专公共基础课程作业 题库
- 3.6 使用透视裁剪工具修复透视图 [Ps教程]
- Accuracy, Precision, Recall和F1-score解释
- Java基础:IO 流中的 flush
- 定制属于你的潮流轻链竞品分析
- matlab fscanf
- NLP-统计词频之处理停用词
- gcc -Wl,--wrap,malloc 替换系统函数
- Lua系列--pairs和ipairs
- 基于STM32风速风向检测仿真
- wampserver中文版 附安装教程
- 河南公考计算机知识,河南省公务员计算机知识考核复习题非专业类
热门文章
- 电脑右下角系统时间精确显示到秒详细设置
- 2.LAST【潜心创作】大富翁
- 数据采集-“消防知识网上答题挑战赛”题库
- 有四个数字:1、2、3、4,能组成多少个互不相同且无重复数字的三位数?
- 【Excel学习】Excel 制作2个维度的折线图
- 苹果录屏精灵_iPhone有这么厉害?苹果官方列出20项优点,网友:我怎么没发现?...
- mysql指定时间转换成yyyy-mm-dd_如何在MySQL中将特定日期的MM / YY转换为YYYY-MM-DD?...
- 实用:python中字符串重复统计
- Unity模拟群聚行为 Boids 鸟群、鱼群
- Django 小记 FileResponse 实现文件下载