Rabbitmq介绍

Rabbitmq是一个开源的AMQP实现,服务端用Erlang语言编写,支持多种客户端,用于在分布式系统中存储转发消息,在易用性,扩展性,高可用性
等方面表现不俗

Rabbitmq安装

#拉去镜像 management 默认带控制台
docker pull rabbitmq:3.8.14-management
#查看镜像
docker image
#运行镜像
docker run -it --name rabbitmq-5672 -e -p 15672:15672 -p 5672:5672 rabbitmq
#查询容器是否启动成功
docker ps -a
#端口说明
4369
是Erlang端口/节点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似DNS的作用
5672,5671
AMQP 0-9-1和客户端端口,没有使用SSL和使用SSL端口
25672
用于在RabbitMQ节点间CLI工具通信,配合4369使用
15672
HTTP_API端口,用于管理RabbitMQ,需要启用management插件
61613,61614
当SOOMP插件启用的时候打开,作为STOMP客户端端口(根据是否使用TLS来选择)
1883
当MQTT插件启动时打开,作为MQTT客户端端口(根据是否使用TLS来选择)
15674
基于WebSocket的STOMP客户端端口(当插件Web STOMP启动时打开)
15675
基于WebSocket的MQTT客户端端口(当插件Web MQTT启动时打开)
查看web控制台 看是否启动成功
默认账号密码guest/guest
docker 启动参数详情
-d 后台运行的容器名称
--hostname 主机名(Rabbitmq根据节点名称存储数据,默认为主机名)
--name 容器名称
-p 指定服务的运行端口 宿主:容器
--restar 出现故障后自动重启方式,no 不自动重启,on-failure:重启次数 指定自动重启的失败次数,到达失败次数后不再重启,always 自动重启
--add-host 添加hosts映射
-v 映射目录或文件
Producer:生产者,进行投递消息,发布到Rabbitmq中
消息一般包含二个部分:消息体和附加信息
消息体(payload)
消息体一般是一个带有业务逻辑结构的数据.可以进行序列化操作
附加信息
用来表述这条消息,比如目标交换机的名称、路由建和一些自定义属性Broker:消息中间件服务节点
对于Rabbitmq来书,一个Rabbitmq Broker 可以简单看做一个Rabbitmq服务节点,或者Rabbitmq服务实例Virtral Host:虚拟主机,表示一批交换机,消息队列,和相关对象
虚拟主机是共享相同的身份认证和加密环境的独立服务器域
每一个vhost本质上是一个mini版的Rabbitmq服务器,拥有自己的队列,交换机,绑定和权限机制
vhost是AMQP的概念,必须要在连接是指定,默认vhost是 /Channel:频道或者信道,是建立在Connection连接之上的一种轻量级的连接
大部分操作在Channel这个接口中完成,包括自定义队列的声明,交换机的声明,队列的绑定,发布消息,消费消息等RoutingKey:路由建 在交换机和绑定键固定的情况下,生产者在发送消息给交换器时,通过制定RoutingKey决定消息流向哪里Excange:交换机 生产者发送消息到Exchange交换机,由交换机将消息路由到一个或多个队列中,如果路由不到,返回给生产者,或者直接丢弃Queue:队列,rabbittmq内部对象,用来存储消息Binding: 绑定,rabbitmq通过绑定将交换机和队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey)
RabbittMQ常用的交换机类型有Fanout、Direct、Topic、Headers四种

SpringBoot Rabbitmq四种类型实现

#pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
#yml文件
server:port: 8881
spring:rabbitmq:host: 192.168.46.3port: 5672username: guestpassword: guestvirtual-host: /
#Direct直连类型队列@BeanQueue directQueue(){//是否持久化return new Queue("directQueue",true);}@BeanDirectExchange directExchange(){return new DirectExchange("directExchange");}@BeanBinding bindingDirect(){return BindingBuilder.bind(directQueue()).to(directExchange()).with("directtKey");}##生产者rabbitTemplate.convertAndSend("directExchange","directtKey","message" + id++);#消费者@RabbitListener(queues = "directQueue")public void message(Message message, Channel channel) {System.out.println("直连-消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}
#Fanout广播类型交换机@BeanQueue fanoutQuquq1(){return new Queue("fanoutQuquq1",true);}@BeanQueue fanoutQuquq2(){return new Queue("fanoutQuquq2",true);}@BeanFanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}@BeanBinding bindingFanoutt1(){return BindingBuilder.bind(fanoutQuquq1()).to(fanoutExchange());}@BeanBinding bindingFanoutt2(){return BindingBuilder.bind(fanoutQuquq2()).to(fanoutExchange());}#生产者rabbitTemplate.convertAndSend("directExchange","","message" + id++);#消费者@RabbitListener(queues = "fanoutQuquq1")public void fanoutQuquq1(Message message, Channel channel) {System.out.println("广播1---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}@RabbitListener(queues = "fanoutQuquq2")public void fanoutQuquq2(Message message, Channel channel) {System.out.println("广播2---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}
#Topic类型交换机  * 单个匹配  #多个匹配/*** topic*/@BeanQueue topicQueue1(){return new Queue("topicQueue1",true);}@BeanQueue topicQueue2(){return new Queue("topicQueue2",true);}@BeanTopicExchange topicExchange(){return new TopicExchange("topicExchange");}@BeanBinding bindingTopic1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("send.message.*");}@BeanBinding bindingTopic2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("send.message.error.*");}#生产者rabbitTemplate.convertAndSend("topicExchange","send.message.error.info","message" + id++);#消费者@RabbitListener(queues = "topicQueue1")public void topicQueue1(Message message, Channel channel) {System.out.println("topic1---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}@RabbitListener(queues = "topicQueue2")public void topicQueue2(Message message, Channel channel) {System.out.println("topic2---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}
#Headers 交换机@BeanQueue handlerQueue(){return new Queue("handlerQueue",true);}@BeanHeadersExchange headersExchange(){return new HeadersExchange("headersExchange");}@BeanBinding bindingHeaders(){Map<String, Object> map = new HashMap<>();map.put("One","A");map.put("Two","B");return BindingBuilder.bind(handlerQueue()).to(headersExchange()).whereAny(map).match();}#生产者rabbitTemplate.convertAndSend("headersExchange",null,"消息:"+id++,message -> {message.getMessageProperties().setHeader("Three","A");return message;});#消费者@RabbitListener(queues = "handlerQueue")public void handlerQueue(Message message, Channel channel) {System.out.println("handlerQueue---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}

RabbitMQ高可用方案

cluster
1.普通模式(元数据共享的方式)
优点:不需要同步数据,效率高
缺点:存在单点故障问题
元数据包含
队列元数据:队列的名称及属性
交换机:交换机的名称及属性
绑定关系元数据:交换机与队列或者交换机与交换机
vhost元数据:host内的队列,交换机和绑定提供命名空间以及安全属性之间的绑定关系2.镜像模式(所有节点数据拷贝一份)
Cluster单机多节点部署
#CentOS 安装
安装 Docker Compose#安装 epel-release 依赖:
yum install epel-release#安装 DNF 包:
yum install dnf
​
#Fedora 安装:
dnf install dnf#安装python2-pip
dnf install python2-pip#安装python3-pip(一般选择python3)
dnf install python3-pip#安装docker-compose
pip3 install docker-compose#查看版本
docker-compose version
使用 Docker Compose 启动3个 RabbitMQ 节点
vi docker-compose.yml
写入以下配置
version: "2.0"
services:rabbit1:image: rabbitmq:managementhostname: rabbit1ports:- 5672:5672 #集群内部访问的端口- 15672:15672 #外部访问的端口environment:- RABBITMQ_DEFAULT_USER=guest #用户名- RABBITMQ_DEFAULT_PASS=guest #密码- RABBITMQ_ERLANG_COOKIE='imoocrabbitmq'
​rabbit2:image: rabbitmq:managementhostname: rabbit2ports:- 5673:5672environment:- RABBITMQ_ERLANG_COOKIE='imoocrabbitmq'links:- rabbit1
​rabbit3:image: rabbitmq:managementhostname: rabbit3ports:- 5674:5672environment:- RABBITMQ_ERLANG_COOKIE='imoocrabbitmq'links:- rabbit1- rabbit2
#启动docker-compose,按照脚本启动集群
docker-compose up -d#进入二号节点
docker exec -it default_rabbit2_1 /bin/bash#停止二号节点的rabbitmq
rabbitmqctl stop_app#配置二号节点加入集群
rabbitmqctl join_cluster rabbit@rabbit1#启动二号节点
rabbitmqctl start_app#3号节点重复此操作

