1. Exchange作用

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。

生产者--(创建消息)-->交换机--(路由键)-->队列--(pull/push)-->消费者

2. Exchange的类型

1)直连交换器: Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的

什么是路由键?
每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串

直连交换机适用场景?
有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

直连交换机不适合的场景
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么直连交换机就不合适了

2)主题交换机: Topic Exchange(发布/订阅)
RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的routing_key定义规则:
交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:
*表示一个单词
#表示任意数量(零个或多个)单词

示例:

Q1:  *.TT.*
Q2:   TT.#如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

3)广播交换机: Fanout Exchange
用于广播消息,将发送到Exchange中的消息发送到与该交换器关联的所有队列中。

3. 死信队列

死信队列用于存储没匹配队列的消息,超时没有被处理的消息,如果没有配置死信队列这些消息会被丢弃。即当出现没有匹配的队列的消息,或是超时的消息则将消息转入到死信队列里去,等待重新处理或人工干预。

死信队列的应用场景:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

4. 队列参数说明

参数 作用
exchange 交换机名称
type 交换机类型
durable 是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
autoDelete 是否自动删除,如果没有与之绑定的Queue,直接删除
internal 是否内置的,如果为true,只能通过Exchange到Exchange
arguments 结构化参数

示例:

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;

5. 开发示例

准配虚拟机 开启一个Docker 拉取镜像rabbitmq 运行容器

具体步骤:有道云笔记

需要架包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope>
</dependency>

配置文件 application 生产与消费者都可用 端口需要改动 还有RabbitMQ服务地址需要改动


server.port=8081
## rabbitmq config
spring.rabbitmq.host=192.168.164.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=xhz
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=my_vhost
## 消费者数量
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列中获取的消息数量
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

完成演示图

所有消费消息类 模块

5.1 Direct交换机

1)配置直接交换机,队列,并将直接交换机和该队列绑定。(在RabbitMQConfig类中配置,该类使用了@Configuration注解)

package com.rabbitmq.provider.rabbitmqprovider.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {
@Beanpublic Queue directQueue(){return new Queue("zking-direct-queue");
}@Beanpublic DirectExchange directExchange(){return new DirectExchange("zking-direct-exchange");}@Beanpublic Binding directBinding(){return BindingBuilder.bind(directQueue()).to(directExchange()).with("zking-direc");}}

2)编写通过直接交换机发送消息的方法

package com.rabbitmq.provider.rabbitmqprovider.web;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;@RestController
public class SenderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/sendDirect")public String sendDirect(String routing){Map msg=new HashMap<>();msg.put("code",200);msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rabbitTemplate.convertAndSend("zking-direct-exchange",routing,msg);return "direct success";}}

3.测试交换机发送消息

http://localhost:8081/sendDirect?routing=zking-direc

4.消费消息

创建模块

消费消息 我们运行这个项目

package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@Slf4j
//queues参数指定的是与直接交换机关联的队列名称
@RabbitListener(queues = "zking-direct-queue")
public class DirecReciewer {@RabbitHandlerpublic void receive(Map msg) {log.info("接收通过直接交换机发送的消息: " + msg);}
}

打印结果

5.2 主题交换机

1) 配置主题交换机,队列,并将主题交换机和该队列绑定。

第一种方式 ---选一种即可

package com.rabbitmq.provider.rabbitmqprovider.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//@Configuration
public class TopicConfig {/*** 声明Topic类型的交换机,支持序列化,后面队列进行绑定(topic_queue_q1,topic_queue_q2)* @return*/@Bean(name="topicExchange")public Exchange topicExchange() {return ExchangeBuilder.topicExchange("topic_exchange").durable(true).build();}/*** 声明队列,该队列与topic交换机绑定* @return*/@Bean(name="topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable("topic_queue_q1").build();}/*** 声明队列,该队列与topic交换机绑定* @return*/@Bean(name="topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable("topic_queue_q2").build();}/*** 将队列(topic_queue_q1)与topic型交换机进行绑定* @param queue* @param exchange* @return*/@Beanpublic Binding topicBindingQ1(@Qualifier("topicQueue1") Queue queue,@Qualifier("topicExchange") Exchange exchange)  {return BindingBuilder.bind(queue).to(exchange).with("topic.queue.#").noargs();}/*** 将队列(topic_queue_q2)与topic型交换机进行绑定* @param queue* @param exchange* @return*/@Beanpublic Binding topicBindingQ2(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("topic.queue.#").noargs();}
}

