What : RabbitMQ简介

消息队列实现系统之间的双向解耦,生产者往消息队列中发送消息,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到系统解耦的目的,也大大提高了系统的高可用性和高并发能力。

RabbitMQ的主要优势如下:

1、可靠性(Reliability):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,既可以将多个Exchange绑定在一起,又可以通过插件机制实现自己的Exchange。

2、消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。

3、多种协议(Multi-Protocol):支持多种消息队列协议,如STOMP、MQTT等。

4、多种语言支持(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

5、插件机制(Plugin System):提供了许多插件进行扩展,也可以编辑自己的插件。

AMQP

RabbitMQ的核心概念和消息中间件中非常重要的协议——AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是应用层(计算机网络七层框架之一)协议的开放标准,是为面向消息的中间件设计。基于此协议的客户端可与消息中间件传递消息,从而不受产品、开发语言等条件限制。消息中间件主要用于组件之间的解耦,消息发送者无须知道消息使用者的存在,反之亦然。

与其他消息队列协议不同的是,AMQP中增加了Exchange和Binging角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收;而Binding决定Exchange的消息应该发送到哪个队列。

经典的生产者、消费者场景。

rabbitmq的组件功能

RabbitMQ中有几个非常重要的组件:服务实体(Broker)、虚拟主机(VirtualHost)、交换机(Exchange)、队列(Queue)和绑定(Binging)等

  • 服务实体(Broker):标识消息队列的服务器实体。
  • 虚拟主机(Virtual Host):一个虚拟主机只有一组交换机、队列和绑定,为什么还需要多个虚拟主机呢?很简单,在RabbitMQ中,用户只能在虚拟主机的粒度上进行权限控制。因此,如果需要禁用A组访问B组的交换机/队列/绑定,就必须为A和B分别创建一个虚拟主机,每个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换机(Exchange):客户端不会直接给服务端发送消息,而是通过交换机转发。交换机用于转发消息,但是它不会进行存储,如果没有消息队列发送到交换机,它就会直接丢弃生成者(Producer)发送过来的消息。
  • 队列(Queue):用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列中,等待消费者连接到这个队列将其取走。
  • 绑定(Binging):也就是交换机需要与队列相互绑定,上图所示就是多对多的关系。

交换机

交换机(Exchange)的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,只是把消息分发给各自的队列。但是我们给交换机发送消息,它怎么知道给哪个消息队列发送呢?这里就要用到RoutingKey和BindingKey。

BindingKey是交换机和消息队列绑定的规则描述。RoutingKey是消息发送时携带的消息路由信息描述。当消息发送到交换机(Exchange)时,通过消息携带的RoutingKey与当前交换机所有绑定的BindingKey进行匹配,如果满足匹配规则,则往BindingKey所绑定的消息队列发送消息,这样就解决了向RabbitMQ发送一次消息,可以分发到不同的消息队列,实现消息路由分发的功能。交换机有Direct、Topic、Headers和Fanout四种消息分发类型。不同的类型在处理绑定到队列方面的行为时会有所不同。

  • Direct:其类型的行为是“先匹配,再发送”,即在绑定时设置一个BindingKey,当消息的RoutingKey匹配队列绑定的BindingKey时,才会被交换机发送到绑定的队列中。
  • Topic:按规则转发消息(最灵活)。支持用“”或“#”的模式进行绑定。“”表示匹配一个单词,“#”表示匹配0个或者多个单词。比如,某消息队列绑定的BindingKey为“*.user.#”时,能够匹配到RoutingKey为usd.user和eur.user.db的消息,但是不匹配user.hello。
  • Headers:设置header attribute参数类型的交换机。根据应用程序消息的特定属性进行匹配,这些消息可能在绑定key中标记为可选或者必选。
  • Fanout:转发消息到所有绑定队列(广播)。将消息广播到所有绑定到它的队列中,而不考虑队列绑定的BindingKey的值。

1、Direct模式

Direct是RabbitMQ默认的交换机模式,也是简单的模式,根据key全字匹配去寻找队列,当消息的RoutingKey为orange时,匹配Q1队列,所以消息被发送到Q1。

2、 Topic模式

Topic是RabbitMQ中使用最多的交换机模式,RoutingKey必须是一串字符,用符号“.”隔开,比如user.msg或者user.order.msg等。


Topic与Direct类似,只是路由匹配上支持通配符,可以使用以下两个通配符:*:表示匹配一个词。#:表示匹配0个或多个词。当消息的RoutingKey为color.orange.msg时,匹配Q1队列,所以消息被发送到Q1。

3、Headers模式

Headers也是根据规则匹配的,相较于Direct和Topic固定地使用RoutingKey与BindingKey的匹配规则来路由消息,Headers是根据发送的消息内容中的headers属性进行匹配的。

息队列绑定的header数据中有一个特殊的键x-match,有all和any两个值

all:表示传送消息的header中的“键-值对”(Key-Value Pair)和交换机的header中的“键-值对”全部匹配,才可以路由到对应的交换机。any:表示传送消息的header中的“键-值对”和交换机的header中的“键-值对”中的任意一个匹配,就可以路由到对应的交换机。

4、Fanout模式

Fanout是消息广播模式,交换机不处理RoutingKey,发送到交换机的消息都会分发到所有绑定的队列上。Fanout模式不需要RoutingKey,只需要提前将交换机与队列进行绑定即可。每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout模式转发消息是最快的。

Why : RabbitMQ的作用以及场景

1、应用解耦:作为中间件作用是得系统之间耦合型不会很大

2、异步处理:

例如用户注册场景,用户发起注册信息之后写入数据库成功之后在发起注册短信,耗时为串行处理。(1+1=2)

而使用了rabbitMQ之后用户发起注册信息写入数据库与消息队列当中之后返回给用户,做异步处理(1+1<2)

3、流量削峰:引入消息队列做负载处理,可以防止负载过高导致服务器挂掉

How:SpringBoot集成RabbitMQ

1、pom依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.x.x</version>
</dependency>

2、修改配置文件

spring:rabbitmq:host: 127.0.0.1port: 5276username: xxxpassword: xxxvirtual-host: xx(非必须)

3、创建消费者

@Component
public class Consumer {@RabbitHandler//指定队列名称@RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue"))public void process(String message){System.out.println("消费者收到消息:"+message);}
}
/**
Consumer消费者通过@RabbitListener注解创建侦听器端点,绑定rabbitmq_queue队列。
1)@RabbitListener注解提供了@QueueBinding、@Queue、@Exchange等对象,通过这个组合注解配置交换机、绑定路由并且配置监听功能等。
2)@RabbitHandler注解为具体接收的方法。
*/

4、创建生产者

@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void Producer(){String message = DateUtil.now()+"ShangHai";System.out.println("发送消息"+message);//向哪一个队列发送消息rabbitTemplate.convertAndSend("rabbitmq_queue",message);}
}
/**
RabbitTemplate提供了convertAndSend方法发送消息。convertAndSend方法有routingKey和message两个参数:
1)routingKey为要发送的路由地址。
2)message为具体的消息内容。发送者和接收者的queuename必须一致,不然无法接收。
*/

5、创建单元测试运行

@Test
void zz() throws InterruptedException {producer.Producer();TimeUnit.SECONDS.sleep(1);
}
/**
结果:
发送消息2022-06-09 13:23:20ShangHaitoken:22cef4a225177b9c6ee5fdd793d2bd0b
消费者收到消息:2022-06-09 13:23:20ShangHaitoken:22cef4a225177b9c6ee5fdd793d2bd0b
*/

消息发送模式

1、简单队列模式

简单队列是RabbitMQ中最简单的工作队列模式,也叫点对点模式,即一个消息的生产者对应一个消费者,它包含一个生产者、一个消费者和一个队列。生产者向队列中发送消息,消费者从队列中获取消息并消费。简单队列有3个角色:一个生产者、一个队列和一个消费者,这样理解起来比较简单。下面根据示例来演示简单队列的工作模式。

代码使用与上述相同

2、工作队列模式

除了一对一的简单队列模式,还有一个生产者对多个消费者的工作队列模式,该模式下可以是一个生产者将消息发送到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理

1、定义生产者

@Component
public class ProducerWork {@AutowiredRabbitTemplate rabbitTemplate;public void ProducerWork(){for (int i = 0 ;i<100 ; i++) {String message = DateUtil.now();rabbitTemplate.convertAndSend("rabbit_mq_worker","no."+i+" producer send "+message);}}
}

2、定义消费者

@Component
public class ConsumerWork {@RabbitHandler@RabbitListener(queuesToDeclare = @Queue("rabbit_mq_worker"))public void consumerWork1(String message){System.out.println("consumer_1 reciver message" + message);}@RabbitHandler@RabbitListener(queuesToDeclare = @Queue("rabbit_mq_worker"))public void consumerWork2(String message){System.out.println("consumer_2 reciver message" + message);}
}

3、结果

/**
consumer_1 reciver messageno.1 producer send 2022-06-09 14:50:33
consumer_2 reciver messageno.0 producer send 2022-06-09 14:50:32
consumer_1 reciver messageno.3 producer send 2022-06-09 14:50:33
consumer_2 reciver messageno.2 producer send 2022-06-09 14:50:33
consumer_2 reciver messageno.4 producer send 2022-06-09 14:50:33
consumer_1 reciver messageno.5 producer send 2022-06-09 14:50:33
可以看到两个消费交替消费
*/

3、路由模式

之前介绍了Direct路由转发模式是“先匹配,再发送”,即在绑定时设置一个BindingKey,当消息的RoutingKey匹配队列绑定的BindingKey时,才会被交换机发送到绑定的队列中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZLehkzT-1654768127326)(/Users/ozzo/Library/Application Support/typora-user-images/image-20220609145402813.png)]

