1、认识MQ

1.1、什么是MQ?
MQ全称:message queue 即 消息队列
这个队列遵循的原则:FIFO 即 先进先出
队列里面存的就是message

1.2、为什么要用MQ?
1.2.1、流量削峰

这种情况,要是访问 1020次 / s呢?这种肯定会让支付系统宕机了,因为太大了嘛,受不了,所以:流量削峰

这样就让message排着队了,然后使用FIFO先进先出,这样支付系统就可以承受得了了

1.2.2、应用解耦

上面这种,只要支付系统或库存系统其中一个挂彩了,那么订单系统也要挂彩,因此:解耦呗

而采用了MQ之后,支付系统和库存系统有一个出问题,那么它的处理内存是在MQ中的,此时订单系统就不会有影响,可以正常完成,等到故障恢复了,订单系统再处理对应的事情,这就提高了系统的可用性

1.2.3、异步处理

如上图,订单系统要调用支付系统的API,而订单系统并不知道支付系统处理对应的业务需要多久,要解决可以采用订单系统隔一定时间又去访问支付系统,看处理完没有,而使用MQ更容易解决。

1.3、RabbitMQ的原理?

图中的东西后续会慢慢见到
Broker实体:接收和分发消息的应用 / RabbitMQ Server / Message Broker
而上图中RabbitMQ的四个核心就是:Producer生产者、exchange交换机、queue队列、Consumer消费者
Producer生产者:就是负责推送消息的程序
Exchange交换机:接收来自生产者的消息,并且把消息放到队列中
queue队列:就是一个数据结构,本质就是一个很大的消息缓冲区,许多生产者可以把消息推送到一个队列,许多消费者可以从一个队列中获得数据
Consumer消费者:就是接收消息的程序
注意:生产者、消息中间件MQ、消费者大多时候并不是在同一台机器上的,所以:生产者有时可以是消费者;而消费者有时也可以是生产者
Connection链接:就是让Producer生产者、Broker实体、Consumer消费者之间建立TCP链接
Virtual host虚拟机:处于多租户和安全因素考虑而设计的,当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Channel信道:就是发消息的通道,它是在Connection内部建立的逻辑连接
Routes路由策略 / binding绑定:交换机以什么样的策略将消息发布到Queue。也就是exchange交换机 和 queue队列之间的联系,即 二者之间的虚拟连接,它里面可以包含routing key 路由键

1.4、RabbitMQ的通讯方式
这个玩意儿在官网中有图,地址:https://www.rabbitmq.com/getstarted.html 学完之后这张图最好放在自己脑海里,平时开发玩的就是这些,下面的工作模式在后续会慢慢接触
另外:下面放的是七种,实质上第六种RPC用得很少

1、hello word - 简单模式
2、work queues - 工作模式
3、publish / subscribe - 发布订阅模式
4、Routing - 路由模式
5、Topics - 主题模式
6、RPC模式 - 不用了解也行
7、publisher confirms - 发布确认模式

2、安装RabbitMQ

以下的方式自行选择一种即可

2.1、在Centos 7下安装
查看自己的Linux版本
uname -a

2.1.1、使用rpm红帽软件
准备工作

1、下载Erlang,因为:RabbitMQ是Erlang语言写的,Erlang下载地址【 ps:这是官网 】:https://www.erlang.org/downloads,选择自己要的版本即可
另外:RabbitMQ和Erlang的版本对应关系链接地址 https://www.rabbitmq.com/which-erlang.html

当然:上面这种是下载gz压缩包,配置挺烦的,可以直接去github中下载rpm文件,地址:https://github.com/rabbitmq/erlang-rpm/releases , 选择自己需要的版本即可,注意一个问题:要看是基于什么Linux的版本

要是github下载慢的话,都有自己的文明上网加速方式,要是没有的话,可以进入 https://github.com/fhefh2015/Fast-GitHub 下载好了然后集成到自己浏览器的扩展程序中即可,而如果进入github很慢的话,可以选择去gitee中搜索一个叫做:dev-sidecar的东西安装,这样以后进入github就很快了,还有另外的很多方式,不介绍了。

2、执行rpm -ivh erlang文件 命令
i 就是 install的意思
vh 就是显示安装进度条
注意:需要保证自己的Linux中有rpm命令,没有的话,执行yum install rpm指令即可安装rpm

3、安装RabbitMQ需要的依赖环境
yum install socat -y

4、下载RabbitMQ的rpm文件,github地址:https://github.com/rabbitmq/rabbitmq-server/releases , 选择自己要的版本即可

5、安装RabbitMQ

6、启动RabbitMQ服务

启动服务
sbin/service rabbitmq-server start停止服务
/sbin/service rabbitmq-server stop查看启动状态
/sbin/service rabbitmq-server status开启开机自动
chkconfig rabbitmq-server on

查看启动状态

这表示正在启动,需要等一会儿,看到下面的样子就表示启动成功

7、安装web管理插件
1、停止RabbitMQ服务
service rabbitmq-server stop // 使用上面的命令 /sbin/service rabbitmq-server stop也行

2、安装插件
rabbitmq-plugins enable rabbitmq_management3、开启RabbitMQ服务
service rabbitmq-server start

要是访问不了,看看自己的防火墙关没关啊

查看防火墙状态

systemctl status firewalld

关闭防火墙

systemctl stop firewalld

一劳永逸 禁用防火墙

systemctl enable firewalld

同时查看自己的服务器有没有开放15672端口,不同的东西有不同的处理方式,如我的云服务器直接在服务器网址中添加规则即可,其他的方式自行百度

2.1.2、使用Docker安装
需要保证自己的Linux中有Docker容器,教程链接:https://www.cnblogs.com/xiegongzi/p/15621992.html

使用下面的两种方式都不需要进行web管理插件的安装和erlang的安装

1、查看自己的docker容器中是否已有了rabbitmq这个名字的镜像
docker images

删除镜像
docker rmi 镜像ID // 如上例的 dockerrmi 16c 即可删除镜像

2、拉取RabbitMQ镜像 并 启动Docker容器
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

3、查看Docker容器是否启动
docker ps

4、再次在浏览器进行访问就可以吃鸡了,不需要再安装插件啊,刚刚上一步拉镜像和启动时已经安装好了