集群镜像模式(在普通模式的基础上,通过Policy来实现 使用镜像模式可以实现Rabbitmq高可用方案)
#随便进入一台容器
docker exec -it default_rabbit1_1 /bin/bash
#-p静默输出,减少信息输出;"ha-mode":"all"代表所有队列;"ha-sync-mode":"automatic"代表自动同步,默认需要手动同步
rabbitmqctl set_policy -p ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'#查看
rabbitmqctl list_policies
![请添加图片描述](https://img-blog.csdnimg.cn/e711050581534f23b995c24dab85ffe0.png)#清除(需要加上vhost的信息)
rabbitmqctl clear_policy -p / ha-allName:policy名称
pattern:匹配表达式
Apply to:规则应用目标
Priority:优先级
Definition:规则的定义,对于镜像队列的配置来说,只需要3个部分,ha-mode,ha-params,ha-sync-mode

ha-mode

ha-sync-mode 队列同步消息的方式有效值为automatic和manual,默认是automatic

Federation插件

Federation 插件的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而 无须建立集群,该功能在以下场景下非常有用:
• 各个节点运行在不同版本的Erlang和RabbitMQ上。
• 网络环境不稳定,比如广域网当中。

Shovel插件

Shovel 与Federation具备的数据转发功能类似 Shovel能够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发 至另一个Broker中的交换器(作为目的端,即destination)。

Rabbitmq持久化机制

持久化消息

非持久化消息

Rabbitmq内存控制

当内存使用超过配置的阈值或者磁盘剩余空间低于配置的阈值时,RabbitMQ 会暂时阻塞客户端的连接, 并停止接收从客户端发来的消息,以此避免服务崩溃,客户端与服务端的心跳检测也会失效。

当出现内存告警时,可以通过管理命令临时调整内存大小

#进入docker容器
docker exec -it XXX /bin/bash#查看命令
rabbitmqctl -help
#设置告警内存大小 默认 0.4
rabbitmqctl set_vm_memory_high_watermark <fraction>
##绝对值 单位为KB,MB,GB
rabbitmqctl set_vm_memory_high_watermark absolute <value>注意:通过此命令修改的阈值在Broker重启后将会失效,通过修改配置文件的方式设置的阈值则 不会在重启后消失,但需要重启Broker才会生效。配置文件地址:/etc/rabbitmq/rabbitmq.confvm_memory_high_watermark.relative = 0.4
# vm_memory_high_watermark.absolute = 1GBRabbitMQ 提供relative或absolute两种配置方式
• relative 相对值,即前面的fraction,建议取值在0.4~0.66之间,不建议超过0.7
• absolute 绝对值,单位为KB、MB、GB

Rabbitmq内存换页

在某个 Broker 节点触及内存并阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间。 持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一份副本,这里会 将持久化的消息从内存中清除掉。默认情况下,在内存到达内存阈值的 50%时会进行换页动作。 也就是说,在默认的内存阈值为 0.4 的情况下,当内存超过 0.4 x 0.5=0.2 时会进行换页动作可以通过在配置文件中配置vm_memory_high_watermark_paging_ratio项来修改此值vm_memory_high_watermark_paging_ratio = 0.75vm_memory_high_watermark_paging_ratio的值大于1时,相当于禁用了换页功能

Rabbitmq磁盘控制

当磁盘剩余空间低于确定的阈值时,RabbitMQ 同样会阻塞生产者,这样可以避免因非持久化的消息持续 换页而耗尽磁盘空间导致服务崩溃。
默认情况下,磁盘阈值为50MB,表示当磁盘剩余空间低于50MB 时会阻塞生产者并停止内存 中消息的换页动作 。
这个阈值的设置可以减小,但不能完全消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘 空间检测期间内,磁盘空间从大于50MB被耗尽到0MB一个相对谨慎的做法是将磁盘阈值设置为与操作系统所显示的内存大小一致通过命令可以临时调整磁盘阈值
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit mem_relative <fraction>disk_limit 为固定大小,单位为KB、MB、GB
fraction 为相对比值,建议的取值为1.0~2.0之间对于的配置如下
disk_free_limit.relative = 2.0
# disk_free_limit.absolute = 50mb

Rabbitmq消息可靠性

RabbitMQ 的消息可靠性,一般是业务系统接入消息中间件时首要考虑的问题,一般通过 三个方面保障

  • **发送可靠性:**确保消息成功发送到Broker
  • 存储可靠性: Broker对消息持久化,确保消息不会丢失
  • **消费可靠性:**确保消息成功被消费

消息发送可靠性

  • At most once:最多一次,消息可能会丢失 ,但绝不会重复传输
  • At least once:最少一次,消息绝不会丢失,但可能会重复传输
  • Exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次

Rabbitmq支持"最多一次" 和 “最少一次
**最少一次:**消息生产者需要开启事务确认机制或者publisher confirm机制,确保消息可靠的传输到Rabbitmq中

**最多一次:**生产者随意发送,不过这样很难确保消息会成功发送

消息存储可靠性
开启RabbitMQ持久化(交换机、队列、消息)

消息消费可靠性
消费者在消费消息的同时,需要将autoAck设置为false,然后通过手动确认的方式去确认 己经正确消费的消息,以免在消费端引起不必要的消息丢失

保证消息不重复消费
幂等性(每个消息用一个唯一标识来区分,消费前先判断标识有没有被消费过,若已消费过,则直接ACK)

RabbitMQ如何保证消息的顺序性
将消息放入同一个交换机,交给同一个队列,这个队列只有一个消费者,消费者只允许同时开启一个线程

RabbitMQ死信队列
死信队列是当消息在一个队列因为下列原因:
a、消息被拒绝(basic.reject或basic.nack)并且requeue=false.
b、消息TTL过期
c、队列达到最大长度(队列满了,数据无法添加到mq中)
变成了 “死信队列” 后被重新投递(publish)到另一个Exchange,然后重新消费。说白了就是没有被消费的消息换个地方重新被消费

例如:

   /*** 死信队列*/@BeanQueue dlxQueue(){return new Queue("dlxQueue");}@BeanDirectExchange dlxExchange(){return new DirectExchange("dlxExchange");}@BeanBinding binding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlxRoutingKey");}/*** 订单*/@BeanDirectExchange orderExchange(){return new DirectExchange("orderExchange");}/*** 订单队列绑定死信队列* @return*/@BeanQueue orderQueue(){Map<String, Object> arguments = new HashMap<>(2);// 绑定我们的死信交换机arguments.put("x-dead-letter-exchange", "dlxExchange");// 绑定我们的路由keyarguments.put("x-dead-letter-routing-key", "dlxRoutingKey");return new Queue("orderQueue", true, false, false, arguments);}@BeanBinding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("orderRoutingKey");}

生产者

        rabbitTemplate.convertAndSend("orderExchange","orderRoutingKey","message" + id++,message -> {message.getMessageProperties().setExpiration("100");return message;});

消费者

    /*** 订单队列* @param message* @param channel*/@RabbitListener(queues = "orderQueue")public void orderQueue(Message message, Channel channel) {System.out.println("订单队列---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));int i = 1/0;}/*** 死信队列* @param message* @param channel*/@RabbitListener(queues = "dlxQueue")public void dlxQueue(Message message, Channel channel) {System.out.println("死信队列---消费者:"+Thread.currentThread().getName() + "------" + new Date() +"------"+new String(message.getBody()));}

Rabbitmq 系列相关推荐

  1. RabbitMQ系列之【启动过程中遇到问题及解决方案】

    RabbitMQ系列之[启动过程中遇到问题及解决方案] 参考文章: (1)RabbitMQ系列之[启动过程中遇到问题及解决方案] (2)https://www.cnblogs.com/feixiabl ...

  2. rabbitmq系列问题解决:406, “PRECONDITION_FAILED - inequivalent arg ‘durable‘

    rabbitmq系列问题解决:406, "PRECONDITION_FAILED - inequivalent arg 'durable' 参考文章: (1)rabbitmq系列问题解决:4 ...

  3. RabbitMQ系列教程之四:路由(Routing)

    在上一个教程中,我们构建了一个简单的日志系统,我们能够向许多消息接受者广播发送日志消息. 在本教程中,我们将为其添加一项功能 ,这个功能是我们将只订阅消息的一个子集成为可能. 例如,我们可以只将关键的 ...

  4. RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)

    在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...

  5. RabbitMQ系列教程之二:工作队列(Work Queues)

    今天开始RabbitMQ教程的第二讲,废话不多说,直接进入话题.   (使用.NET 客户端 进行事例演示) 在第一个教程中,我们编写了一个从命名队列中发送和接收消息的程序.在本教程中,我们将创建一个 ...

  6. RabbitMQ系列(三)RabbitMQ交换器Exchange介绍与实践

    RabbitMQ交换器Exchange介绍与实践 RabbitMQ系列文章 RabbitMQ在Ubuntu上的环境搭建 深入了解RabbitMQ工作原理及简单使用 RabbitMQ交换器Exchang ...

  7. RabbitMQ系列——Rabbitmq Plugin configuration unchanged. 解决方案

    RabbitMQ系列--Rabbitmq Plugin configuration unchanged. 解决方案 一.问题概述 cmd命令窗口执行rabbitmq-plugins enable ra ...

  8. 工作队列模式(任务队列)| RabbitMQ系列(二)

    相关文章 RabbitMQ系列汇总:RabbitMQ系列 前言 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成. 相反我们安排任务在之后执行.我们把任务封装为消息并 ...

  9. 【外行也能看懂的RabbitMQ系列(一)】—— RabbitMQ快速入门篇(内含丰富实例)

    系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...

  10. 【外行也能看懂的RabbitMQ系列(二)】—— RabbitMQ的Web管理界面(rabbitmq_management)详解(内含Topic模式通配符实操)

    系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...

最新文章

  1. 转:Object-Runtime的基本数据类型
  2. maven的内部属性
  3. strace调试(Linux Device Driver)
  4. STM32——时钟系统
  5. docker 基础操作
  6. 领导有了这些策略 不怕团队没有执行力
  7. 40亿个手机号码如何去重?
  8. sketchup 图片转模型_3d模型转su模型(如何将3D模型转化为sketchup)
  9. WIN10在服务器上找不到共享打印机,win10搜索不到共享打印机怎么办
  10. graphpad如何换柱状图与折线图能否混合一起_Graphpad Prism 绘制柱状图与散点图共存图...
  11. 詹姆斯高斯林_詹姆斯·高斯林接下来要做什么?
  12. 树莓派4B+Intel神经计算棒(Stick2)+YoloV5可行性考察报告
  13. 《手机传感器》参数与选择
  14. 【嵌入式蓝桥杯】程序执行完中断将不再触发 /* Go to infinite loop when Hard Fault exception occurs */
  15. python地图制作 - pyecharts(1.9.1)绘制各城市地图
  16. 劳资蜀道山!6个高质量免费电子书网站!我看谁还不知道
  17. 紫光国芯 数字后端 面经
  18. JAVA-win7的JDK环境变量配置安装
  19. 总结一下:运维工程师面试的经历及面试相关问题
  20. SQL查找是否“存在“

热门文章

  1. ASCII Art@字符画LOGO字符生成@Font Generator
  2. matlab绘制动态图
  3. 攀岩基地-东龙洲,香港小众离岛之一,半天游
  4. Excel 单条件模糊查询
  5. 新增服务器虚拟磁盘,系统提示“由于管理员设置的策略,该磁盘处于脱机状态”
  6. php教程徽章,PHP-徽章原因错误
  7. 大数据和云计算技术周报(第99期)
  8. 服务器光驱装系统教程图解,图文解析win7系统光驱装系统的具体步骤
  9. [转载] C++文件读写详解(ofstream,ifstream,fstream)
  10. 2023年全国最新二级建造师精选真题及答案6