测试 发送消息

package com.rabbitmq.provider.rabbitmqprovider;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;@SpringBootTest
class RabbitmqProviderApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {Map msg=new HashMap<>();msg.put("code",200);msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rabbitTemplate.convertAndSend("topic_exchange","topic.queue.ab",msg);}}

第二种发送消息

package com.rabbitmq.provider.rabbitmqprovider.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicConfig1 {@Bean(name="topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable("topic_queue_q1").build();}@Bean(name="topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable("topic_queue_q2").build();}@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topic-exchange");}
@Beanpublic Binding topicBinding1(  @Qualifier("topicQueue1") Queue queue,@Qualifier("topicExchange") TopicExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("person.yy");
}@Beanpublic Binding topicBinding2(  @Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") TopicExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("person.*");}}

测试发送消息

package com.rabbitmq.provider.rabbitmqprovider.web;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;@RestController
public class SenderController {@RequestMapping("/sendTopic")public String sendTopic(String routing){Map msg=new HashMap<>();msg.put("code",200);msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rabbitTemplate.convertAndSend("topic-exchange",routing,msg);return "direct success";}}

http://localhost:8081/sendTopic?routing=person.y 只有条件为 y 或者 *

http://localhost:8081/sendTopic?routing=person.yy

消费消息

package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Componentpublic class TopicReciewer {@RabbitListener(queues={"topic_queue_q1"})@RabbitHandlerpublic void handler(Map map){System.out.println(map);}@RabbitListener(queues={"topic_queue_q2"})@RabbitHandlerpublic void handler1(Map map){System.out.println(map);}}

5.3 广播交换机 (扇形)

1)配置广播交换机,队列,并将主题交换机和该队列绑定。

package com.rabbitmq.provider.rabbitmqprovider.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {@Beanpublic Queue fanoutQueue1() {return new Queue("fanout-queue1");}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout-queue2");}@Beanpublic Queue fanoutQueue3() {return new Queue("fanout-queue3");}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout-exchange");}@Beanpublic Binding fanoutBInding1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding fanoutBInding2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding fanoutBInding3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}
}

生产的队列

向服务器发送消息

package com.rabbitmq.provider.rabbitmqprovider.web;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;@RestController
public class SenderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/sendFanout")public String sendFanout(){Map msg=new HashMap<>();msg.put("code",200);msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rabbitTemplate.convertAndSend("fanout-exchange",null,msg);return "direct success";}
}

发送与消费

消费消息

package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class FanoutRecevier {@RabbitListener(queues={"fanout-queue1"})@RabbitHandlerpublic void fanout(Map map){System.out.println(map);}@RabbitListener(queues={"fanout-queue2"})@RabbitHandlerpublic void fanout1(Map map){System.out.println(map);}@RabbitListener(queues={"fanout-queue3"})@RabbitHandlerpublic void fanout2(Map map){System.out.println(map);}
}