在Direct模型下,队列与交换机不能任意绑定,而是要指定一个Bindingkey,消息的发送方在向Exchange发送消息时,也必须指定消息的Routingkey。消息的Routingkey与队列绑定的BindingKey必须完全匹配才进行发送。

1、配置路由规则

package com.practise.rabbitMQ;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 三个队列三个交换机、同时定义交换机类型为direct类型* */
@Configuration
public class DirectExchangeConfig {/*** 队列一* @return*/@Beanpublic Queue directQueueQ1() {return new Queue("direct.Q1");}/*** 队列二* @return*/@Beanpublic Queue directQueueQ2() {return new Queue("direct.Q2");}/*** 队列三* @return*/@Beanpublic Queue directQueueQ3() {return new Queue("direct.Q3");}/*** 定义交换机 direct类型* @return*/@Beanpublic DirectExchange myDirectExchange() {return new DirectExchange("directExchange");}/*** 队列 绑定到交换机 再指定一个路由键* directQueueOne() 会找到上方定义的队列bean* @return*/@Beanpublic Binding DirectExchangeQ1() {return BindingBuilder.bind(directQueueQ1()).to(myDirectExchange()).with("direct.Q1");}/*** 队列 绑定到交换机 再指定一个路由键* @return*/@Beanpublic Binding DirectExchangeQ2() {return BindingBuilder.bind(directQueueQ2()).to(myDirectExchange()).with("direct.Q2");}/*** 队列 绑定到交换机 再指定一个路由键* @return*/@Beanpublic Binding DirectExchangeQ3() {return BindingBuilder.bind(directQueueQ3()).to(myDirectExchange()).with("direct.Q3");}
}
/**
在上面的示例中,首先定义了交换机directExchange,然后分别定义了Q1、Q2、Q3三个队列,最后通过bind(directQueueQ3()).to(myDirectExchange()).with("direct.Q3")方法将3个队列绑定到Direct交换机上。
*/

