前面我们学习了RabbitMQ的核心基础组件,了解了基本消息模型由队列、交换机、路由构成。而在RabbitMQ的核心组件体系中,主要有4种消息模型:基于HeadersExchange、DirectExchange、FanoutExchange、TopicExchange的消息模型;在实际生产环境中应用最广泛的莫过于后3中消息模型。

本篇主要介绍FanoutExchange消息模型。

FanoutExchange消息模型实战

FanoutExchange,顾名思义是交换机的一种,具有广播消息的作用,即当前消息进入交换机这个中转站以后,交换机会检查哪个队列和自己有绑定在一起,找到相关的队列后,将消息发送到绑定的队列中,并最终由队列对应的消费者进行监听消费。

心细的人,可能会发现这里我们没有用到路由。为什么没有说道它呢? 因为,基于FanoutExchange的消息模式具有广播式的作用,纵然为它绑定了路由,也是起不了作用的。所以严格来说,FanoutExchange的模型不能称为真正的消息模型,但是该消费模型中仍然是有交换机、队列和隐形的“路由”,所以在这里我们也将它当做消息模型中的一种。

如图,为基于FanoutExchange的消息模型结构图:

从图可以看到,生产者生产的消息首先进入交换机,并由交换机中转至绑定的N条队列中,其中N>=1, 并最终由队列所绑定的消费者进行监听接收消费处理。

下面以案例说明:将一个实体对象充当消息,然后发送到基于FanoutExchange构成的消息模型中,最终绑定的多条队列对应的消费者进行监听消费接收处理。

1.在自定义注入配置类RabbitmqConfig中创建交换机、多条队列及其绑定