RabbitMQ交换机相关推荐

  1. 认识RabbitMQ交换机模型

    认识RabbitMQ交换机模型 原文:认识RabbitMQ交换机模型 前言 RabbitMQ是消息队列中间件(Message Queue Middleware)中一种,工作虽然有用到,但是却没有形成很 ...

  2. Rabbitmq交换机详解

    rabbitmq交换机 1.作用: 接受生产者的消息,然后根据路由键routingKey把消息投递到跟交换机绑定的对应的队列上 2.属性 Name: 交换机的名称 Type: 交换机的类型,direc ...

  3. RabbitMQ交换机类型

    RabbitMQ交换机类型 一.Direct Exchange(直连交换机) 二. Fanout Exchange(扇型交换机) 三.Topic Exchange(主题交换机) 四.Headers E ...

  4. RabbitMQ交换机(Fanout、Direct、Topic)三种模式详解

    一. 交换机 1.1 Exchanges 1.1.1 Exchanges概念 ​ RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列.实际上,通常生产 者甚至都不知道这 ...

  5. RabbitMQ交换机简介

    介绍 RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列.实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中. 相反,生产者只能将消息发送到交换机(exchang ...

  6. 还不知道 RabbitMQ 常用的几种交换机模式?这篇小白都能看懂的 RabbitMQ 交换机模式

    要了解 RabbitMQ 的交换机发布订阅模型,先来了解下 RabbitMQ 消息传递模型的核心思想:生产者从不直接向队列发送任何消息.实际上,通常情况下,生产者甚至根本不知道消息是否会被传递到任何队 ...

  7. RabbitMQ交换机(扇出模式、直接模式)学习笔记

    视频地址 什么是交换机? RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列.实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中. 情况实际上是这样的,生产 ...

  8. RabbitMQ——交换机

    1. 交换机 exchange 不指定交换机直接发送到队列时,多个消费者之间存在的是竞争关系,一个消息只能被一个消费者接收,其他的消费者不能够再次接收:交换机可以绑定多个不同的队列,但是其Routin ...

  9. #rabbitMQ #重复消费 #可靠投递 #延时投递 #rabbitMQ交换机类型#重复消费#消息积压#消息丢失

    exchange类型: 1, direct 指定direct后, 消息会根据你设置的routeing key(路由键), 发送到对应的队列中 1,新建direct交换机 2,添加队列, 并且绑定路由键 ...

最新文章

  1. 基于yolov5的行人检测跟踪与社交距离预测 (pedestrian detection and social distance prediction)
  2. 信息系统项目管理知识--物联网
  3. Java写入大字符串到oracle数据库_java程序如何提高oracle百万级数据的insert效率
  4. canvas特效代码详解(2)
  5. 论文浅尝 | 具有图卷积网络和顺序注意力机制的应用于目标导向的对话系统
  6. CSAPP--整数的表示
  7. 22课时、19大主题,CS 231n进阶版课程视频上线!
  8. are exo exo是什么歌 we_从演出酬劳只有一袋米到万人追捧!EXO在七年中究竟经历了什么?...
  9. ANSYS APDL实例01:动力学分析
  10. 【转载】人工智能发展简史
  11. Ubuntu下安装UE和AirSim并使用UE4.27编译C++项目
  12. 数模(2)——多属性决策模型
  13. Comparable+Comparator+Cloneable接口
  14. 关于部分VPython差异
  15. SwiftUI 中的水平条形图
  16. 【日记】假期正式学习
  17. 把汇编程序翻译成C语言,pic单片机汇编程序翻译成c语言解决办法
  18. spawn-fcgi php-fpm,白话php工作方式:mod_php、mod_fastcgi、php-fpm、spawn-fcgi
  19. c语言typedef结构体_typedef在C中具有结构的示例
  20. python设计程序求10个数去掉最高分和最低分后的平均值_7-6、以下伪代码是体育评分,10个评委进行评分,去掉一个最高分,去掉一个最低分,再求平均值。...

热门文章

  1. python求数列数量积_python中矩阵运算(乘法和数量积)
  2. MM的时尚笔记本(图)
  3. pgsql修改表名和修改字段的操作
  4. Maxscale Keepalived MySQL实现高可用读写分离集群
  5. 合金装备幸存服务器维护时间,合金装备幸存有什么技巧_合金装备生存技巧心得分享_3DM单机...
  6. AC自动机(题目+模板)
  7. KMP+MANACHER题目总结
  8. PPTP拨号过程分析
  9. 虽然隔行如隔山但是还是得改
  10. 超3000亿美元市值被斩,特斯拉暴跌13.57%,美股“黑天鹅”引科技股“竞折腰”...