2、生产者

@Component
public class ConsumerDirect {@RabbitHandler@RabbitListener(queuesToDeclare = @Queue("direct.Q1"))public void processQ1(String message) {System.out.println("direct Receiver Q1: " + message);}@RabbitHandler@RabbitListener(queuesToDeclare = @Queue("direct.Q2"))public void processQ2(String message) {System.out.println("direct Receiver Q2: " + message);}@RabbitHandler@RabbitListener(queuesToDeclare = @Queue("direct.Q3"))public void processQ3(String message) {System.out.println("direct Receiver Q3: " + message);}
}

3、消费者

@Component
public class ProducerDirect {@Autowiredprivate RabbitTemplate rabbitTemplate;public void produce(String routingKey) {String context = "direct msg hello";System.out.println("Direct Sender,routingKey: " + routingKey+",context:"+context);//第一个参数表示绑定到交换机在configuration里面设置过。this.rabbitTemplate.convertAndSend("directExchange",routingKey, context);}
}
/**
在上面的示例中,通过convertAndSend()方法发送消息,传入directExchange、routingKey、context三个参数。
1)directExchange为交换机名称。
2)routingKey为消息的路由键。
3)context为消息的内容。我们看到使用direct路由模式时,传入了具体的routingKey参数。这样RabbitMQ将消息发送到对应的交换机,交换机再通过消息的routingKey匹配队列绑定的bindingKey,从而实现消息路由传递的功能。
*/

4、测试以及测试结果

@SpringBootTest(classes = MyStart.class)
public class test {@Autowiredprivate ProducerDirect producerDirect;@Testvoid zz() throws InterruptedException {producerDirect.produce("direct.Q1");producerDirect.produce("direct.Q2");TimeUnit.SECONDS.sleep(1);}
}
/**
结果:
Direct Sender,routingKey: direct.Q1,context:direct msg hello
Direct Sender,routingKey: direct.Q2,context:direct msg hello
direct Receiver Q2: direct msg hello
direct Receiver Q1: direct msg hello
*/

4、广播模式

Fanout就是熟悉的广播模式或者订阅模式,每个发送到Fanout类型交换机的消息都会分到所有绑定的队列上。Fanout交换机不处理路由键,只是简单地将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。如图12-15所示,Fanout模式很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout类型转发消息是最快的。

1、定义配置类

@Configuration
public class FanoutConfig {//定义队列@Beanpublic Queue Q1Message() {return new Queue("fanout.Q1");}@Beanpublic Queue Q2Message() {return new Queue("fanout.Q2");}@Beanpublic Queue Q3Message() {return new Queue("fanout.Q3");}//定义交换器@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}//分别进⾏绑定@BeanBinding bindingExchangeQ1(Queue Q1Message, FanoutExchange fanoutExchange) {return BindingBuilder.bind(Q1Message).to(fanoutExchange);}@BeanBinding bindingExchangeQ2(Queue Q2Message, FanoutExchange fanoutExchange) {return BindingBuilder.bind(Q2Message).to(fanoutExchange);}@BeanBinding bindingExchangeQ3(Queue Q3Message, FanoutExchange fanoutExchange) {return BindingBuilder.bind(Q3Message).to(fanoutExchange);}
}

2、定义消费者

@Component
public class FanoutConsumer {@RabbitHandler@RabbitListener(queues = "fanout.Q1")public void processA(String message) {System.out.println("fanout Receiver Q1: " + message);}@RabbitHandler@RabbitListener(queues = "fanout.Q2")public void processB(String message) {System.out.println("fanout Receiver Q2: " + message);}@RabbitHandler@RabbitListener(queues = "fanout.Q3")public void processC(String message) {System.out.println("fanout Receiver Q3: " + message);}
}

3、定义生产者

@Component
public class FanoutProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void produce() {String context = "fanout msg weiz";System.out.println("Fanout Sender : " + context);this.rabbitTemplate.convertAndSend("fanoutExchange","", context);}
}