2.1.3、使用Docker-compose安装
采用了第二种方式的话,记得把已经启动的Docker容器关了,以下是附加的一些Docker的基操

拉取镜像

docker pull 镜像名称

查看全部镜像

docker images

删除镜像

docker rmi 镜像ID

将本地的镜像导出

docker save -o 导出的路径 镜像id

加载本地的镜像文件

docker load -i 镜像文件

修改镜像名称

docker tag 镜像id 新镜像名称:版本

简单运行操作

docker run 镜像ID | 镜像名称

跟参数的运行

docker run -d -p 宿主机端口:容器端口 --name 容器名称 镜像ID | 镜像名称

如:docker run -d -p 8081:8080 --name tomcat b8

-d:代表后台运行容器

-p 宿主机端口:容器端口:为了映射当前Linux的端口和容器的端口

–name 容器名称:指定容器的名称

查看运行的容器

docker ps [-qa]

-a:查看全部的容器,包括没有运行

-q:只查看容器的标识

查看日志

docker logs -f 容器id

-f:可以滚动查看日志的最后几行

进入容器内部

docker exec -it 容器id bash

退出容器:exit

将宿主机的文件复制到容器内部的指定目录

docker cp 文件名称 容器id:容器内部路径
docker cp index.html 982:/usr/local/tomcat/webapps/ROOT

=====================================================================

重新启动容器

docker restart 容器id

启动停止运行的容器

docker start 容器id

停止指定的容器(删除容器前,需要先停止容器)

docker stop 容器id

停止全部容器

docker stop $(docker ps -qa)

删除指定容器

docker rm 容器id

删除全部容器

docker rm $(docker ps -qa)

1、创建一个文件夹,这些我很早之前就玩过了,所以建好了的
# 创建文件夹
mkdir 文件夹名

2、进入文件夹,创建docker-compose.yml文件,注意:文件名必须是这个
# 创建文件
touch docker-compose.yml

3、编辑docker-compose.yml文件
# 编辑文件
vim docker-compose.yml

里面编写的内容如下,编写好保存即可。注意:别用tab缩进啊,会出问题的,另外:每句的后面别有空格,严格遵循yml格式的
version: “3.1”
services:
rabbitmq:

镜像

image: rabbitmq:3.9-management

自启

restart: always

Docker容器名

container_name: rabbitmq

端口号,docker容器内部端口 映射 外部端口

ports:- 5672:5672- 15672:15672

数据卷映射 把容器里面的东西映射到容器外面去 容易操作,否则每次都要进入容器

volumes:- ./data:/opt/install/rabbitMQ-docker/

4、在docker-compose.yml所在路径执行如下命令,注意:一定要在此文件路径中才行,因为默认是在当前文件夹下找寻docker-compose文件

启动

docker-compose up -d

-d 后台启动

=========================================================

附加内容:docker-compose的一些命令操作

1. 基于docker-compose.yml启动管理的容器

docker-compose up -d

2. 关闭并删除容器

docker-compose down

3. 开启|关闭|重启已经存在的由docker-compose维护的容器

docker-compose start|stop|restart

4. 查看由docker-compose管理的容器

docker-compose ps

5. 查看日志

docker-compose logs -f

有兴趣的也可以去了解docker-file自定义镜像

去浏览器访问一样的吃鸡
上面就是RabbitMQ的基操做完了,不过默认账号是guest游客状态,很多事情还做不了呢,所以还得做一些操作

2.1.4、解决不能登入web管理界面的问题
2.1.4.1、使用rpm红帽软件安装的RabbitMQ
这种方式直接使用guest进行登录是不得吃的

这是因为guest是游客身份,不能进入,需要添加新用户
查看当前用户 / 角色有哪些
rabbitmqctl list_users

删除用户
rabbitmqctl delete_user 用户名添加用户
rabbitmqctl add_user 用户名 密码设置用户角色
rabbitmqctl set_user_tags 用户名 administrator设置用户权限【 ps:guest角色就是没有这一步 】
rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"

设置用户权限指令解释

         set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

现在使用admin去浏览器登录就可以了

2.1.4.2、使用docker 或 docker-compose安装的RabbitMQ
这两种方式直接使用guest就可以进行登录,后续的操作就是一样的了

3、开始玩RabbitMQ

创建Maven项目 并导入如下依赖

com.rabbitmq
amqp-client
5.9.0

回到前面的RabbitMQ原理图

3.1、Hello word 简单模式
对照原理图来玩,官网中有Hello word的模式图

即:一个生产者Producer、一个默认交换机Exchange、一个队列queue、一个消费者Consumer

生产者

就是下图前面部分

package cn.zixieqing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

private static final String HOST = "ip";      // 放RabbitMQ服务的服务器ip
private static final int PORT = 5672;      // 服务器中RabbitMQ的端口号,在浏览器用的15672是通过5672映射出来的15672
private static final String USER_NAME = "admin";
private static final String PASSWORD = "admin";
private static final String QUEUE_NAME = "hello word";public static void main(String[] args) throws IOException, TimeoutException {// 1、获取链接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置链接信息factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USER_NAME);factory.setPassword(PASSWORD);/*当然:这里还可以设置vhost虚拟机 - 前提是自己在web管理界面中添加了vhostfactory.setVirtualHost();*/// 3、获取链接ConnectionConnection connection = factory.newConnection();// 4、创建channel信道 - 它才是去和交换机 / 队列打交道的Channel channel = connection.createChannel();// 5、准备一个队列queue// 这里理论上是去和exchange打交道,但是:这里是hello word简单模式,所以直接使用默认的exchange即可/*下面这是参数的完整意思,源码中偷懒了,没有见名知意queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )参数1、队列名字参数2、是否持久化( 保存到磁盘 ),默认是在内存中的参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除参数5、其他配置项,这涉及到后面的知识,目前选择null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("正在发送信息!!!");// 6、推送信息到队列中// 准备发送的信息内容String message = "it is hello word";/*basicPublish( exchangeName,queueName,properties,message )参数1、交互机名字 - 目前使用了默认的参数2、指定路由规则 - 目前使用队列名字参数3、指定传递的消息所携带的properties参数4、推送的具体消息 - byte类型的*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 7、释放资源 - 倒着关闭即可if ( null != channel ) channel.close();if ( null != connection ) connection.close();System.out.println("消息发送完毕");}

}

