目录

1、RabbitMQ 介绍

应用场景

其他消息队列

选择RabbitMQ 原因

2、AMQP消息队列其他相关知识

什么是AMQP?

什么是JMS?

3、RabbitMQ 快速入门

RabbitMQ的工作原理

RabbitMQ 消息发送和接受流程梳理

RabbitMQ 消息发送

RabbitMQ 消息接受

RabbitMQ 安装

RabbitMQ 之Hello World

第一步:创建Maven项目,添加RabbitMQ 消息队列依赖。

第二步:创建RabbitMQ 消息生产者

第三步: 创建RabbitMQ 消息消费者

4、RabbitMQ 支持工作模式

Work queues模式

Publish/subscribe 模式

Routing 路由模式

Topics 模式

Header 模式

RPC 模式

5、RabbitMQ 高级特性

RabbitMQ 消息有效期

默认情况

TTL(消息存活时间)

TTL之单条消息过期

TTL之队列消息过期

六、RabbitMQ 死信队列

RabbitMQ延迟队列

RabbitMQ延迟队列实现延迟队列方式:

RabbitMQ 集成插件实现

基于插件实现延迟队列

基于DXL实现延迟队列

七、RabbitMQ 发送可靠性

RabbitMQ 消息发送机制

RabbitMQ 提供解决方案

RabbitMQ 开启事务

RabbitMQ 发送确认机制

RabbitMQ 失败重试

自带重试机制

业务重试

八、RabbitMQ 消费可靠性

两种消费思路

推(push)方式

拉(pull)方式

确保消费成功思路

消息拒绝

消息确认

消息确认之自动确认

消息确认之手动确认

幂等性问题


1、RabbitMQ 介绍

MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/

应用场景

1、任务异步处理。
        将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
        MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

其他消息队列

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。

选择RabbitMQ 原因

1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ

2、AMQP消息队列其他相关知识

什么是AMQP?

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

什么是JMS?

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

3、RabbitMQ 快速入门

RabbitMQ的工作原理

组成部分说明如下:
  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

RabbitMQ 消息发送和接受流程梳理

RabbitMQ 消息发送

1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

RabbitMQ 消息接受

1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。

RabbitMQ 安装

请参考:Docker 安装RabbitMQ

RabbitMQ 之Hello World

第一步:创建Maven项目,添加RabbitMQ 消息队列依赖。

        <!--集成RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:创建RabbitMQ 消息生产者

package com.zzg.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProduceMS {//队列名称private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = null;Channel channel = null;ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.43.10");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* */channel.queueDeclare(QUEUE, true, false, false, null);String message = "Hello World RabbitMQ" + System.currentTimeMillis();/*** 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 * *//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 示绑定或解除绑定* 默认的交换机,routingKey等于队列名称 * */channel.basicPublish("", QUEUE, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");if (channel != null) {channel.close();}if (connection != null) {connection.close();}}
}

第三步: 创建RabbitMQ 消息消费者

package com.zzg.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerMS {private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ所在服务器的ip和端口factory.setHost("192.168.43.10");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列 channel.queueDeclare(QUEUE, true, false, false, null);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送) * @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机 String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("receive message.." + msg);}};/*** 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法 */channel.basicConsume(QUEUE, true, consumer);}
}

4、RabbitMQ 支持工作模式

RabbitMQ有以下几种工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topics
  • Header
  • RPC

Work queues模式

多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者;

缺少图片

特点:

1、一条消息只会被一个消费端接收;

2、队列采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息

Work Queues模式之生产者