4、测试

@Autowired
private FanoutProducer fanoutProducer;
@Test
void zz() throws InterruptedException {fanoutProducer.produce();TimeUnit.SECONDS.sleep(1);
}
/**
测试结果
Fanout Sender : fanout msg weiz
fanout Receiver Q1: fanout msg weiz
fanout Receiver Q2: fanout msg weiz
fanout Receiver Q3: fanout msg weiz
*/

5、发布订阅者模式

Topic是RabbitMQ中灵活的一种方式,可以根据路由键绑定不同的队列。Topic类型的Exchange与Direct相比,都可以根据路由键将消息路由到不同的队列。只不过Topic类型的Exchange可以让队列在绑定路由键时使用通配符。

有关通配符的规则为:

#:匹配一个或多个词。

*:只匹配一个词。

1、配置类

@Configuration
public class TopicRabbitConfig {final static String message = "topic.color";final static String message2 = "topic.color.red";final static String message3 = "topic.msg.feedback";//定义队列@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessage2() {return new Queue(TopicRabbitConfig.message2);}@Beanpublic Queue queueMessage3() {return new Queue(TopicRabbitConfig.message3);}//交换器@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}//将队列和交换器绑定@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}//将队列和交换器绑定@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.#");}@BeanBinding bindingExchangeMessage2(Queue queueMessage2, TopicExchange exchange) {return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.*.*");}@BeanBinding bindingExchangeMessage3(Queue queueMessage3, TopicExchange exchange) {return BindingBuilder.bind(queueMessage3).to(exchange).with("topic.green.*");}
}
/**
然后通过.bind(queueMessage2).to(exchange).with("topic.#")绑定3个队列,
queueMessage绑定topic.#,
queueMessage2绑定topic.*.*,
queueMessage3绑定topic.green.*。
*/