运行之后,去浏览器管理界面进行查看

消费者

public class Consumer {

private static final String HOST = "ip";   // 自己的服务器ip
private static final int PORT = 5672;
private static final String USER_NAME = "admin";
private static final String PASSWORD = "admin";
private static final String QUEUE_NAME = "hello word";public static void main(String[] args) throws IOException, TimeoutException {// 1、创建链接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置链接信息factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USER_NAME);factory.setPassword(PASSWORD);// 3、创建链接对象Connection connection = factory.newConnection();// 4、创建信道channelChannel channel = connection.createChannel();// 5、从指定队列中获取消息/*basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )参数1、队列名参数2、是否自动应答,为true时,消费者接收到消息后,会立即告诉RabbitMQ参数3、消费者如何消费消息的回调参数4、消费者取消消费的回调*/System.out.println("开始接收消息!!!");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到了消息:" + new String(message.getBody(), StandardCharsets.UTF_8) );};CancelCallback cancelCallback = consumerTag -> System.out.println("消费者取消了消费信息行为");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);// 6、释放资源 - 但是这里不能直接关闭啊,否则:看不到接收的结果的,可以选择不关,也可以选择加一句代码System.in.read();// channel.close();// connection.close();}

}

3.2、work queue工作队列模式
流程图就是官网中的

一个生产者批量生产消息
一个默认交换机
一个队列
多个消费者
换言之:就是有大量的任务 / 密集型任务有待处理( 生产者生产的消息 ),此时我们就将这些任务推到队列中去,然后使用多个工作线程( 消费者 )来进行处理,否则:一堆任务直接就跑来了,那消费者不得乱套了,因此:这种就需要让这种模式具有如下的特点:
1、消息是有序排好的( 也就是在队列中 )
2、工作线程 / 消费者不能同时接收同一个消息,换言之:生产者推送的任务必须是轮询分发的,即:工作线程1接收第一个,工作线程2接收第二个;工作线程1再接收第三个,工作线程2接收第四个

抽取RabbitMQ链接的工具类

package cn.zixieqing.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQUtil {

private static final String HOST = "自己的ip";
private static final int PORT = 5672;
private static final String USER_NAME = "admin";
private static final String PASSWORD = "admin";public static Channel getChannel(String vHost ) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USER_NAME);factory.setPassword(PASSWORD);if ( !vHost.isEmpty() ) factory.setVirtualHost(vHost);return factory.newConnection().createChannel();}

}

生产者

和hello word没什么两样

package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class WorkProducer {

private static final String QUEUE_NAME = "work queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");// 1、声明队列/*下面这是参数的完整意思,源码中偷懒了,没有见名知意queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )参数1、队列名字参数2、是否持久化( 保存到磁盘 ),默认是在内存中的参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除参数5、其他配置项,这涉及到后面的知识,目前选择null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 2、准备消息System.out.println("请输入要推送的信息,按回车确认:");Scanner input = new Scanner(System.in);// 3、推送信息到队列中while (input.hasNext()) {/*basicPublish( exchangeName,routing key,properties,message )参数1、交互机名字 - 目前是使用了默认的参数2、指定路由规则 - 目前使用队列名字参数3、指定传递的消息所携带的properties参数4、推送的具体消息 - byte类型的*/String message = input.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息====>" + message + "====>推送完毕!");}
}

}

消费者

消费者01
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WorkConsumer {

private static final String QUEUE_NAME = "work queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8));};CancelCallback cancelCallback = consumerTag -> {System.out.println( consumerTag + "消费者中断了接收消息====>" );};System.out.println("消费者01正在接收消息......");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}

}

消费者02
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WorkConsumer {

private static final String QUEUE_NAME = "work queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8));};CancelCallback cancelCallback = consumerTag -> {System.out.println( consumerTag + "消费者中断了接收消息====>" );};System.out.println("消费者02正在接收消息......");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}

}


3.3、消息应答机制
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了
目的就是为了保证数据的安全,如果没有这个机制的话,那么就会造成下面的情况

消费者接收队列中的消息时,没接收完,出现异常了,然后此时MQ以为消费者已经把消息接收并处理了( MQ并没有接收到消息有没有被消费者处理完毕 ),然后MQ就把队列 / 消息给删了,后续消费者异常恢复之后再次接收消息,就会出现:接收不到了

3.3.1、消息应答机制的分类
这个东西已经见过了
/*
basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
参数1、队列名
参数2、是否自动应答,为true时,消费者接收到消息后,会立即告诉RabbitMQ
参数3、消费者如何消费消息的回调
参数4、消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

3.3.1.1、自动应答
指的是:消息发送后立即被认为已经传送成功
需要具备的条件:
1、发送的消息很多,就是高吞吐量的那种
2、发送的消息在传输方面是安全的
优点:处理效率快,很高效

3.3.1.2、手动应答
就是我们自己去设定,好处是可以批量应答并且减少网络拥堵

调用的API如下:

Channel.basicACK( long, boolean );       // 用于肯定确认,即:MQ已知道该消息 并且 该消息已经成功被处理了,所以MQ可以将其丢弃了Channel.basicNack( long, boolena, boolean );    // 用于否定确认Channel.basicReject( long, boolea );       // 用于否定确认与Channel.basicNack( long, boolena, boolean )相比,少了一个参数,这个参数名字叫做:multiple

multiple参数说明,它为true和false有着截然不同的意义【 ps:建议弄成false,虽然是挨个去处理,从而应答,效率慢,但是:数据安全,否则:很大可能造成数据丢失 】

true 代表批量应答MQ,channel 上未应答 / 消费者未被处理完毕的消息

false 只会处理队列放到channel信道中当前正在处理的消息告知MQ是否确认应答 / 消费者处理完毕了

3.3.1.3、消息重新入队原理
指的是:如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
如下图:消息1原本是C1这个消费者来接收的,但是C1失去链接了,而C2消费者并没有断开链接,所以:最后MQ将消息重新入队queue,然后让C2来处理消息1

3.3.1.4、手动应答的代码演示
生产者

package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class AckProducer {

private static final String QUEUE_NAME = "ack queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");// 声明队列/*下面这是参数的完整意思,源码中偷懒了,没有见名知意queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )参数1、队列名字参数2、是否持久化( 保存到磁盘 ),默认是在内存中的参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除参数5、其他配置项,这涉及到后面的知识,目前选择null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("请输入要推送的消息:");Scanner input = new Scanner(System.in);while (input.hasNext()) {/*basicPublish( exchangeName,routing key,properties,message )参数1、交互机名字 - 使用了默认的参数2、指定路由规则,使用队列名字参数3、指定传递的消息所携带的properties参数4、推送的具体消息 - byte类型的*/String message = input.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息====>" + message + "推送完毕");}
}

}