package com.zzg.rabbitmq.work.queues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*
生产者核心步骤
1、声明队列
2、创建连接
3、创建通道
4、通道声明队列
5、制定消息
6、发送消息,使用默认交换机
*/
public class WorkQueueProduceMS {//声明队列private static final String QUEUE = "queue";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");//mq服务ip地址connectionFactory.setPort(5672);//mq client连接端口connectionFactory.setUsername("guest");//mq登录用户名connectionFactory.setPassword("guest");//mq登录密码connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = connectionFactory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE, true, false, false, null);//通道绑定邮件队列for (int i = 0; i < 10; i++) {String message = new String("mq 发送消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish("", QUEUE, null, message.getBytes("utf-8"));System.out.println("mq消息发送成功!");}} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

Work Queues模式之消费者

package com.zzg.rabbitmq.work.queues;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*
消费者核心步骤
1、声明队列
2、创建连接
3、创建通道
4、通道声明队列
5、重写消息消费方法
6、执行消息方法
*/
public class WorkQueueConsumerMS {private static final String QUEUE = "queue";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE, true, false, false, null);//通道绑定邮件队列//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

生产端启动后,控制台打印信息如下:

mq消息发送成功!
mq消息发送成功!
mq消息发送成功!
mq消息发送成功!
mq消息发送成功!
mq消息发送成功!
mq消息发送成功!
mq消息发送成功!
mq消息发送成功!

RabbitMQ中的已有消息:

缺失图片

消费端启动后,控制台打印信息如下:

消费者启动成功!
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。
mq收到的消息是:mq 发送消息。。。

Publish/subscribe 模式

这种模式又称为发布订阅模式,相对于Work queues模式,该模式多了一个交换机,生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息。

缺少图片

特点

1、每个消费者监听自己的队列;

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息

应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

publish/subscribe 模式之生产者

package com.zzg.rabbitmq.publish.subscribe;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产端核心步骤:* <p>* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机* <p>* 7、制定消息* <p>* 8、发送消息*/
public class PublicSubProduceMS {//声明两个队列和一个交换机//Publish/subscribe发布订阅模式private static final String QUEUE_EMAIL = "queueEmail";private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "messageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");//mq服务ip地址connectionFactory.setPort(5672);//mq client连接端口connectionFactory.setUsername("guest");//mq登录用户名connectionFactory.setPassword("guest");//mq登录密码connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = connectionFactory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Publish/subscribe发布订阅模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*///Publish/subscribe发布订阅模式channel.queueBind(QUEUE_EMAIL, EXCHANGE, "");channel.queueBind(QUEUE_SMS, EXCHANGE, "");for (int i = 0; i < 10; i++) {String message = new String("mq 发送消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] body//Publish/subscribe发布订阅模式channel.basicPublish(EXCHANGE, "", null, message.getBytes());System.out.println("mq消息发送成功!");}} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

publish/subscribe 模式之消费者(邮件)

package com.zzg.rabbitmq.publish.subscribe;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者核心步骤* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机* <p>* 7、重写消息消费方法* <p>* 8、执行消息方法*/
public class PublicSubEMailComsumerMS {//Publish/subscribe发布订阅模式private static final String QUEUE_EMAIL = "queueEmail";private static final String EXCHANGE = "messageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Publish/subscribe发布订阅模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*///Publish/subscribe发布订阅模式channel.queueBind(QUEUE_EMAIL, EXCHANGE, "");//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_EMAIL, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

publish/subscribe 模式之消费者(短信)

package com.zzg.rabbitmq.publish.subscribe;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者核心步骤* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机* <p>* 7、重写消息消费方法* <p>* 8、执行消息方法*/
public class PubSubSMSComsumerMS {//Publish/subscribe发布订阅模式private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "messageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Publish/subscribe发布订阅模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*///Publish/subscribe发布订阅模式channel.queueBind(QUEUE_SMS, EXCHANGE, "");DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_SMS, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
publish/subscribe与work queues有什么区别?
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
工作中用publish/subscribe还是work queues?
        建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。

Routing 路由模式

Routing 模式又称路由模式,该种模式除了要绑定交换机外,发消息的时候还要制定routing key,即路由key,队列通过通道绑定交换机的时候,需要指定自己的routing key,这样,生产端发送消息的时候也会指定routing key,通过routing key就可以把相应的消息发送到绑定相应routing key的队列中去。

缺失图片

特点:

1、每个消费者监听自己的队列,并且设置routingkey;
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

routing路由模式之生产者

package com.zzg.rabbitmq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者核心步骤* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机并指定该队列的routingkey* <p>* 7、制定消息* <p>* 8、发送消息并指定routingkey*/
public class RoutingProduceMS {//声明两个队列和一个交换机//Routing 路由模式private static final String QUEUE_EMAIL = "queueEmail";private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "routingMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");//mq服务ip地址connectionFactory.setPort(5672);//mq client连接端口connectionFactory.setUsername("guest");//mq登录用户名connectionFactory.setPassword("guest");//mq登录密码connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = connectionFactory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Routing 路由模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*///Routing 路由模式channel.queueBind(QUEUE_EMAIL, EXCHANGE, QUEUE_EMAIL);channel.queueBind(QUEUE_SMS, EXCHANGE, QUEUE_SMS);//给email队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送email消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] body//Routing 路由模式channel.basicPublish(EXCHANGE, QUEUE_EMAIL, null, message.getBytes());System.out.println("mq消息发送成功!");}//给sms队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送sms消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] body//Routing 路由模式channel.basicPublish(EXCHANGE, QUEUE_SMS, null, message.getBytes());System.out.println("mq消息发送成功!");}} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

routing路由模式之消费者(邮件)

package com.zzg.rabbitmq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者核心步骤* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机并指定routingkey* <p>* 7、重写消息消费方法* <p>* 8、执行消息方法*/
public class RoutingEMailConsumerMS {//Routing 路由模式private static final String QUEUE_EMAIL = "queueEmail";private static final String EXCHANGE = "routingMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Routing 路由模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*///Routing 路由模式channel.queueBind(QUEUE_EMAIL, EXCHANGE, QUEUE_EMAIL);//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_EMAIL, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

routing路由模式之消费者(短信)

package com.zzg.rabbitmq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者核心步骤* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机并指定routingkey* <p>* 7、重写消息消费方法* <p>* 8、执行消息方法*/
public class RoutingSMSConsumerMS {//Routing 路由模式private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "routingMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Routing 路由模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*///Routing 路由模式channel.queueBind(QUEUE_SMS, EXCHANGE, QUEUE_SMS);DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_SMS, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
Routing模式和Publish/subscibe有啥区别?
        Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。

Topics 模式

Topics 模式和Routing 路由模式最大的区别就是,Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的。

缺失图片

特点:

1、每个消费者监听自己的队列,并且设置带统配符的routingkey

2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列

应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

topics模式之生产者

package com.zzg.rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者核心步骤* 1、声明队列,声明交换机* <p>* 2、创建连接* <p>* 3、创建通道* <p>* 4、通道声明交换机* <p>* 5、通道声明队列* <p>* 6、通过通道使队列绑定到交换机并指定该队列的routingkey(通配符)* <p>* 7、制定消息* <p>* 8、发送消息并指定routingkey(通配符)*/
public class TopicProduceMS {//声明两个队列和一个交换机//Topics 模式private static final String QUEUE_EMAIL = "queueEmail";private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "topicMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");//mq服务ip地址connectionFactory.setPort(5672);//mq client连接端口connectionFactory.setUsername("guest");//mq登录用户名connectionFactory.setPassword("guest");//mq登录密码connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = connectionFactory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Topics 模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_EMAIL, EXCHANGE, "inform.#.email.#");channel.queueBind(QUEUE_SMS, EXCHANGE, "inform.#.sms.#");//给email队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送email消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(EXCHANGE, "inform.email", null, message.getBytes());System.out.println("mq email 消息发送成功!");}//给sms队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送sms消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(EXCHANGE, "inform.sms", null, message.getBytes());System.out.println("mq sms 消息发送成功!");}//给email和sms队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送email sms消息。。。");/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(EXCHANGE, "inform.email.sms", null, message.getBytes());System.out.println("mq email sms 消息发送成功!");}} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

topics模式之消费者(邮件)

package com.zzg.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者核心步骤*/
public class TopicEMailConsumerMS {private static final String QUEUE_EMAIL = "queueEmail";private static final String EXCHANGE = "topicMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_EMAIL, EXCHANGE, "inform.#.email.#");//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_EMAIL, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

topics模式之消费者(短信)

package com.zzg.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TopicSMSConsumerMS {private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "topicMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定邮件队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_SMS, EXCHANGE, "inform.#.sms.#");//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_SMS, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
使用Routing工作模式能否实现Topic模式?
        使用Routing模式也可以实现本案例Topic模式,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
        Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能

Header 模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

案例:

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

根据假设使用场景,需要一个生产端,两个消费端,不同的是,生产端声明交换机时,交换机的类型不同,是headers类型,生产端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,发送消息时也是使用header中的 key/value(键值对)匹配队列。

消费端同样是声明交换机时,交换机的类型不同,是headers类型,消费端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,消费消息时也是使用header中的 key/value(键值对)匹配队列。

Header模式之生产者

package com.zzg.rabbitmq.header;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class HeaderProduceMS {//声明两个队列和一个交换机//Header 模式private static final String QUEUE_EMAIL = "queueEmail";private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "HeaderMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");//mq服务ip地址connectionFactory.setPort(5672);//mq client连接端口connectionFactory.setUsername("guest");//mq登录用户名connectionFactory.setPassword("guest");//mq登录密码connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = connectionFactory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*///Header 模式channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key* 4、* String queue, String exchange, String routingKey, Map<String, Object> arguments*/Map<String, Object> headers_email = new Hashtable<String, Object>();headers_email.put("inform_type", "email");Map<String, Object> headers_sms = new Hashtable<String, Object>();headers_sms.put("inform_type", "sms");channel.queueBind(QUEUE_EMAIL, EXCHANGE, "", headers_email);channel.queueBind(QUEUE_SMS, EXCHANGE, "", headers_sms);//给email队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送email消息。。。");Map<String, Object> headers = new Hashtable<String, Object>();headers.put("inform_type", "email");//匹配email通知消费者绑定的header/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] bodyAMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();properties.headers(headers);//Email通知channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());System.out.println("mq email 消息发送成功!");}//给sms队列发消息for (int i = 0; i < 10; i++) {String message = new String("mq 发送sms消息。。。");Map<String, Object> headers = new Hashtable<String, Object>();headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*///String exchange, String routingKey, BasicProperties props, byte[] bodyAMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();properties.headers(headers);//sms通知channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());System.out.println("mq sms 消息发送成功!");}} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

Header模式之消费者(邮件)

package com.zzg.rabbitmq.header;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class HeaderEMailConsumerMS {private static final String QUEUE_EMAIL = "queueEmail";private static final String EXCHANGE = "HeaderMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key* 4、* String queue, String exchange, String routingKey, Map<String, Object> arguments*/Map<String, Object> headers_email = new Hashtable<String, Object>();headers_email.put("inform_email", "email");channel.queueBind(QUEUE_EMAIL, EXCHANGE, "", headers_email);//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_EMAIL, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

Header模式之消费者(短信)

package com.zzg.rabbitmq.header;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class HeaderSMSConsumerMS {private static final String QUEUE_SMS = "queueSms";private static final String EXCHANGE = "HeaderMessageChange";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.43.10");connectionFactory.setPort(5672);connection = connectionFactory.newConnection();channel = connection.createChannel();//通道绑定交换机/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);//通道绑定队列/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments**/channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定邮件队列//交换机和队列绑定/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key* 4、* String queue, String exchange, String routingKey, Map<String, Object> arguments*/Map<String, Object> headers_email = new Hashtable<String, Object>();headers_email.put("inform_email", "sms");channel.queueBind(QUEUE_SMS, EXCHANGE, "", headers_email);//String consumerTag, Envelope envelope, BasicProperties properties, byte[] bodyDefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException* String consumerTag, Envelope envelope, BasicProperties properties, byte[] body*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();envelope.getDeliveryTag();String msg = new String(body, "utf-8");System.out.println("mq收到的消息是:" + msg);}};System.out.println("消费者启动成功!");channel.basicConsume(QUEUE_SMS, true, consumer);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

RPC 模式

缺失图片

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3、服务端将RPC方法 的结果发送到RPC响应队列。

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5、RabbitMQ 高级特性

RabbitMQ 消息有效期

大家有没有设想过RabbitMQ 消息长时间没有处理,消息是否会过期?,带着上述疑问开始本章节的学习。

默认情况

默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。

TTL(消息存活时间)

TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信,关于死信以及死信队列,我们在下一个章节会说到。

TTL 设置方式

  1. 在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。

  2. 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。

思考:如果队列声明了过期时间且消息也设置过期时间,以谁的为准?

以时间短为准。

TTL之单条消息过期

第一步:创建Maven项目,添加Web 和RabbitMQ jar包依赖

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:添加RabbitMQ 配置对象

package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class QueueConfig {public static final String TTL_QUEUE_DEMO = "ttl_queue_demo";public static final String TTL_EXCHANGE_DEMO = "ttl_exchange_demo";public static final String TTL_ROUTING_KEY = "ttl_routing_key";@BeanQueue queue() {return new Queue(TTL_QUEUE_DEMO, true, false, false);}@BeanDirectExchange directExchange() {return new DirectExchange(TTL_EXCHANGE_DEMO, true, false);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(TTL_ROUTING_KEY);}
}

此配置对象主要解决:

  1. 首先配置一个消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。

  2. 配置一个 DirectExchange 交换机。

  3. 将交换机和队列绑定到一起。

第三步:添加消息过期Controller ,通过PostMan 工具模拟请求

package com.zzg.controller;import com.zzg.config.QueueConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/ttl")
public class MessageTTLController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/message")public void hello() {Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())// 设置消息过期时间.setExpiration("10000").build();rabbitTemplate.convertAndSend(QueueConfig.TTL_QUEUE_DEMO, message);}}

TTL之队列消息过期

仅仅需要修改RabbitMQ 配置对象和消息过期Controller

第一:修改RabbitMQ 配置对象,设置队列过期 时间

  @BeanQueue queue() {Map<String, Object> args = new HashMap<>();// 消息队列设置过期时间args.put("x-message-ttl", 10000);return new Queue(TTL_QUEUE_DEMO, true, false, false, args);}

其他都不需要修改,直接复用。

第二步:修改消息过期Controller,移除单条消息过期时间

package com.zzg.controller;import com.zzg.config.QueueConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/ttl")
public class MessageTTLController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/message")public void hello() {
//        Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
//                // 设置消息过期时间
//                .setExpiration("10000")
//                .build();Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes()).build();rabbitTemplate.convertAndSend(QueueConfig.TTL_QUEUE_DEMO, message);}}

思考:如果将消息过期时间设置为0,这表示消息不能立马消费则立即被丢掉。

六、RabbitMQ 死信队列

死信交换机,Dead-Letter-Exchange 即 DLX。

死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

  • 消息过期

  • 队列达到最大长度

当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。

DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。

死信队列实践

第一步:创建Maven项目,添加Web 和RabbitMQ jar包依赖

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:添加RabbitMQ 配置对象 (普通队列和死信队列)

package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 普通消息队列配置*/
@Configuration
public class RabbitMQConfig {public static final String ROUTING_QUEUE_DEMO = "routing_queue_demos";public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange_demos";public static final String ROUTING_KEY = "routing_keys";@BeanQueue queue() {Map<String, Object> args = new HashMap<>();//设置消息过期时间args.put("x-message-ttl", 0);//设置死信交换机args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);//设置死信 routing_keyargs.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);}@BeanDirectExchange directExchange() {return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}
}

温馨提示:为普通消息队列配置死信 队列

@Bean
Queue queue() {Map<String, Object> args = new HashMap<>();//设置消息过期时间args.put("x-message-ttl", 0);//设置死信交换机args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);//设置死信 routing_keyargs.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
}

两个参数:

  • x-dead-letter-exchange:配置死信交换机。

  • x-dead-letter-routing-key:配置死信 routing_key

发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。

package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ  死信队列配置*/
@Configuration
public class RabbitMQDSLConfig {public static final String DLX_EXCHANGE_NAME = "dlx_exchange_names";public static final String DLX_QUEUE_NAME = "dlx_queue_names";public static final String DLX_ROUTING_KEY = "dlx_routing_keys";/*** 配置死信交换机** @return*/@BeanDirectExchange dlxDirectExchange() {return new DirectExchange(DLX_EXCHANGE_NAME, true, false);}/*** 配置死信队列* @return*/@BeanQueue dlxQueue() {return new Queue(DLX_QUEUE_NAME);}/*** 绑定死信队列和死信交换机* @return*/@BeanBinding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}
}

第三步:配置死信队列监听器

package com.zzg.components;import com.zzg.config.RabbitMQDSLConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信队列处理组件*/
@Component
public class DSLComponent {@RabbitListener(queues = RabbitMQDSLConfig.DLX_QUEUE_NAME)public void handler(String message){System.out.println("dlx msg = " + message);}
}

第四步:编写Controller,实现死信队列功能

package com.zzg.controller;import com.zzg.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/ttl")
public class MessageTTLController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/message")public void hello() {Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);}
}

RabbitMQ延迟队列

延迟队列场景:

  • 在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

RabbitMQ延迟队列实现延迟队列方式:

  • 利用 RabbitMQ 自带的消息过期和私信队列机制,实现定时任务。

  • 使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。

RabbitMQ 集成插件实现

我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可。

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

选择适合自己的版本,我这里选择3.9.0 版。

下载完成后,通过SFTP 上传CentOS服务器,再将此文件拷贝至RabbitMQ 容器中。

docker cp /home/rabbitmq_delayed_message_exchange-3.9.0.ez 4b0032:/plugins

第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置

再执行如下命令进入到 RabbitMQ 容器中:

 docker exec -it 4b0032 /bin/bash

进入到容器之后,执行如下命令启用插件:

 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启用成功之后通过如下命令查看所有安装的插件:

rabbitmq-plugins list

MobeXterm 操作详细截图:

[root@localhost home]# docker cp /home/rabbitmq_delayed_message_exchange-3.9.0.ez 4b0032:/plugins
[root@localhost home]# docker exec -it 4b0032 /bin/bash
root@4b0032878886:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@4b0032878886:
rabbitmq_delayed_message_exchange
The following plugins have been configured:rabbitmq_delayed_message_exchangerabbitmq_managementrabbitmq_management_agentrabbitmq_prometheusrabbitmq_web_dispatch
Applying plugin configuration to rabbit@4b0032878886...
The following plugins have been enabled:rabbitmq_delayed_message_exchangestarted 1 plugins.
root@4b0032878886:/# rabbitmq-plugins list
Listing plugins with pattern ".*" ...Configured: E = explicitly enabled; e = implicitly enabled| Status: * = running on rabbit@4b0032878886|/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[E*] rabbitmq_delayed_message_exchange 3.9.0
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11
root@4b0032878886:/# exit

基于插件实现延迟队列

第一:创建Maven项目,添加Web和RabbitMQ 依赖。

省略...

第二:添加RabbitMQ 配置对象 (延迟队列)

package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 延迟队列配置对象*/
@Configuration
public class RabbitMQDelayedConfig {public static final String QUEUE_NAME = "delay_queue";public static final String EXCHANGE_NAME = "delay_exchange";public static final String EXCHANGE_TYPE = "x-delayed-message";@BeanQueue queue() {return new Queue(QUEUE_NAME, true, false, false);}@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(customExchange()).with(QUEUE_NAME).noargs();}
}

延迟队列使用交换机为CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:

  • 交换机名称。

  • 交换机类型,这个地方是固定的。

  • 交换机是否持久化。

  • 如果没有队列绑定到交换机,交换机是否删除。

  • 其他参数。

最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。

第三步:创建延迟队列的消费者

package com.zzg.components;import com.zzg.config.RabbitMQDelayedConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 延迟队列处理组件*/
@Component
public class DelayedComponent {@RabbitListener(queues = RabbitMQDelayedConfig.QUEUE_NAME)public void handleMsg(String message) {System.out.println("delayed msg = " + message);}
}

第四步:创建Controller,并且在头部设置消息延迟时间

   @GetMapping("/delayed")public void delayed() {Message message = MessageBuilder.withBody("Delayed RabbitMQ".getBytes())// 消息头中设置消息的延迟时间。.setHeader("x-delay", 3000).build();rabbitTemplate.convertAndSend(RabbitMQDelayedConfig.EXCHANGE_NAME, RabbitMQDelayedConfig.QUEUE_NAME, message);}

基于DXL实现延迟队列

延迟队列实现的思路也很简单,就是我们所说的 DLX(死信交换机)+TTL(消息超时时间)。

功能需求说明: 

假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

 功能代码

第一步:创建Maven项目,添加Web 和RabbitMQ依赖

省略...

第二步: 添加RabbitMQ配置对象(普通队列和死信队列)

package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 普通消息队列配置*/
@Configuration
public class RabbitMQConfig {public static final String ROUTING_QUEUE_DEMO = "routing_queue_demos";public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange_demos";public static final String ROUTING_KEY = "routing_keys";@BeanQueue queue() {Map<String, Object> args = new HashMap<>();//设置消息过期时间args.put("x-message-ttl", 1000 * 3);//设置死信交换机args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);//设置死信 routing_keyargs.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);}@BeanDirectExchange directExchange() {return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}
}
package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ  死信队列配置*/
@Configuration
public class RabbitMQDSLConfig {public static final String DLX_EXCHANGE_NAME = "dlx_exchange_names";public static final String DLX_QUEUE_NAME = "dlx_queue_names";public static final String DLX_ROUTING_KEY = "dlx_routing_keys";/*** 配置死信交换机** @return*/@BeanDirectExchange dlxDirectExchange() {return new DirectExchange(DLX_EXCHANGE_NAME, true, false);}/*** 配置死信队列* @return*/@BeanQueue dlxQueue() {return new Queue(DLX_QUEUE_NAME);}/*** 绑定死信队列和死信交换机* @return*/@BeanBinding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}
}

第三步:为死信队列配置消息监听器

package com.zzg.components;import com.zzg.config.RabbitMQDSLConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信队列处理组件*/
@Component
public class DSLComponent {@RabbitListener(queues = RabbitMQDSLConfig.DLX_QUEUE_NAME)public void handler(String message){System.out.println("dlx msg = " + message);}
}

第四步:添加Controller, 触发消息发送

    @GetMapping("/message")public void hello() {Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes()).build();rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);}

控制台输出:

2022-08-02 14:30:25.100  INFO 6960 --- [nio-8087-exec-3] o.s.web.servlet.DispatcherServlet        : Completed initialization in 4 ms
dlx msg = Hello RabbitMQ

七、RabbitMQ 发送可靠性

温馨提示:本章节内容主要讨论如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。

RabbitMQ 消息发送机制

RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。

要确保消息发送的可靠性,主要从两方面去确认:

  1. 消息成功到达 Exchange

  2. 消息成功到达 Queue

如果能确认这两步,那么我们就可以认为消息发送成功了。

如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。

经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:

  1. 确认消息到达 Exchange。

  2. 确认消息到达 Queue。

  3. 开启定时任务,定时投递那些发送失败的消息。

RabbitMQ 提供解决方案

保证消息的成功发送的三个步骤,前两个RabbitQ 提供了对应的解决办法,第三个步骤需要我们自己实现。

如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:

  1. 开启事务机制

  2. 发送方确认机制

温馨提示

两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误:

RabbitMQ 开启事务

第一步:创建Maven项目,添加Web 和RabbitMQ依赖

省略...

第二步: 添加RabbitMQ配置对象(普通队列+ 事务)

package com.zzg.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 普通消息队列配置*/
@Configuration
public class RabbitMQConfig {public static final String ROUTING_QUEUE_DEMO = "routing_queue";public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange";public static final String ROUTING_KEY = "routing_key_transactional";@BeanQueue queue() {return new Queue(ROUTING_QUEUE_DEMO, true, false, false);}@BeanDirectExchange directExchange() {return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}@BeanRabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}

第三步:在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:

package com.zzg.service;import com.zzg.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
public class MessageServiceImpl {@AutowiredRabbitTemplate rabbitTemplate;@Transactionalpublic void send(){rabbitTemplate.setChannelTransacted(true);rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_EXCHANGE_DEMO,RabbitMQConfig.ROUTING_QUEUE_DEMO,"transactional Rabbitmq!".getBytes());int i = 1 / 0;}}

这里注意两点:

  1. 发送消息的方法上添加 @Transactional 注解标记事务。

  2. 调用 setChannelTransacted 方法设置为 true 开启事务模式。

这就 OK 了。

在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。

当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:

  1. 客户端发出请求,将信道设置为事务模式。

  2. 服务端给出回复,同意将信道设置为事务模式。

  3. 客户端发送消息。

  4. 客户端提交事务。

  5. 服务端给出响应,确认事务提交。

上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。

所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。

RabbitMQ 发送确认机制

移除刚刚关于事务的代码,然后在 application.yml中配置开启消息发送方确认机制。

spring:rabbitmq:host: 192.168.43.10port: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: true

publisher-confirm-type:配置消息到达交换器的确认回调,publisher-returns:配置消息到达队列的回调。

publisher-confirm-type属性的配置有三个取值:

  1. none:表示禁用发布确认模式,默认即此。

  2. correlated:表示成功发布消息到交换器后会触发的回调方法。

  3. simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。

RabbitMQ配置对象,开启两个监听

package com.zzg.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;/*** 普通消息队列配置*/
@Configuration
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {public static final String ROUTING_QUEUE_DEMO = "routing_queue";public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange";public static final String ROUTING_KEY = "routing_key_transactional";@AutowiredRabbitTemplate rabbitTemplate;@BeanQueue queue() {return new Queue(ROUTING_QUEUE_DEMO, true, false, false);}@BeanDirectExchange directExchange() {return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (ack) {System.out.println(correlationData.getId() +":消息成功到达交换器");}else{System.out.println(correlationData.getId() +":消息发送失败");}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println(message.getMessageProperties().getMessageId() +", 消息未成功路由到队列");}
}

关于这个配置类,我说如下几点:

  1. 定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。

  2. 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。

这就可以了。

尝试将消息发送到一个不存在的交换机中,代码如下:

    @GetMapping("/message")public void hello() {
//        Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
//                .build();// rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);// exchange 不存在rabbitTemplate.convertAndSend("RabbitConfig.EXCHANGE_NAME",RabbitMQConfig.ROUTING_QUEUE_DEMO,"Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));}

注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下错误:

2022-08-02 15:33:26.051 ERROR 12100 --- [.168.43.10:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'RabbitConfig.EXCHANGE_NAME' in vhost '/', class-id=60, method-id=40)
29cd1572-2db9-4236-b985-632597ab210e:消息发送失败

给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:

    @GetMapping("/message")public void hello() {
//        Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
//                .build();// rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);// exchange 不存在//rabbitTemplate.convertAndSend("RabbitConfig.EXCHANGE_NAME",RabbitMQConfig.ROUTING_QUEUE_DEMO,"Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));// queue 不存在rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_EXCHANGE_DEMO,"RabbitMQConfig.ROUTING_QUEUE_DEMO","Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));}

控制台输出错误信息:

2022-08-02 15:36:07.112  INFO 1536 --- [nio-8087-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 3 ms
null, 消息未成功路由到队列
a1906bf2-c4e9-44b5-adc6-d2e3f9d95728:消息成功到达交换器

RabbitMQ 失败重试

失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。

自带重试机制

事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2

从上往下配置含义依次是:

  • 开启重试机制。

  • 重试起始间隔时间。

  • 最大重试次数。

  • 最大重试间隔时间。

  • 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)

配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。

业务重试

业务重试主要是针对消息没有到达交换器的情况。

如果消息没有成功到达交换器,根据我们上面的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!

整体思路是这样:

  1. 首先创建一张表,用来记录发送到中间件上的消息,像下面这样:

每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:

  • status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。

  • tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。

  • count:表示消息重试次数。

其他字段都很好理解,我就不一一啰嗦了。

  1. 在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。

  2. 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。

  3. 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。

当然这种思路有两个弊端:

  1. 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。

  2. 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。

当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。

八、RabbitMQ 消费可靠性

本章节主要讨论:如何确保消息消费成功,并且确保幂等。

两种消费思路

RabbitMQ 的消息消费,整体上来说有两种不同的思路:

  • 推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。

  • 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。

推(push)方式

这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:

package com.zzg.components;import com.zzg.config.RabbitMQDelayedConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 延迟队列处理组件*/
@Component
public class DelayedComponent {@RabbitListener(queues = RabbitMQDelayedConfig.QUEUE_NAME)public void handleMsg(String message) {System.out.println("delayed msg = " + message);}
}

当监听的队列中有消息时,就会触发该方法。

拉(pull)方式

@Test
public void test01() throws UnsupportedEncodingException {Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.QUEUE_NAME);System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
}

调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。

如果需要从消息队列中持续获得消息,就推荐使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。

确保消费成功思路

为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。

  • 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。

  • 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。

来看一张图:

如上图所示,在 RabbitMQ 的 web 管理页面:

  • Ready 表示待消费的消息数量。

  • Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。

这是我们可以从 UI 层面观察消息的消费情况确认情况。

当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:

  • 待消费的消息

  • 已经投递给消费者,但是还没有被消费者确认的消息

换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。

综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。

消息拒绝

当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:

@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)public void handle(Channel channel, Message message) {//获取消息编号long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//拒绝消息channel.basicReject(deliveryTag, true);} catch (IOException e) {e.printStackTrace();}}
}

消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:

  1. 获取消息编号 deliveryTag。

  2. 调用 basicReject 方法拒绝消息。

调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。

需要注意的是,basicReject 方法一次只能拒绝一条消息。

消息确认

消息确认分为自动确认和手动确认

消息确认之自动确认

在 Spring Boot 中,默认情况下,消息消费就是自动确认。

@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)public void handle2(String msg) {System.out.println("msg = " + msg);int i = 1 / 0;}
}

通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。

消息确认之手动确认

手动确认又分为两种:推模式手动确认与拉模式手动确认

推模式手动确认

要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

这个配置表示将消息的确认模式改为手动确认。

消费者端代码修改

@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle3(Message message,Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消息消费的代码写到这里String s = new String(message.getBody());System.out.println("s = " + s);//消费完成后,手动 ackchannel.basicAck(deliveryTag, false);} catch (Exception e) {//手动 nacktry {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}

将消费者要做的事情放到一个 try..catch 代码块中。

如果消息正常消费成功,则执行 basicAck 完成确认。

如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。

这里涉及到两个方法:

  • basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。

  • basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。

当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题。

拉模式手动确认

拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:

public void receive2() {Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);long deliveryTag = 0L;try {GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);deliveryTag = getResponse.getEnvelope().getDeliveryTag();System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));channel.basicAck(deliveryTag, false);} catch (IOException e) {try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}

涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述。

幂等性问题

问题场景:消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。

幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。

采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:

  • id-0(正在执行业务)

  • id-1(执行业务成功)

如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。

极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。

RabbitMQ 一文读懂相关推荐

  1. 一文读懂大数据平台——写给大数据开发初学者的话!

     一文读懂大数据平台--写给大数据开发初学者的话! 文|miao君 导读: 第一章:初识Hadoop 第二章:更高效的WordCount 第三章:把别处的数据搞到Hadoop上 第四章:把Hado ...

  2. 腾讯资深架构师干货总结:一文读懂大型分布式系统设计的方方面面

    1.引言 我们常常会听说,某个互联网应用的服务器端系统多么牛逼,比如QQ.微信.淘宝.那么,一个大型互联网应用的服务器端系统,到底牛逼在什么地方?为什么海量的用户访问,会让一个服务器端系统变得更复杂? ...

  3. 一文读懂:Kafka(分布式消息队列)的基础概念,教程

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  4. 即时通讯新手入门:一文读懂什么是Nginx?它能否实现IM的负载均衡?

    本文引用了"蔷薇Nina"的"Nginx 相关介绍(Nginx是什么?能干嘛?)"一文部分内容,感谢作者的无私分享. 1.引言 Nginx(及其衍生产品)是目前 ...

  5. 从实验室走向大众,一文读懂Nanopore测序技术的发展及应用