2、定义消费者

@Component
public class TopticConsumer {@RabbitHandler@RabbitListener(queues = "topic.color")public void processA(String message) {System.out.println("topic.color Receiver: " + message);}@RabbitHandler@RabbitListener(queues = "topic.color.red")public void processB(String message) {System.out.println("topic.color.red Receiver: " + message);}@RabbitHandler@RabbitListener(queues = "topic.msg.feedback")public void processC(String message) {System.out.println("topic.msg.feedback Receiver: " + message);}
}

3、定义生产者

@Component
public class TopicProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void produce(String routingKey) {String context = "topic msg weiz";System.out.println("topic Sender,routingKey: " + routingKey+",context:"+context);this.rabbitTemplate.convertAndSend("topicExchange",routingKey, context);}
}

4、测试

//----测试一-----
@Autowired
private TopicProducer topicProducer;
@Test
void zz() throws InterruptedException {topicProducer.produce("topic.*.*");TimeUnit.SECONDS.sleep(1);
}
/**
预期:
final static String message = "topic.color";
final static String message2 = "topic.color.red";
这两个收到消息
结果:
topic Sender,routingKey: topic.*.*,context:topic msg weiz
topic.color Receiver: topic msg weiz
topic.color.red Receiver: topic msg weiz
*/
//---------测试二------------
@Autowiredprivate TopicProducer topicProducer;@Testvoid zz() throws InterruptedException {topicProducer.produce("topic.green.*");TimeUnit.SECONDS.sleep(1);}
/**
预期:全部收到
结果:
topic Sender,routingKey: topic.green.*,context:topic msg weiz
topic.color.red Receiver: topic msg weiz
topic.color Receiver: topic msg weiz
topic.msg.feedback Receiver: topic msg weiz
*/

确认消息机制

虽然使用RabbitMQ可以降低系统的耦合度,提高整个系统的高并发能力,但是也使得业务变得复杂,可能造成消息丢失,导致业务中断的情况。

listener:simple:acknowledge-mode: AUTO
publisher-confirm-type: simple

场景

  • 生产者发送消息到RabbitMQ服务器失败。

  • RabbitMQ服务器自身故障导致消息丢失。

  • 消费者处理消息失败。

针对上面的情况,RabbitMQ提供了多种消息确认机制,确保消息的正常处理,主要有生产者消息确认机制、