消费者01

package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class AckConsumer {

private static final String QUEUE_NAME = "ack queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");DeliverCallback deliverCallback = (consumerTag, message) -> {try {Thread.sleep(5*1000);System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));// 添加手动应答/*basicAck( long, boolean )参数1、消息的标识tag,这个标识就相当于是消息的ID参数2、是否批量应答multiple*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}};System.out.println("消费者01正在接收消息,需要5秒处理完");channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {System.out.println("触发消费者取消消费消息行为的回调");System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));});
}

}

消费者02

package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class AckConsumer {

private static final String QUEUE_NAME = "ack queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");DeliverCallback deliverCallback = (consumerTag, message) -> {try {Thread.sleep(10*1000);System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));// 添加手动应答/*basicAck( long, boolean )参数1、消息的标识tag,这个标识就相当于是消息的ID参数2、是否批量应答multiple*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}};System.out.println("消费者02正在接收消息,需要10秒处理完");channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {System.out.println("触发消费者取消消费消息行为的回调");System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));});
}

}

3.4、RabbitMQ的持久化 durable
3.4.1、队列持久化
这个玩意儿的配置吧,早就见过了,在生产者消息发送时,有一个声明队列的过程,那里面就有一个是否持久化的配置
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

而如果没有持久化,那么RabbitMQ服务由于其他什么原因导致挂彩的时候,那么重启之后,这个没有持久化的队列就灰飞烟灭了【 ps:注意和里面的消息还没关系啊,不是说队列持久化了,那么消息就持久化了 】
在这个队列持久化配置中,它的默认值就是false,所以要改成true时,需要注意一个点:选择队列持久化,那么必须保证当前这个队列是新的,即:RabbitMQ中没有当前队列,否则:需要进到web管理界面把已有的同名队列删了,然后重新配置当前队列持久化选项为true,不然:报错

那么:当我把持久化选项改为true,并 重新发送消息时
image

inequivalent arg ‘durable’ for queue ‘queue durable’ in vhost ‘/’: received ‘true’ but current is ‘false’
告知你:vhost虚拟机中已经有了这个叫做durable的队列,要接收的选项值是true,但是它当前的值是false,所以报错了呗
解决方式就是去web管理界面,把已有的durable队列删了,重新执行

再次执行就可以吃鸡了,同时去web管理界面会发现它状态变了,多了一个D标识

有了这个玩意儿之后,那么就算RabbitMQ出问题了,后续恢复之后,那么这个队列也不会丢失

3.4.2、消息持久化
注意:这里说的消息持久化不是说配置之后消息就一定不会丢失,而是:把消息标记为持久化,然后RabbitMQ尽量让其持久化到磁盘

但是:也会有意外,比如:RabbitMQ在将消息持久化到磁盘时,这是有一个时间间隔的,数据还没完全刷写到磁盘呢,RabbitMQ万一出问题了,那么消息 / 数据还是会丢失的,所以:消息持久化配置是一个弱持久化,但是:对于简单队列模式完全足够了,强持久化的实现方式在后续的publisher / confirm发布确认模式中

至于配置极其地简单,在前面都已经见过这个配置项,就是生产者发消息时做文章,就是下面的第三个参数,把它改为MessageProperties.PERSISTENT_TEXT_PLAIN即可

  /*basicPublish( exchangeName,routing key,properties,message )参数1、交互机名字 - 使用了默认的参数2、指定路由规则,使用队列名字参数3、指定传递的消息所携带的properties参数4、推送的具体消息 - byte类型的*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 改成消息持久化channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

MessageProperties类的源码如下:
public class MessageProperties {

public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public MessageProperties() {
}

}

上面用到了BasicProperties类型,它的属性如下:
public static class BasicProperties extends AMQBasicProperties {
// 消息内容的类型
private String contentType;
// 消息内容的编码格式
private String contentEncoding;
// 消息的header
private Map<String, Object> headers;
// 消息是否持久化,1:否,2:是
private Integer deliveryMode;
// 消息的优先级
private Integer priority;
// 关联ID
private String correlationId;
// :用于指定回复的队列的名称
private String replyTo;
// 消息的失效时间
private String expiration;
// 消息ID
private String messageId;
// 消息的发送时间
private Date timestamp;
// 类型
private String type;
// 用户ID
private String userId;
// 应用程序ID
private String appId;
// 集群ID
private String clusterId;
}

3.5、不公平分发 和 预取值
不公平分发

这个东西是在消费者那一方进行设置的
RabbitMQ默认是公平分发,即:轮询分发
轮询分发有缺点:如前面消费者01( 设5秒的那个 )和 消费者02 ( 设10秒的那个 ),这种情况如果采用轮询分发,那么:01要快一点,而02要慢一点,所以01很快处理完了,然后处于空闲状态,而02还在拼命奋斗中,最后的结果就是02不停干,而01悠悠闲闲的,浪费了时间,所以:应该压榨一下01,让它不能停
设置方式:在消费者接收消息之前进行channel.basicQos( int prefetchCount )设置
// 不公平分发,就是在这里接收消息之前做处理
/*
basicQos( int prefetchCount )
为0、轮询分发 也是RabbitMQ的默认值
为1、不公平分发
*/
channel.basicQos(1);

    channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {System.out.println("消费者中断了接收消息行为触发的回调");});

预取值

指的是:多个消费者在消费消息时,让每一个消费者预计消费多少条消息