    关键词/Nanopore测序技术    文/基因慧 随着基因测序技术不断突破,二代测序的发展也将基因检测成本大幅降低.理想的测序方法,是对原始DNA模板进行直接.准确的测序,消除PCR扩增带来的偏差, ...

  6. 一文读懂Faster RCNN

    来源:信息网络工程研究中心本文约7500字,建议阅读10+分钟 本文从四个切入点为你介绍Faster R-CNN网络. 经过R-CNN和Fast RCNN的积淀,Ross B. Girshick在20 ...

  7. 福利 | 一文读懂系列文章精选集发布啦!

    大数据时代已经悄然到来,越来越多的人希望学习一定的数据思维和技能来武装自己,虽然各种介绍大数据技术的文章每天都扑面而来,但纷繁又零散的知识常常让我们不知该从何入手:同时,为了感谢和回馈读者朋友对数据派 ...

  8. ​一文读懂EfficientDet

    一文读懂EfficientDet. 今年年初Google Brain团队在 CVPR 2020 上发布了 EfficientDet目标检测模型, EfficientDet是一系列可扩展的高效的目标检测 ...

  9. 一文读懂序列建模(deeplearning.ai)之序列模型与注意力机制

    https://www.toutiao.com/a6663809864260649485/ 作者:Pulkit Sharma,2019年1月21日 翻译:陈之炎 校对:丁楠雅 本文约11000字,建议 ...

最新文章

  1. LINQ中的延迟查询特性
  2. vue调试工具vue-devtools安装及使用
  3. 为什么银行存款不能按复利计息?
  4. java统计空间占用_JVM —— Java 对象占用空间大小计算
  5. md5 java代码_JAVA简单实现MD5注册登录加密实例代码
  6. ios 平滑移动view_iOS 关于列表上拉(平滑加载数据)自动加载数据的问题
  7. About 日常生活感想
  8. python安装及运行环境_Python 安装及环境搭建
  9. target is busy / device is busy 设备无法取消挂载问题处理
  10. gtp传输java_一种GTP数据包传输方法、相关装置及存储介质与流程
  11. html 网页表格居中,网页中表格如何居中
  12. ES6 模板字符串基本用法
  13. 使用 Ctrl + R 命令反向查找/搜索历史【笔记】
  14. 【自然语言处理-2】word2vec词嵌入算法“男人”+“女人”=“爱情的坟墓”
  15. win服务器系统2012和2016,将 Windows Server 2012 升级到 Windows Server 2016
  16. binwalk有MySQL_linux – 使用binwalk提取所有文件
  17. 用正则表达式将文字转换成表情图片
  18. 中国大学MOOC音乐与健康试题及答案
  19. 程序员们,千万不要接私活!
  20. Zemax-如何导入实体?

热门文章

  1. 使用AWK进行分割字符串以及截取字符串
  2. Mysql高频面试题(后端大数据面试必备)
  3. voip push推送
  4. 借钱不还,还装X——对不起,我爱你
  5. html5在线制作 知乎,html5canvas: 教你实现知乎登录动态粒子背景
  6. 如何在CSDN获得积分
  7. Bitmap 转 BGR
  8. 社群运营中粉丝对于商家的价值是什么?
  9. 微信小程序:实用多功能工具箱微信小程序源码
  10. eml文件是什么格式的文件?怎么打开?怎么导出eml文件?