一文了解 rabbitMq 消息队列
消息队列
- 1. 流向削峰
- 2. 应用解耦
- 3.异步处理
- 应用原理
- Broker(代理)
- Virtual host(虚拟网站)
- Connection
- Channl
- Exchange
- 四大核心
- 生产者
- 交换机
- 队列
- 消费者
- 六大模式
- 1.简单模式(hello World)
- 2. 工作模式(work queues)
- 3.发布订阅(publish/fanout)
- 4.路由模式(routing/direct)
- 5.topic主题模式
- 6. 发布确认模式
- 简单模式(hello world)
- 工作队列模式 (Work Queues)
- 消息应答
- 消息自动重新入队
- RabbitMQ 持久化
- 队列持久化
- 消息持久化
- 不公平分发
- 预取值 Prefetch
- 发布确认
- 单个确认发布 (同步)
- 批量确认发布
- 异步确认发布
- Exchange (交换机)
- Exchange 类型
- 临时队列
- binding
- fanout 发布订阅模式
- direct 直接模式
- topic 主题模式
- 死信队列
- 死信的来源
- 延迟队列
- 基于死信的 延迟队列TTL
- Rabbit 延迟插件
- 发布确认高级内容
- 确认机制方案
- 回退消息
- Mandatory 参数
- 备份交换机
- 幂等性
- 概念 (重复提交 - 消息被重复消费了)
- 解决思路 ( 就是判断消息是否被消费过 )
- 唯一 id + 指纹码机制
- redis 原子性
- 优先级队列 (0-255 越大越优先执行)
- 惰性队列
- 两种模式
- 内存开销对比
- RabbitMQ 集群
- 镜像队列
#为什么使用MP
1. 流向削峰
2. 应用解耦
3.异步处理
应用原理
Broker(代理)
接受和分发消息的应用,RabbitMQ Server就是Message Broker (消息实体)
Virtual host(虚拟网站)
出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似与网络中的namespace概念,当多个不同的用户使用同一个RabbitMq server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue等
Connection
publisher/consumer 和 broker 之间的TCP链接
Channl
如果每一次访问 RabbitMQ都建立一个Connection,在消息量大的时候建立TCPConnection的开销将是巨大的,效率也较地,Channel实在connection内部建立的逻辑链接,如果应用程序支持多线程,通常每个 thread 创建单独的 channl 进行通讯,AMQP method 包含了 channel
id 帮助客户端和 message broker 识别 channl ,所以 channl 之间时完全隔离的,Channel 作为轻量级的Connection 极大减少了操作系统建立TCP connection 的开销
Exchange
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key ,分发消息到queue中去,常用的类型有 direct(直接的)(point - to -point),topic (话题)(publish -subscribe 发布 - 订阅) 和 fanout(输出) (multicast(多播))
四大核心
生产者
交换机
队列
消费者
六大模式
1.简单模式(hello World)
2. 工作模式(work queues)
3.发布订阅(publish/fanout)
4.路由模式(routing/direct)
5.topic主题模式
6. 发布确认模式
简单模式(hello world)
工作队列模式 (Work Queues)
工作队列,又称为任务队列,主要思想时避免立即执行资源密集型任务,而不得不等待他完成。相反我们安在任务在之后执行,我们把任务封装成消息发送到消息队列,在后台运行的工作进程将弹出任务并最终执行作业,当有多个工作线程时,这些工作线程将一起处理任务
比如:多个工作线程处理消息
默认是轮询发送消息 但是消息量多则不是轮询发送消息
消息应答
为了保证消息在发送过程中不丢失,rabbitmq 引入了消息应答机制,消息应答就是:消费者在接受消息并且处理该消息后,告诉rabbitmq他已经处理了,rabbitmq可以把该消息删除了
- 自动应答
这种模式需要在高吞吐量和数据传输安全性方面做权衡,没有对消息数量进行限制,仅使用消费者可以在高县并以魔种速率能够处理这些消息的使用情况下使用 - 手动应答
- Channl.basicAck ( 用于肯定确认)
mq 一直到该消息并且成功的处理消息,可以将其丢弃了 - Channl.basicNack (用于否定确认)
- Channl.basicReject (用于否定确认)
不处理该消息直接拒绝,可以将其丢弃
- Channl.basicAck ( 用于肯定确认)
消息自动重新入队
如果消费者由于某些原因失去了链接(其通道已经关闭,连接已关闭或TCP链接丢失),导致消息未发送ACK确认,mq 将了解到消息没有完全处理,并将其重新排队,如果此时其他消费者可以处理,他将很快的将其分发给零一个消费者,这样,即使某一个消费者偶尔死亡,也可以确定消息不丢失
RabbitMQ 持久化
队列持久化
- 第二个参数 就是持久化
这里的 大写 的 D 代表持久化 durable
消息持久化
- 发送消息为持久化消息
不公平分发
// 设置为不公平分发
channel.basicQos(1);
预取值 Prefetch
发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道发布的消息都竟会被指派一个唯一的ID,一但消息被投递到所有匹配的队列后,broker 就会发送一个确认给生产者(包含唯一ID),如果消息和队列是可持久化的,那么确认消息会在计入磁盘之后发出, broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外broker 也可以设置 basic.bak 的 multiple 域,表示到这个序列号之前的所有消息都得到了处理。
confirm 模式最大的好处在于一部的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终的到确认之后,生产者应用便可以通过回调的方式来确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以以回调的方式中处理该 nack 消息。
单个确认发布 (同步)
// 开启发布确认channel.confirmSelect();channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(str+i).getBytes("UTF-8"));// 等待确认boolean b = false;// 等待 确认b = channel.waitForConfirms();
批量确认发布
无法确定那个消息没有被确认
int count = 100;for (int i = 0; i < COUNT; i++) {channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(str+i).getBytes("UTF-8"));// 到底100条批量确认if(i % count == 0){channel.waitForConfirms();}
异步确认发布
他是使用回调函数达到消息的可靠性传递
// 消息的监听器,监听消息的成功还是失败 1. 成功消息 2. 失败消息// 1. 消息的标记 2. 是否为批量确认channel.addConfirmListener((deliveryTag,multiple)->{System.out.println("消息成功" + deliveryTag+".."+multiple);},(deliveryTag,multiple)->{System.out.println("消息失败" + deliveryTag);});for (int i = 0; i < COUNT; i++) {channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(str+i).getBytes("UTF-8"));//}
ConcurrentSkipListMap<Long , String> outStandingConfirms = new ConcurrentSkipListMap<>();// 消息的监听器,监听消息的成功还是失败 1. 成功消息 2. 失败消息// 1. 消息的标记 2. 是否为批量确认channel.addConfirmListener((deliveryTag,multiple)->{if(multiple){ConcurrentNavigableMap<Long, String> NavigableMap = outStandingConfirms.headMap(deliveryTag);NavigableMap.clear();System.out.println("消息成功1+++" + deliveryTag+".."+multiple);System.out.println(outStandingConfirms);}else {outStandingConfirms.remove(deliveryTag);System.out.println("消息成功2+++" + deliveryTag+".."+multiple);System.out.println(outStandingConfirms);}},(deliveryTag,multiple)->{System.out.println("消息失败" + deliveryTag);});for (int i = 0; i < COUNT; i++) {outStandingConfirms.put(channel.getNextPublishSeqNo(),str+i);channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(str+i).getBytes("UTF-8"));}
Exchange (交换机)
生产者生产的消息从不会直接发送到队列两种,生产者只能将消息发送到交换机,他一方面接受消息,两一方面把消息放到特定的队列中或者丢弃
Exchange 类型
direct(直接) topic(主题) 标题(header) 扇出(fanout)
无名交换机 使用默认类型
// 1.发送到哪一个交换机
// 2.路由的 Key 值 routing - key
// 3.其他参数信息
// 4.发送的消息体
channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
临时队列
临时对列代表没有持久化的队列,断开链接队列就会被删除,与不用指定名字
channel.queueDeclare().getQueue();
binding
绑定交换机和队列的关系
fanout 发布订阅模式
一 对 多
direct 直接模式
一对 一
topic 主题模式
一对 匹配的队列 routing-key
死信队列
一般来说,producer 将消息投递到 broker 或者 直接到queue 里,consumer从queue取出进行消费,但耨时候由于特定的原因导致queue中的下消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
为了使消息数据不丢失,需要使用到 RabbitMq 的死信对列机制,当消息发生异常时,将消息投入到死信队列中
死信的来源
- 消息 TTL 过期
- 队列达到最大长度
- 消息被拒绝
延迟队列
延迟队列内部时有序的,最重要的特性就体现在他的延时属性上,延时队列中的元素时希望在指定时间到了以后或之前去除和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 他就是死信队列消息过期的一种
比如:订单10分钟未支付
用户注册成功后,如果三天内没有登录则进行短息提醒
用户发起退款后,三天没有的到处理通知相关人员
预定会议,需要预定的前10分钟通知各个参与会人员
这些场景都有一个他特点,需要某个事件发生之后或者之前的指定时间点完成某一项任务
基于死信的 延迟队列TTL
这里有一个不足之处 , 每增加新的时间时间需求就要新增一个队列
优化版
由生产者指定消息的时间 有通用队列适应不同的时间
== 但是这种 延迟消息 发送两条以上的消息是,后入队列的消息会被先入队的消息阻塞住,即使他的过期时间很短也需要等待前一个消息出队==
Rabbit 延迟插件
增加了x-delayed-messages 延迟队列插件
发布确认高级内容
在生产环境中由于一些不明的原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和回复,
确认机制方案
/*** @author zhaochong* @version 1.0* @description: 交换机接受消息 进行消息确认的回调 他主要解决了在交换机层面的消息丢失 ,不能解决路由消息丢失* @date 2021/8/14 16:20*/
@Component
@Slf4j
public class MycallBack implements RabbitTemplate.ConfirmCallback {// 注入@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init (){rabbitTemplate.setConfirmCallback(this::confirm);}/*** 交换机确认回调方法* 1. 生产者发送消息,交换机接受的哦了 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 true* 1.3 cause(原因) null* 2 发消息 交换机接受失败了* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 false* 2.3 失败的原因**/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if(ack){log.info("交换机收到了消息 id为 {} ",id);}else {log.info("交换机还未收到 id为 {}的消息,原因为 {} ",id,cause);}}
}
回退消息
Mandatory 参数
== 只有不可到达才会到达==
在近开启了生产者确认的机制情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会直接对其,此时生产者时不知道消息被丢弃这个事件的。 通过设置 Mandatory 这个参数可以在当消息传递过程中不可到达目的地时及时将消息返回给生产者。
//设置 Mandatory 这个参数可以在当消息传递过程中不可到达目的地时及时将消息返回给生产者。// 这里只有失败@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息 {},被交换机 {} 退回了 ,退回原因 :{} ,路由key: {}",new String(returned.getMessage().getBody()),returned.getExchange(),returned.getReplyText(),returned.getRoutingKey());}
备份交换机
有了 Mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会再生产者的纤细无法被投递时发现并处理,但是有时我们不知道该如何处理这些无法路由的消息。
当我们为某个交换机声明了备份交换机时,就是为了他创建了一个备胎,当交换机接到一条不可路由的消息时,将会把这条消息转发到备份交换机上,由备份交换机进行消息处理和转发,通常备份交换机的类型为 fanout 类型,这样就能把消息投递到预期绑定的对垒中去,然后我们在备份交换机中半丁对垒,用独立的消费者进行检测和预警
备份交换机 时发生在 消息为成功入队 ,死信对列时发生在消息成功入队但是没有被消费
Mandatory 参数 ReturnCallBack 的优先级 小于 备份交换机 、 使用备份交换机 ReturnCallBack 则无法收到信息
幂等性
概念 (重复提交 - 消息被重复消费了)
用户对于统一操作发起的一次请求或者多次请求的结果是一直的,不会因为多次点击而产生副作用。最简单的例子-支付
解决思路 ( 就是判断消息是否被消费过 )
MQ 消费者的幂等性的解决一般使用 全局ID 或者写一个唯一标识 比如 时间戳或者 uuid 或者订单消费者消费 MQ 中的消息也可以利用 MQ的该 id 来判断,或者可按自己的规则生成一个全局唯一的id,每次消费时用该ID 判断消息是否被消费过。
唯一 id + 指纹码机制
指纹码: 我们的一些规则或者时间戳加别的服务给到的唯一信息码,他并不一定时我们系统生成的,基本都是我们的业务逻辑规则拼接而来的,但是一定要保持唯一性,然后利用铲鲟语句进行怕不断这个 id 是否存在数据库中,优势就是实现简单的一个拼接,然后判断是否重复,劣势就是在高并发情况下,数据库压力,这种方式不推荐
redis 原子性
利用 redis 执行setnx 命令,天然具有幂等性,从而实现不被重复消费
优先级队列 (0-255 越大越优先执行)
使用场景: 订单催付场景 大客户和小客户优先级不同,
设置最大优先级
// 设置对列最大优先级QueueBuilder maxPriority(int maxPriority) {return withArgument("x-max-priority", maxPriority);}
// 设置 消息 优先级
properties.SetPersistent(true);
//设置消息的优先级
properties.Priority = (byte)((i == 3)?30:i); //发布消息channel.BasicPublish(exchange: "",routingKey: "q.test",basicProperties: properties,body: body);rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,message -> {message.getMessageProperties().setPriority(priority);return message;});
惰性队列
惰性队列会将消息尽可能存入磁盘中,而在消费到相应的消息时再回被加载到内存中,他设计的重要目标就是为了能够支持更长的队列,既支持更多的消息存储,为解决 消费者由于各种各样的原因(下线、宕机、维护)导致消息造成的堆积
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存中,这样可以快速的将消息发送给消费者,即使时持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份,当RabbitMQ需要释放内存的时候,会将内存中的消息换页置磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作 无法接收到新的消息,
两种模式
default 和 lazy ,可以通过channel.queueDeclare 设置,也可以通过 Policy 设置 ,如果一个队列同时通过这两种方式设置的话,马模 Policy 的方式具有更高的优先级,,如果通过声明的方式设置,只能先删除在重新生命一个新的
内存开销对比
RabbitMQ 集群
镜像队列
如果集群中 只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可以红,并且可能会导致消息的丢失,可以将所有消息都设置为持久化,并且对应队列的 druable 属性也设置为 true ,但是这样仍任无法避免由于缓存导致的问题,因为消息在发送之后和被写入磁盘并执行刷盘动作之间存在一个短暂却会产生问题的时间窗,通过 publisherconfirm 机制能够确保客户端知道跑那些消息已经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务的不可用
引入镜像队列的机制,可以将队列镜像到集群中其他的 Broker 节点之上, 如果集群中的一个节点失效了,队列能自动的切换到镜像中的另一个节点上以保证服务的可用性。
一文了解 rabbitMq 消息队列相关推荐
- RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...
- RabbitMQ消息队列(十三)-VirtualHost与权限管理
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限.那RabbitMQ呢?RabbitMQ也有类似的权限管理.在RabbitMQ中可以虚拟消息服务器VirtualHost,每个Virtua ...
- rabbitMQ消息队列 – 面板介绍及简单demo
首先rabbit安装好之后,运维会给一个控制面板. 默认账号密码为guest 登入以后可以看到具体界面. 在此鸣谢百度翻译给予的大力支持.. ###写一个简单的demo 编写之前..虽然说可以直接用底 ...
- 大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列
QuartZ定时任务+RabbitMQ消息队列 一 .QuartZ定时任务解决订单系统遗留问题 情景分析: 在电商项目中 , 订单生成后 , 数据库商品数量-1 , 但是用户迟迟不进行支付操作 , 这 ...
- RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)
在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会 ...
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
(四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...
- 使用EasyNetQ组件操作RabbitMQ消息队列服务
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合, ...
- 初探 RabbitMQ 消息队列
初探 RabbitMQ 消息队列 rabbitmq基础概念常见应用场景导入依赖属性配置具体编码定义队列实体类控制器消息消费者主函数测试总结说点什么 SpringBoot 是为了简化 Spring 应用 ...
- 基于RabbitMQ消息队列的分布式事务解决方案 - MQ分布式消息中间件实战
基于RabbitMQ消息队列的分布式事务解决方案 - MQ分布式消息中间件实战 参考文章: (1)基于RabbitMQ消息队列的分布式事务解决方案 - MQ分布式消息中间件实战 (2)https:// ...
最新文章
- 学习鸟哥的Linux私房菜笔记(1)——Linux系统入门
- 极富创意的404错误个性页面设计欣赏
- Javascript弹出对话框 确定取消转到不同页面
- 一、Delphi 2009 中的泛型
- 在NIO.2中使用文件和目录
- php解决01背包问题,PHP动态规划解决0-1背包问题实例分析_PHP教程
- 分布式存储系统学习笔记(三)—分布式键值系统(2)—淘宝Tair
- 计算机无法安装ae,Windows10系统AE软件无法安装如何处理
- wine装通达信_通过wine使用通达信、钱龙、同花顺、大智慧软件
- cad2019菜单栏怎么调出来_AutoCAD2019怎么把工具栏放左右两边 两侧工具栏调出来...
- sqli-labs(50-53)
- 风口下的追逐:AI正在驾驶、客服、教育领域疾驰
- channel使用法则
- golang emoji表情处理
- CDA数据分析师携手万宝盛华开启人才培训新篇章
- MySQLamp;amp;JDBC回顾——MySQL
- 程序员双十一剁手指南(2020)
- Flink实战—基于时间窗口定时输出sink
- My first job - Goodbaby Group in Shanghai
- C语言:L1-057 PTA使我精神焕发 (5 分)
热门文章
- 电力系统学计算机有用吗,电力系统中计算机技术的应用
- 2018手机江湖变局:联想手机的高调“回归”
- Matlab数字(所有数值存储为双精度浮点数)
- java dto对象_DTO对象转换
- git -- RPC failed; HTTP 403 curl 22 The requested URL returned error: 403
- 注册微信公众号,原来如此简单
- Matlab:船舶航向舵数学模型构建
- oracle中如何执行存储过程,Oracle如何执行存储过程
- jinja2 简单入门
- 台式机机械硬盘咔咔异响、硬盘无法读取的根本原因及解决办法