而要设置这种效果,和前面不公平分发的设置是一样的,只是把里面的参数改一下即可

 // 预取值,也是在这里接收消息之前做处理,和不公平分发调的是同一个API/* basicQos( int prefetchCount ) 为0、轮询分发 也是RabbitMQ的默认值;为1、不公平分发而当这里的数字变成其他的,如:上图中上面的那个消费者要消费20条消息,那么把下面的数字改成对应的即可注意点:这是要设置哪个消费者的预取值,那就是在哪个消费者代码中进行设定啊*/channel.basicQos(10);     // 这样就表示这个代码所在的消费者需要消费10条消息了channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {System.out.println("消费者中断了接收消息行为触发的回调");});

3.6、publisher / confirms 发布确认模式
3.6.1、发布确认模式的原理
这个玩意儿的目的就是为了持久化

在上面的过程中,想要让数据持久化,那么需要具备以下的条件
1、队列持久化
2、消息持久化
3、发布确认
而所谓的发布确认指的就是:数据在刷写到磁盘时,成功了,那么MQ就回复生产者一下,数据确认刷写到磁盘了,否则:只具备前面的二者的话,那也有可能出问题,如:数据推到了队列中,但是还没来得及刷写到磁盘呢,结果RabbitMQ宕机了,那数据也有可能会丢失,所以:现在持久化的过程就是如下的样子:

开启发布确认

在发送消息之前( 即:调basicPublish() 之前 )调一个API就可以了
channel.confirmSelect(); // 没有参数

3.6.2、发布确认的分类
3.6.2.1、单个确认发布
一句话:一手交钱一手交货,即 生产者发布一条消息,RabbitMQ就要回复确认状态,否则不再发放消息,因此:这种模式是同步发布确认的方式,缺点:很慢,优点:能够实时地了解到那条消息出异常 / 哪些消息都发布成功了
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {

    // 单个确认发布singleConfirm();        // 单个确认发布发送这些消息花费4797ms
}public static void singleConfirm() throws IOException, TimeoutException, InterruptedException {Channel channel = MQUtil.getChannel("");// 开启确认发布channel.confirmSelect();// 声明队列 并 让队列持久化channel.queueDeclare("singleConfirm", true, false, false, null);long begin = System.currentTimeMillis();for (int i = 1; i <= 100; i++) {// 发送消息 并 让消息持久化channel.basicPublish("","singleConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );// 发布一个 确认一个 channel.waitForConfirms()if ( channel.waitForConfirms() )System.out.println("消息".concat( String.valueOf(i) ).concat( "发送成功") );}long end = System.currentTimeMillis();System.out.println("单个确认发布发送这些消息花费".concat( String.valueOf( end-begin ) ).concat("ms") );
}

3.6.2.2、批量确认发布
一句话:只要结果,是怎么一个批量管不着,只需要把一堆消息发布之后,回复一个结果即可,这种发布也是同步的
优点:效率相比单个发布要高
缺点:如果因为什么系统故障而导致发布消息出现问题,那么就会导致是批量发了一些消息,然后再回复的,中间有哪个消息出问题了鬼知道
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
// 单个确认发布
// singleConfirm(); // 单个确认发布发送这些消息花费4797ms

    // 批量发布batchConfirm();         // 批量发布发送的消息共耗时:456ms}public static void batchConfirm() throws IOException, TimeoutException, InterruptedException {Channel channel = MQUtil.getChannel("");// 开启确认发布channel.confirmSelect();// 声明队列 并 让队列持久化channel.queueDeclare("batchConfirm", true, false, false, null);long begin = System.currentTimeMillis();for (int i = 1; i <= 100; i++) {// 发送消息 并 让消息持久化channel.basicPublish("","batchConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );// 批量发布 并 回复批量发布的结果 - 发了10条之后再确认if (i % 10 == 0) {channel.waitForConfirms();System.out.println("消息" + ( i-10 ) + "====>" + i + "的消息发布成功");}}// 为了以防还有另外的消息未被确认,再次确认一下channel.waitForConfirms();long end = System.currentTimeMillis();System.out.println("批量发布发送的消息共耗时:" + (end - begin) + "ms");}

3.6.2.3、异步确认发布 - 必须会的一种

由上图可知:所谓的异步确认发布就是:
1、生产者只管发消息就行,不用管消息有没有成功
2、发布的消息是存在一个map集合中的,其key就是消息的标识tag / id,value就是消息内容
3、如果消息成功发布了,那么实体broker会有一个ackCallback()回调函数来进行处理【 ps:里面的处理逻辑是需要我们进行设计的 】
4、如果消息未成功发布,那么实体broker会调用一个nackCallback()回调函数来进行处理【 ps:里面的处理逻辑是需要我们进行设计的 】
5、而需要异步处理,就是因为生产者只管发就行了,因此:一轮的消息肯定是很快就发布过去了,就可以做下一轮的事情了,至于上一轮的结果是怎么样的,那就需要等到两个callback回调执行完了之后给结果,而想要能够调取到两个callback回调,那么:就需要对发送的信息进行监听 / 对信道进行监听
而上述牵扯到一个map集合,那么这个集合需要具备如下的条件:
1、首先此集合应是一个安全且有序的,同时还支持高并发
2、其次能够将序列号( key ) 和 消息( value )轻松地进行关联
代码实现

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {// 单个确认发布// singleConfirm();        // 单个确认发布发送这些消息花费4797ms// 批量发布// batchConfirm();         // 批量发布发送的消息共耗时:456msasyncConfirm();             // 异步发布确认耗时:10ms}// 异步发布确认
public static void asyncConfirm() throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.confirmSelect();channel.queueDeclare("async confirm", true, false, false, null);// 1、准备符合条件的mapConcurrentSkipListMap<Long, Object> messagePoolMap = new ConcurrentSkipListMap<>();// 3、对信道channel进行监听// 成功确认发布回调ConfirmCallback ackCallback = (messageTag, multiple) -> {System.out.println("确认发布了消息=====>" + messagePoolMap.headMap(messageTag) );// 4、把确认发布的消息删掉,减少内存开销// 判断是否是批量删除if ( multiple ){// 通过消息标识tag 把 确认发布的消息取出messagePoolMap.headMap(messageTag).clear();/*** 上面这句代码拆分写法*    ConcurrentNavigableMap<Long, Object> confirmed = messagePoolMap.headMap(messageTag);*    confirmed.clear();*/}else {messagePoolMap.remove(messageTag);}};// 没成功发布确认回调ConfirmCallback nackCallback = (messageTag, multiple) -> {System.out.println("未确认的消息是:" + messagePoolMap.get(messageTag) );};// 进行channel监听 这是异步的/*** channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)* 参数1、消息成功发布的回调函数 ackCallback()* 参数2、消息未成功发布的回调函数 nackCallback()*/channel.addConfirmListener( ackCallback,nackCallback );long begin = System.currentTimeMillis();for (int i = 1; i <= 100; i++) {// 2、将要发布的全部信息保存到map中去/*channel.getNextPublishSeqNo() 获取下一次将要发送的消息标识tag*/messagePoolMap.put(channel.getNextPublishSeqNo(),String.valueOf(i) );// 生产者只管发布就行channel.basicPublish("","async confirm",MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes());System.out.println("消息=====>" + i + "发送完毕");}long end = System.currentTimeMillis();System.out.println("异步发布确认耗时:" + ( end-begin ) + "ms" );
}