package com.debug.middleware.server.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;/**
* @className:
* @PackageName: com.debug.middleware.server.config
* @author: youjp
* @create: 2020-04-06 16:39
* @description:    TODO  RabbitMQ自定义注入配置Bean相关组件
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {//定义日志private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class);//自动装配RabbitMQ的链接工厂实例@Autowiredprivate CachingConnectionFactory connectionFactory;//自动装配消息监听器所在的容器工厂配置类实例@Autowiredprivate SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;/*** 下面为单一消费者实例的配置* @return*/@Bean("singleListenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer(){//定义消息监听器所在的容器工厂SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();//设置容器工厂所用的实例factory.setConnectionFactory(connectionFactory);//设置消息在传输中的格式,在这里采用json格式进行传输factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置并发消费者实例的初始数量。在这里为1个factory.setConcurrentConsumers(1);//设置并发消费者实例的最大数量。在这里为1个factory.setMaxConcurrentConsumers(1);//设置并发消费者实例中每个实例拉取到的消息数量-在这里为1个factory.setPrefetchCount(1);return factory;}/*** 多个消费者:主要针对高并发业务场景的配置* @return*/@Bean(name = "multiListenerContainer")public SimpleRabbitListenerContainerFactory multiListenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factoryConfigurer.configure(factory,connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.NONE);factory.setConcurrentConsumers(10);factory.setMaxConcurrentConsumers(15);factory.setPrefetchCount(10);return factory;}/*** RabbitMQ发送消息的操作组件实例* @return*/@Beanpublic RabbitTemplate rabbitTemplate(){//设置发现消息后进行确认connectionFactory.setPublisherConfirms(true);//设置发现消息后返回确认信息connectionFactory.setPublisherReturns(true);//构造发送消息组件的实例对象RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);//发送消息后,如果发送成功,则输出"消息发送成功"的反馈信息rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {public void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);}});//发送消息后,如果发送失败,则输出"消息发送失败-消息丢失"的反馈信息rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);}});return rabbitTemplate;}//定义读取配置文件的环境变量实例@Autowiredprivate Environment env;/**  创建消息模型FanoutExchange消息模型**///创建队列1@Bean("fanoutQueueOne")public Queue fanoutQueueOne(){return new Queue(env.getProperty("mq.fanout.queue.one.name"),true);}//创建队列2@Bean("fanoutQueueTwo")public  Queue fanoutQueueTwo(){return new Queue(env.getProperty("mq.fanout.queue.two.name"),true);}//创建交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(env.getProperty("mq.fanout.queue.exchange.name"),true,false);}//创建绑定1@Beanpublic Binding fanoutBindingOne(){return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());}//创建绑定2@Beanpublic Binding fanoutBindingTwo(){return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());}}

application.yml文件配置交换机、队列名称

mq:env: local
#定义广播式消息模型-fanoutExchange:创建队列1,2fanout:queue:one:name:  ${mq.env}.middleware.mq.fanout.one.queuetwo:name:  ${mq.env}.middleware.mq.fanout.two.queueexchange:name:  ${mq.env}.middleware.mq.fanout.exchange

2.创建对象实体信息EventInfo

package com.debug.middleware.server.rabbitmq.entity;import lombok.Data;
import lombok.ToString;import java.io.Serializable;/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.entity
* @author: youjp
* @create: 2020-04-08 20:21
* @description:    TDOO 实体对象信息
* @Version: 1.0
*/
@Data
@ToString
public class EventInfo implements Serializable {private Integer id; //id标识private String module; //模块private String name; //名称private String desc;// 描述public EventInfo(){}public EventInfo(Integer id, String module, String name, String desc) {this.id = id;this.module = module;this.name = name;this.desc = desc;}
}

3.创建对象实体生产者

package com.debug.middleware.server.rabbitmq.publisher;import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.publisher
* @author: youjp
* @create: 2020-04-08 20:26
* @description:    TODO 消息生产者
* @Version: 1.0
*/
@Component
public class ModelPublisher {private Logger logger=LoggerFactory.getLogger(ModelPublisher.class);@Autowiredprivate RabbitTemplate rabbitTemplate;//json序列化和反序列化组件@Autowiredprivate ObjectMapper objectMapper;//定义读取配置文件的环境变量实例@Autowiredprivate Environment env;/*** 发送消息* @param eventInfo*/public void sendMsg(EventInfo eventInfo){if(eventInfo!=null){//定义消息的传输格式为jsonrabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//设置交换机rabbitTemplate.setExchange(env.getProperty("mq.fanout.queue.exchange.name"));//设置消息try {Message msg= MessageBuilder.withBody(objectMapper.writeValueAsBytes(eventInfo)).build();//发送消息rabbitTemplate.convertAndSend(msg);logger.info("消息模型fanoutExchange-生产者-发送消息:{}",msg);} catch (JsonProcessingException e) {logger.error("消息模型fanoutExchange-生产者-发送消息发送异常:{}",eventInfo,e.fillInStackTrace());e.printStackTrace();}}}
}

4.创建对象实体消费者,在该消费者类中,将开放两条队列分别对应的监听消费方法

package com.debug.middleware.server.rabbitmq.consumer;import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.consumer
* @author: youjp
* @create: 2020-04-08 20:41
* @description:    TODO 消息消费者
* @Version: 1.0
*/
@Component
public class ModelConsumer {private Logger logger=LoggerFactory.getLogger(ModelPublisher.class);//json序列化和反序列化组件@Autowiredprivate ObjectMapper objectMapper;/*** @Param [msg]* @return void* @Author youjp* @Description //TODO 第一个队列消费者* @throw**/@RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.fanout.queue.one.name}")public void consumerFanoutMsgOne(@Payload byte[] msg){try {//监听消息队列中的消息,并进行解析处理EventInfo eventInfo=objectMapper.readValue(msg,EventInfo.class);logger.info("消息模型-fanoutExchange-one-消费者-监听到消息:{}",eventInfo);}catch (IOException e) {logger.error("消息模型-fanoutExchange-one-消费者-监听到消息异常:{}",e.fillInStackTrace());e.printStackTrace();}}/*** @Param [msg]* @return void* @Author youjp* @Description //TODO 第2个队列消费者* @throw**/@RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.fanout.queue.two.name}")public void consumerFanoutMsgTwo(@Payload byte[] msg){try {//监听消息队列中的消息,并进行解析处理EventInfo eventInfo=objectMapper.readValue(msg,EventInfo.class);logger.info("消息模型-fanoutExchange-two-消费者-监听到消息:{}",eventInfo);}catch (IOException e) {logger.error("消息模型-fanoutExchange-two-消费者-监听到消息异常:{}",e.fillInStackTrace());e.printStackTrace();}}
}

5.编写单元测试,触发生产者生产消息

package com.debug.middleware.server;import com.debug.middleware.server.entity.Student;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/**
* @className:
* @PackageName: com.debug.middleware.server
* @author: youjp
* @create: 2020-04-06 20:58
* @description:    TODO rabbitMQ 的java单元测试
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqBasicTest {//定义日志private static final Logger logger=LoggerFactory.getLogger(RabbitmqBasicTest.class);//定义Json序列化和反序列化实例@Autowiredprivate ObjectMapper objectMapper;//定义fanoutExchange消息模型中的发生消息的生产者@Autowiredprivate ModelPublisher modelPublisher;@Testpublic void testFanout(){EventInfo eventInfo=new EventInfo(1,"基于FanoutExchange模型","测试fanout模块","这是基于FanoutExchange模型");//触发生产者发送消息modelPublisher.sendMsg(eventInfo);}}

运行改测试方法,查看控制台的输出结果。可以看到生产者生产消息并由RabbitTemplate操作组件发送成功,同时由于该FanoutExchange绑定了两条队列,所以两条队列分别对应的消费者将监听接收到相应的消息。

打开浏览器,在地址栏输入http://127.0.0.1:15672,输入账号密码,进入RabbitMQ后端控制台,可查看到相应队列和交换机列表:

点击可查看到该交换机绑定的队列

至此,基于FanoutExchange消息模型的大概使用已经讲解完毕,此消费模型适用于“业务数据需要广播式传输”的场景,比如“用户操作写日志”。

当用户在系统中做了某种操作以后,需要在本身业务系统中将用户的操作内容记入数据库。同时也需要单独将用户的操作内容传输到专门的日子系统中进行存储(以便后续系统,进行日志分析等)。这个时候,可以将用户操作的日志封装为实体对象,并将其序列化后的json数据充当消息,最终采用广播式的交换机,即FanoutExchange消息模型进行发送、接收和处理。

源码下载:

https://gitee.com/yjp245/middleware_study.git

RabbitMQ消息模型之FanoutExchange消息模型实战相关推荐

  1. 消息队列 策略_消息模型:主题和队列有什么区别?

    首发公众号 可以看到,技术圈的风向一直在变,大数据.云的热度已经在慢慢消退,现在当红的是 AI 和 IoT.这些火热的概念,它最终要从论文和 PPT 落地,变成真正能解决问题的系统,否则就是一个空中楼 ...

  2. videojs如何获取请求消息_中通消息平台 Kafka 顺序消费线程模型的实践与优化

    各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可 ...

  3. 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

    目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...

  4. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  5. 详解,最新整理,RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  6. RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  7. rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...

    分布式事务 我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单.在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中, ...

  8. springBoot整合rabbitmq并测试五种常用模型

    之前我们记录了原生java代码使用rabbitmq的方法,很简单,类似于原生jdbc代码一样,将连接对象抽离出来作为工具类,生产者和消费者通过工具类获取连接对象,进而获取通道对象,再注册交换机或者是队 ...

  9. RabbitMQ真延时队列实现消息提醒功能

    RabbitMQ真延时队列实现消息提醒功能 一.需求场景 用户可以制定多个计划,同时可给该计划设置是否需要到点提醒,且中途可以取消提醒或修改提醒时间. 二.需要解决的问题 学习过rabbitmq的同学 ...

最新文章

  1. 逆水寒服务器新消息,游戏新消息:逆水寒太火爆服务器爆满王思聪都挤不进去...
  2. Treelabeling 异或性质,位运算,染色法,二分图(2100)
  3. C语言课后习题(63)
  4. mysql获取上月的某一天
  5. Voxengo Peakbuster for mac(音频谐波增强插件)
  6. 用华为模拟器ENSP构造校园网(第三天)配置总网络拓扑DHCP中继和实现路由通信
  7. 详解数字音频接口DAI
  8. ai跟随路径_怎么在ai中创建文本路径?Ai中怎样沿路径创建文本?
  9. python打印列表的下标和值的例子:
  10. 密码框password调用数字键盘
  11. web前端学习路线图
  12. 计算机少儿编程考级,少儿编程能力怎么评定?有什么考级可以参加?
  13. iOS开发 info.plist设置app启动页面
  14. Notepad++的字体设置加Consolas和微软雅黑混合字体转载
  15. 我的资源里边有好东西(gmt、panoply等等)
  16. 如何做好英文外贸B2C网站的优化
  17. 最新FreeBbs论坛社区APP源码
  18. CodeForces - 29C Mail Stamps【离散化】【DFS】
  19. 理化计算程序必备:CoolProp+UnitsNet
  20. Flask_migrate最新攻略,教你怎么优雅的使用Flask_migrate

热门文章

  1. 用java写我的世界
  2. javaweb——模拟用户登录和新闻发布
  3. php datedif,关于VB的DateDiff()函数与EXcel DateDif()函数
  4. Eclipse中图标含义
  5. onreadystatechange的认识
  6. 10分钟学会搭建sovits第二篇
  7. c语言圈复杂度switch,干货|C语言switch\/case圈复杂度优化重构
  8. JavaScript 获取随机数
  9. Win10设置打不开
  10. matlab 单位圆网格,MATLAB-曲面与网格图命令