Return确认消息消息机制

spring:rabbitmq:listener:
#      开启手动确认模式simple:acknowledge-mode: manual(AUTO、NONE)template:
#      消费者在没找到合适路由的情况下会被Return监听,而不会自动删除mandatory: true
#      开启return机制publisher-returns: true
#    开启确认correlationDatapublisher-confirm-type: correlated(SIMPLE,CORRELATED,NONE;)

1、消费者

@Component
public class Consumer {@RabbitListener(queues = "rabbit_confirm_queue")public void process(Message message, Channel channel) throws IOException, InterruptedException {try {System.out.println("正常收到消息:" + new String(message.getBody()));int i=1/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {System.out.println("消息已重复处理失败,拒绝再次接收");// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {System.out.println("消息即将再次返回队列处理");// requeue为是否重新回到队列,true重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

2、生产者

/*** 配置 confirm 机制*/
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 消息相关的数据,一般用于获取 唯一标识 id* @param b 是否发送成功* @param error 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String error) {if (b) {System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId());} else {System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error);}}
};private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("return 消息退回 exchange: " + exchange + ", routingKey: "+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);}
};/*** 发送消息 参数有:交换机 ,空路由键,消息,并设置一个唯一消息ID*/
public void sendConfirm(String routingKey) {rabbitTemplate.convertAndSend("confirm_direct_exchange",routingKey,"这是一个带confirm的消息",new CorrelationData("" + System.currentTimeMillis()));//使用咱们上方配置的发送回调方法rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);
}

消费端ACK和Nack机制

RabbitMQ消费端的确认机制分为3种:none、manual、auto(默认)。

none:表示没有任何应答会被发送。

manual:表示监听者必须通过调用channel.basicAck()来告知消息被处理。

  • ① channel.basicAck(long,boolean):确认收到消息,消息将从队列中被移除,为false时只确认当前一个消费者收到的消息,为true时确认所有消费者收到的消息。

  • ② channel.basicNack(long,boolean,boolean):确认没有收到消息,第一个boolean表示是一个消费者还是所有的消费者,第二个boolean表示消息是否重新回到队列,为true时表示重新入队。

  • ③ channel.basicReject(long,boolean):拒绝消息,requeue=false表示消息不再重新入队,如果配置了死信队列,则消息进入死信队列。

auto:表示自动应答,除非MessageListener抛出异常,这是默认配置方式。

  • ① 如果消息成功处理,则自动确认。
  • ② 当发生异常抛出AmqpRejectAndDontRequeueException时,则消息会被拒绝且不重新进入队列。
  • ③ 当发生异常抛出ImmediateAcknowledgeAmqpException时,则消费者会被确认。
  • ④ 当抛出其他的异常时,则消息会被拒绝,且requeue=true时会发生死循环,可以通过setDefaultRequeueRejected(默认是true)设置抛弃消息。

修改配置

rabbitmq:listener:simple:acknowledge-mode: manualtemplate:mandatory: true

消费者代码(生产者如上不变)