3.7、交换机
正如前面一开始就画的原理图,交换机的作用就是为了接收生产者发送的消息 并 将消息发送到队列中去

注意点:前面一直玩的那些模式,虽然没有写交换机,但并不是说RabbitMQ就没用交换机【 ps:使用的是""空串,也就是使用了RabbitMQ的默认交换机 】,生产者发送的消息只能发到交换机中,从而由交换机来把消息发给队列

3.7.1、交换机exchange的分类
直接( direct ) / 默认
主题( topic )
标题 ( heanders ) - 这个已经很少用了
扇出( fancut ) / 发布订阅模式

临时队列

所谓的临时队列指的就是:自动帮我们生成队列名 并且 当生产者和队列断开之后,这个队列会被自动删除
所以这么一说:前面玩过的一种就属于临时队列,即:将下面的第四个参数改成true即可【 ps:当然让队列名随机生成就完全匹配了 】
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

而如果要更简单的生成临时队列,那么调用如下的API即可
String queueName = channel.queueDeclare().getQueue();

这样帮我们生成的队列效果就和channel.queueDeclare(QUEUE_NAME, false, false, true, null);是一样的了

3.7.2、fanout扇出 / 发布订阅模式
这玩意儿吧,好比群发,一人发,很多人收到消息,就是原理图的另一种样子,生产者发布的一个消息,可以供多个消费者进行消费

实现方式就是让一个交换机binding绑定多个队列

生产者

package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class FanoutProducer {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");/*** 定义交换机* 参数1、交换机名字* 参数2、交换机类型*/channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT);System.out.println("请输入要发送的内容:");Scanner input = new Scanner(System.in);while (input.hasNext()){String message = input.next();channel.basicPublish("fanoutExchange","", null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息=====>" + message + "发送完毕");}
}

}

消费者01

package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer01 {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");// 绑定队列/*** 参数1、队列名字* 参数2、交换机名字* 参数3、用于绑定的routing key / binding key*/String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, "fanoutExchange", "");System.out.println("01消费者正在接收消息........");channel.basicConsume(queueName,true,(consumerTag,message)->{// 这里面接收到消息之后就可以用来做其他事情了,如:存到磁盘System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));},consumerTage->{});
}

}

消费者02

package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer02 {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");// 绑定队列/*** 参数1、队列名字* 参数2、交换机名字* 参数3、用于绑定的routing key / binding key*/String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, "fanoutExchange", "");System.out.println("02消费者正在接收消息........");channel.basicConsume(queueName,true,(consumerTag,message)->{// 这里面接收到消息之后就可以用来做其他事情了,如:存到磁盘System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));},consumerTage->{});
}

}


image

3.7.3、direct交换机 / routing路由模式
这个玩意儿吧就是发布订阅模式,也就是fanout类型交换机的变样板,即:多了一个routing key的配置而已,也就是说:生产者和消费者传输消息就通过routing key进行关联起来,因此:现在就变成了生产者想把消息发给谁就发给谁

生产者

package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class DirectProducer {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT);System.out.println("请输入要发送的消息:");Scanner input = new Scanner(System.in);while (input.hasNext()){String message = input.next();/*** 对第二个参数routing key做文章* 假如这里的routing key为zixieqing 那么:就意味着消费者只能是绑定了zixieqing的队列才可以进行接收这里发的消息内容*/channel.basicPublish("directExchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息=====>" + message + "====>发送完毕");}
}

}

消费者01

package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class DirectConsumer01 {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.queueDeclare("direct", false, false, false, null);/*** 队列绑定* 参数1、队列名* 参数2、交换机名字* 参数3、routing key 这里的routing key 就需要和生产者中的一样了,这样才可以通过这个routing key去对应的队列中取消息*/channel.queueBind("direct", "directExchange", "zixieqing");System.out.println("01消费者正在接收消息.......");channel.basicConsume("direct",true,(consumerTag,message)->{System.out.println("01消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));},consumerTag->{});
}

}

上面这种,生产者的消息肯定能够被01消费者给消费,因为:他们的交换机名字、队列名字和routing key的值都是相同的

而此时再加一个消费者,让它的routing key值和消费者中的不同
package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class DirectConsumer02 {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.queueDeclare("direct", false, false, false, null);/*** 队列绑定* 参数1、队列名* 参数2、交换机名字* 参数3、routing key 这里的routing key 就需要和生产者中的一样了,这样才可以通过这个routing key去对应的队列中取消息*/// 搞点事情:这里的routing key的值zixieqing和生产者的不同channel.queueBind("direct", "directExchange", "xiegongzi");System.out.println("02消费者正在接收消息.......");channel.basicConsume("direct",true,(consumerTag,message)->{System.out.println("02消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));},consumerTag->{});
}

}

3.7.4、topic交换机 / topic主题模式 - 使用最广的一个
前面玩的fanout扇出类型的交换机 / 发布订阅模式是一个生产者发布,多个消费者共享消息,和qq群类似;而direct直接交换机 / 路由模式是消费者只能消费和消费者相同routing key的消息
而上述这两种还有局限性,如:现在生产者的routing key为zi.xie.qing,而一个消费者只消费含xie的消息,一个消费者只消费含qing的消息,另一个消费者只消费第一个为zi的零个或无数个单词的消息,甚至还有一个消费者只消费最后一个单词为qing,前面有三个单词的routing key的消息呢?
这样一看,发布订阅模式和路由模式都不能解决,更别说前面玩的简单模式、工作队列模式、发布确认模式了,这些和目前的这个需求更不搭了,因此:就来了这个topic主题模式

topic中routing key的要求

只要交换机类型是topic类型的,那么其routing key就不能乱写,要求:routing key只能是一个单词列表,多个单词之间采用点隔开,如:cn.zixieqing.rabbit
单词列表的长度不能超过255个字节

在routing key的规则列表中有两个替换符可以用
1、* 代表一个单词
2、# 代表零活无数个单词

假如有如下的一个绑定关系图

Q1绑定的是:中间带 orange 带 3 个单词的字符串(.orange.)
Q2绑定的是:
最后一个单词是 rabbit 的 3 个单词(…rabbit)
第一个单词是 lazy 的多个单词(lazy.#)
熟悉一下这种绑定关系( 左为一些routes路由规则,右为能匹配到上图绑定关系的结果 )

quick.orange.rabbit      被队列 Q1Q2 接收到
lazy.orange.elephant        被队列 Q1Q2 接收到
quick.orange.fox            被队列 Q1 接收到
lazy.brown.fox              被队列 Q2 接收到
lazy.pink.rabbit            虽然满足两个绑定,但只被队列 Q2 接收一次
quick.brown.fox             不满足任何绑定关系,不会被任何队列接收到,会被丢弃
quick.orange.male.rabbit    是四个单词,不满足任何绑定关系,会被丢弃
lazy.orange.male.rabbit     虽是四个单词,但匹配 Q2,因:符合lazy.#这个规则

当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

把上面的绑定关系和测试转换成代码玩一波

生产者

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);/*** 准备大量的routing key 和 message*/HashMap<String, String> routesAndMessageMap = new HashMap<>();routesAndMessageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");routesAndMessageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");routesAndMessageMap.put("quick.orange.fox", "被队列 Q1 接收到");routesAndMessageMap.put("lazy.brown.fox", "被队列 Q2 接收到");routesAndMessageMap.put("lazy.pink.rabbit", "虽然满足两个绑定,但只被队列 Q2 接收一次");routesAndMessageMap.put("quick.brown.fox", "不满足任何绑定关系,不会被任何队列接收到,会被丢弃");routesAndMessageMap.put("quick.orange.male.rabbit", "是四个单词,不满足任何绑定关系,会被丢弃");routesAndMessageMap.put("lazy.orange.male.rabbit ", "虽是四个单词,但匹配 Q2,因:符合lazy.#这个规则");System.out.println("生产者正在发送消息.......");for (Map.Entry<String, String> routesAndMessageEntry : routesAndMessageMap.entrySet()) {String routingKey = routesAndMessageEntry.getKey();String message = routesAndMessageEntry.getValue();channel.basicPublish("topicExchange",routingKey,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息====>" + message + "===>发送完毕");}
}

}

消费者01

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicConsumer01 {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);channel.queueDeclare("Q1", false, false, false, null);channel.queueBind("Q1", "topicExchange", "*.orange.*");System.out.println("消费者01正在接收消息......");channel.basicConsume("Q1",true,(consumerTage,message)->{System.out.println("01消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));System.out.println("此条消息的交换机名为:" + message.getEnvelope().getExchange() + ",路由键为:" + message.getEnvelope().getRoutingKey());},consumerTag->{});
}

}

消费者02

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicConsumer02 {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);channel.queueDeclare("Q2", false, false, false, null);channel.queueBind("Q2", "topicExchange", "*.*.rabbit");channel.queueBind("Q2", "topicExchange", "lazy.#");System.out.println("消费者02正在接收消息......");channel.basicConsume("Q2",true,(consumerTage,message)->{System.out.println("02消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));System.out.println("此条消息的交换机名为:" + message.getEnvelope().getExchange() + ",路由键为:" + message.getEnvelope().getRoutingKey());},consumerTag->{});
}

}

3.8.2、队列超过最大长度
3.8.2.1、队列超过所限制的最大个数
意思就是:某一个队列要求只能放N个消息,但是放了N+1个消息,这就超过队列的最大个数了

生产者

就是一个正常的生产者发送消息而已
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.exchangeDeclare("messageNumber_normal_exchange", BuiltinExchangeType.DIRECT);for (int i = 1; i < 11; i++) {String message = "生产者发送了消息" + i;channel.basicPublish("messageNumber_normal_exchange","zi",null,message.getBytes(StandardCharsets.UTF_8) );System.out.println("消息====>" + message + "====>发送完毕");}
}

}

01消费者

package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class Consumer01 {

/*** 正常交换机名称*/
public static final String NORMAL_EXCHANGE = "messageNumber_normal_exchange";/*** 正常队列名称*/
public static final String NORMAL_QUEUE = "messageNumber_queue";/*** 死信交换机名称*/
public static final String DEAD_EXCHANGE = "messageNumber_dead_exchange";/*** 死信队列名称*/
public static final String DEAD_QUEUE = "messageNumber_dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");// 声明正常交换机、死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信交换机和死信队列进行绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");// 声明正常队列 并 考虑达到条件时和死信交换机进行联系HashMap<String, Object> params = new HashMap<>();// 死信交换机params.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 死信路由键params.put("x-dead-letter-routing-key", "xie");// 达到队列能接受的最大个数限制就多了如下的配置params.put("x-max-length", 6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);// 正常队列和正常交换机进行绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zi");System.out.println("01消费者正在接收消息......");channel.basicConsume(NORMAL_QUEUE,true,(consumeTag,message)->{System.out.println("01消费者接收到了消息:" + new String( message.getBody(), StandardCharsets.UTF_8));},consumeTag->{});
}

}

启动01消费者,然后关掉( 模仿异常 ),最后启动生产者,那么:生产者发送了10个消息,由于01消费者这边做了配置,所以有6个消息是在正常队列中,余下的4个消息就会进入死信队列