@RabbitListener(queues = "rabbit_confirm_queue")
public void process(Message message, Channel channel) throws IOException, InterruptedException {try {System.out.println("正常收到消息:" + new String(message.getBody()));int i=1/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {System.out.println("消息已重复处理失败,拒绝再次接收");// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {System.out.println("消息即将再次返回队列处理");// requeue为是否重新回到队列,true重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}
/**
正常收到消息:这是一个带confirm的消息
消息即将再次返回队列处理
正常收到消息:这是一个带confirm的消息
消息已重复处理失败,拒绝再次接收
*/

rabbbitMQ简单笔记相关推荐

  1. ES6 -- 简单笔记总结

    文章目录 ES6 - 简单笔记总结 JSON 严格模式 箭头函数编写简洁的匿名函数 高阶箭头函数 设置函数的默认参数 rest 操作符 与 函数参数一起使用 spread 运算符展开数组项 使用解构赋 ...

  2. True FFS内核编程(简单笔记)

    True FFS内核编程(简单笔记) 2006-08-22 19:22 True FFS内核编程 1 格式化FLASH 即使FLASH没有和块设备驱动绑定,也可对其进行格式化. tffsDevForm ...

  3. jToken与JArray简单笔记

    jToken与JArray简单笔记 //=============string outhtml = string.Empty;int error = HttpWebHelp.HttpHelp(&quo ...

  4. bigpipe php,php 使用 bigpipe技术 简单笔记

    php 使用 bigpipe技术 简单笔记 php 使用 bigpipe技术 简单笔记 1.配置nginx 关闭proxy_buffering 为 off ,关闭 gzip压缩,  设置 fastcg ...

  5. git clone 一些简单笔记

    自使用了git后,就彻底喜欢上了,深深体会到了自由的感觉,记录一些简单的笔记和使用心得,仅供留迹,以备后查... git clone 命令参数: usage: git clone [options] ...

  6. 【Android】Fragment的简单笔记

    被虐了,做某公司笔试时,发现自己连个Fragment的生命周期都写不详细.平时敲代码,有开发工具的便利,有网上各大神的文章,就算忘了也很容易的可以查到,但当要自己不借助外界,却发现自己似乎对该知识点并 ...

  7. webpack简单笔记

    本文简单记录学习webpack3.0的笔记,已备日后查阅.节省查阅文档时间 安装 可以使用npm安装 //全局安装 npm install -g webpack //安装到项目目录 npm insta ...

  8. 重学JavaSE —— Map、Set、Iterator(迭代器) 简单笔记

    前言 本文是学习笔记,也有配套源码实例,主要是针对本人对Map.Set.Iterator进行的简单学习和笔记总结. 本文源代码已上传至Gitee:https://gitee.com/da-ji/ful ...

  9. egret 白鹭笔记(2020)简单笔记

    目录结构 核心文件夹 .src文件夹,所有项目的源代码都放在这个目录下. Main.ts 为项目入口类,也称文档类. egretProperties.json 是项目的配置文件 resource 目录 ...

最新文章

  1. 《数字质量手册》新书问答
  2. java8学习:用流收集数据
  3. Castle ActiveRecord学习实践(1):快速入门指南
  4. 3.1 神经网络概览-深度学习-Stanford吴恩达教授
  5. windows下安装ElasticSearch的Head插件
  6. rust军用船指令_RUST物品指令清单(英文版)
  7. Git根据文件名字查询修改文件内容
  8. 一次微信小程序的快速开发体验
  9. [学习笔记]状压dp
  10. thymeleaf 复选框回显_Thymeleaf+layui+jquery复选框回显
  11. VS2010 + C#4.0使用 async + await
  12. 虚拟资源拳王公社:最适合上班没时间的副业赚钱项目是什么,简单易操作的副业项目
  13. UDK游戏开发基础命令
  14. Lua游戏开发----游戏搭建
  15. 海南省月降水量分布数据
  16. mysql 转发_【mysql】【转发】my.cnf 讲解
  17. 若依集成yuicompressor实现(CSS/JS压缩)
  18. 创新企业如何“跨越鸿沟”?
  19. 五、python的数据容器(站在前辈们的肩膀上注入自己的理解,强势总结,适合入门,也适合复习)
  20. 解决 Please use the NLTK Downloader to obtain the resource

热门文章

  1. 个人博客构建——github个人博客
  2. CiteSpace关键词共现图谱含义详细解析
  3. 你的智能音箱为什么无所不能?如何创建一个自己的音箱服务
  4. 2020-2021学年——图像图形编程实践实验4_Canny图像边缘检测
  5. 基于MFC的凸多面体的消隐实现
  6. 爱迪生的复仇:直流电的崛起
  7. 解决迅雷下载会卡的问题
  8. 和海草一起学C语言---一看就懂的选择、循环语句函数和数组
  9. Django动态获取mysql连接,django model中的choices 动态从数据库中获取
  10. iphone软件启动画面