3.8.2.2、超过队列能接受消息的最大字节长度
和前面一种相比,在01消费者方做另一个配置即可
params.put(“x-max-length-bytes”, 255);

注意:关于两种情况同时使用的问题

如配置的如下两个
params.put(“x-max-length”, 6);
params.put(“x-max-length-bytes”, 255);

那么先达到哪个上限设置就执行哪个

3.8.3、消息被拒收
注意点:必须开启手动应答
// 第二个参数改成false
channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{},consumeTag->{});

生产者

package cn.zixieqing.dead_letter_queue.reack;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");channel.exchangeDeclare("reack_normal_exchange", BuiltinExchangeType.DIRECT);for (int i = 1; i < 11; i++) {String message = "生产者发送的消息" + i;channel.basicPublish("reack_normal_exchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息===>" + message + "===>发送完毕");}
}

}

消费者

package cn.zixieqing.dead_letter_queue.reack;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class Consumer01 {

public static final String NORMAL_EXCHANGE = "reack_normal_exchange";
public static final String DEAD_EXCHANGE = "reack_dead_exchange";public static final String DEAD_QUEUE = "reack_dead_queue";
public static final String NORMAL_QUEUE = "reack_normal_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");// 声明正常交换机、死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信队列绑定死信交换机channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");// 声明正常队列HashMap<String, Object> params = new HashMap<>();params.put("x-dead-letter-exchange", DEAD_EXCHANGE);params.put("x-dead-letter-routing-key", "xie");channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zixieqing");System.out.println("01消费者正在接收消息.....");// 1、注意:需要开启手动应答( 第二个参数为false )channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{String msg = new String(message.getBody(), StandardCharsets.UTF_8);// 如果发送的消息为:生产者发送的消息5  则:拒收if ( "生产者发送的消息5".equals( msg ) ) {System.out.println("此消息====>" + msg + "===>是拒收的");// 2、做拒收处理 - 注意:第二个参数设为false,表示不再重新入正常队列的队,这样消息才可以进入死信队列channel.basicReject( message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("01消费者接收到了消息=====>" + msg);}},consumeTag->{});
}

}

RabbitMQ 3.9相关推荐

  1. RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)

    1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...

  2. Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)

    1. 安装 rabbitmq 的 golang 包 golang 可使用库 github.com/streadway/amqp 操作 rabbitmq .使用下面命令安装 RabbitMQ . go ...

  3. RabbitMQ 入门系列(4)— RabbitMQ 启动、停止节点和应用程序、用户管理、权限配置

    1. 服务器管理 我们使用 "节点" 来指代 RabbitMQ 实例,当我们谈到 RabbitMQ 节点时指的是 RabbitMQ 应用程序和其所在的 Erlang 节点. 1.1 ...

  4. RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现

    生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...

  5. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  6. RabbitMQ 入门系列(1)— Ubuntu 安装 RabbitMQ 及配置

    1. RabbitMQ 简介 消息 (Message) 是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串.JSON等,也可以很复杂,比如内嵌对象. 消息队列中间件(Message Que ...

  7. RabbitMQ超详细安装教程(Linux)

    目录 1.简介 2.下载安装启动RabbitMQ 2.1.下载RabbitMQ 2.2.下载Erlang 2.3.安装Erlang 2.4.安装RabbitMQ 2.5.启动RabbitMQ服务 3. ...

  8. 第五节 RabbitMQ在C#端的应用-消息收发

    原文:第五节 RabbitMQ在C#端的应用-消息收发 版权声明:未经本人同意,不得转载该文章,谢谢 https://blog.csdn.net/phocus1/article/details/873 ...

  9. RabbitMQ学习笔记一:本地Windows环境安装RabbitMQ Server

    一:安装RabbitMQ需要先安装Erlang语言开发包,百度网盘地址:http://pan.baidu.com/s/1jH8S2u6.直接下载地址:http://erlang.org/downloa ...

  10. RabbitMQ使用及与spring boot整合

    1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...

最新文章

  1. google ProtoBuf开发者指南
  2. 数字图像处理- 3.4 空间滤波 and 3.5 平滑空间滤波器
  3. java list转成map对象_将List集合中的map对象转为List对象形式--封装类
  4. 程矢Axure夜话:Axure手机原型视频教程之中继器上下滑动加载更多
  5. js获取datagrid行,但是行改变了肿么办?
  6. [Leetcode][第332题][JAVA][重新安排行程][欧拉回路 / 欧拉通路][优先队列][DFS]
  7. 一场关于动态化开发实践的技术探讨
  8. 事件查看器 无法完成应用程序上的操作,接口未知
  9. CSS水平垂直居中布局方案概述
  10. cache控制器取值从TCM/CACHE/FLASH
  11. 考研经验-东南大学软件学院软件工程(这些基础课和专业课的各种坑和复习技巧你应该知道)
  12. 财务软件虚拟服务器,新中大财务软件远程虚拟化办公方案
  13. 开始Python的新手教程
  14. C# 之 扑克游戏 -- 21点规则介绍和代码实现
  15. 支持向量机识别数字集(数据采集+模型训练+预测输出)
  16. 会议论文与期刊论文的写作差异
  17. Tms320F28335中PWM触发ADC16路级联顺序采样
  18. php sql injection,PHP防止SQL Injection
  19. python 写脚本 获取qq好友地理位置_Python获取统计自己的qq群成员信息的方法
  20. 通过一个命令返回上级多层目录的方法

热门文章

  1. 那一年,让我整个人升华的C++ BERT项目
  2. 移动建站工具(二):分秒钟DIY一个移动网站
  3. 计算机复制粘贴教案,《复制与粘贴》教学设计
  4. ASP.NET Core中配置监听URLs的六种方式
  5. Azure SQL 数据库中的DTU和eDTU是什么
  6. Photoshop基础知识——第五章(色彩调整与校正)
  7. 设计模式(行为型)之观察者模式(Observer Pattern)
  8. 我们到底对绿油有多大的误会?用实际案例来解析!
  9. linux下运行dos命令,在Linux系统环境下运行DOS命令详解
  10. HackThisSite(Basic missions level1-11)攻略