一,Spring AMQP介绍

Java API 方式编程,有什么问题?

Spring 封装 RabbitMQ 的时候,它做了什么事情?

1、管理对象(队列、交换机、绑定)

2、封装方法(发送消息、接收消息)

Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案,它是一个抽象层,不依赖于特定的 AMQP Broker 实现和客户端的抽象,所以可以很方便地替换。比如我们可以使用 spring-rabbit 来实现。

二,Spring AMQP核心组件

1,ConnectionFactory

Spring AMQP 的连接工厂接口,用于创建连接。CachingConnectionFactory 是ConnectionFactory 的一个实现类。

2,RabbitAdmin

RabbitAdmin 是 AmqpAdmin 的实现,封装了对 RabbitMQ 的基础管理操作,比如对交换机、队列、绑定的声明和删除等。

/*** @author yhd* @createtime 2021/1/28 19:24* @description Spring AMQP configuration class*/
@SpringBootConfiguration
public class AMQPConfig {private static final String EXCHANGE_NAME = "amqp.yhd.exchange";private static final String QUEUE_NAME = "amqp.yhd.queue";private static final String ROUTING_KEY = "amqp.admin";@Beanpublic ConnectionFactory factory() {CachingConnectionFactory factory = new CachingConnectionFactory();factory.setAddresses("121.199.31.160");factory.setPort(5672);factory.setUsername("root");factory.setPassword("root");return factory;}@Beanpublic AmqpAdmin amqpAdmin( ConnectionFactory factory) {RabbitAdmin admin = new RabbitAdmin(factory);//声明一个交换机 交换机名  是否持久化  是否自动删除admin.declareExchange(new DirectExchange(EXCHANGE_NAME, true, false));//队列名 持久化 是否批处理  自动删除admin.declareQueue(new Queue(QUEUE_NAME, true, false, false));//声明一个绑定 队列名 ,绑定类型,交换机名,路由键 参数admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, ROUTING_KEY, null));return admin;}
}

为什么我们在配置文件(Spring)或者配置类(SpringBoot)里面定义了交换机、队列、绑定关系,并没有直接调用 Channel 的 declare 的方法,Spring 在启动的时候就可以帮我们创建这些元数据?这些事情就是由 RabbitAdmin 完成的。

RabbitAdmin 实 现 了 InitializingBean 接 口 , 里 面 有 唯 一 的 一 个 方 法afterPropertiesSet(),这个方法会在 RabbitAdmin 的属性值设置完的时候被调用。

在 afterPropertiesSet ()方法中,调用了一个 initialize()方法。这里面创建了三个Collection,用来盛放交换机、队列、绑定关系。

最后依次声明返回类型为 Exchange、Queue 和 Binding 这些 Bean,底层还是调用了 Channel 的 declare 的方法。

declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));

3,Message

Message 是 Spring AMQP 对消息的封装。两个重要的属性:body:消息内容。 messageProperties:消息属性。

4,RabbitTemplate 消息模板

RabbitTemplate 是 AmqpTemplate 的一个实现(目前为止也是唯一的实现),用来简化消息的收发,支持消息的确认(Confirm)与返回(Return)。跟 JDBCTemplate一 样 , 它 封 装 了 创 建 连 接 、 创 建 消 息 信 道 、 收 发 消 息 、 消 息 格 式 转 换(ConvertAndSend→Message)、关闭信道、关闭连接等等操作。

针对于多个服务器连接,可以定义多个 Template。可以注入到任何需要收发消息的地方使用。

    /*** return callback   &&  confirm callable** @param factory* @return*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {RabbitTemplate template = new RabbitTemplate(factory);template.setMandatory(true);template.setReturnCallback((Message message,int replyCode,String replyText,String exchange,String routingKey) -> {});template.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {if (ack) {log.info("消息确认成功!");} else {log.info("消息确认失败!");}});return template;}

5,Messager Listener 消息监听

MessageListener

MessageListener 是 Spring AMQP 异步消息投递的监听器接口,它只有一个方法onMessage,用于处理消息队列推送来的消息,作用类似于 Java API 中的 Consumer。

MessageListenerContainer

MessageListenerContainer可以理解为MessageListener的容器,一个Container只有一个 Listener,但是可以生成多个线程使用相同的 MessageListener 同时消费消息。

Container 可以管理 Listener 的生命周期,可以用于对于消费者进行配置。

例如:动态添加移除队列、对消费者进行设置,例如 ConsumerTag、Arguments、并发、消费者数量、消息确认模式等等。

    /*** 消息监听器容器* @param connectionFactory* @return*/@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);//监听的队列container.setQueues(new Queue(QUEUE_NAME, true, false, false));// 最小消费者数container.setConcurrentConsumers(1);// 最大的消费者数量container.setMaxConcurrentConsumers(5);//是否重回队列container.setDefaultRequeueRejected(false);//签收模式container.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setExposeListenerChannel(true);//消费端的标签策略container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());return container;}

在 SpringBoot2.0 中新增了一个 DirectMessageListenerContainer。

MessageListenerContainerFactory

Spring 去整合 IBM MQ、JMS、Kafka 也是这么做的。

    /*** * @param connectionFactory* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.NONE);factory.setAutoStartup(true);return factory;}

可以在消费者上指定,当我们需要监听多个 RabbitMQ 的服务器的时候,指定不同的 MessageListenerContainerFactory。

@Slf4j
@Component
@PropertySource("classpath:application.properties")
@RabbitListener(queues = "${amqp.yhd.queue}", containerFactory = "rabbitListenerContainerFactory")
public class FirstConsumer {@RabbitHandlerpublic void process(@Payload String message) {log.info("First Queue received msg : {}", message);}
}

继承关系

6,转换器 MessageConvertor

MessageConvertor 的 作用?

RabbitMQ 的消息在网络传输中需要转换成 byte[](字节数组)进行发送,消费者需要对字节数组进行解析。

在 Spring AMQP 中,消息会被封装为 org.springframework.amqp.core.Message对象。消息的序列化和反序列化,就是处理 Message 的消息体 body 对象。

如果消息已经是 byte[]格式,就不需要转换。

如果是 String,会转换成 byte[]。

如果是 Java 对象,会使用 JDK 序列化将对象转换为 byte[](体积大,效率差)。

在 调 用 RabbitTemplate 的 convertAndSend() 方 法 发 送 消 息 时 , 会 使 用MessageConvertor 进行消息的序列化,默认使用 SimpleMessageConverter。

在某些情况下,我们需要选择其他的高效的序列化工具。如果我们不想在每次发送消息时自己处理消息,就可以直接定义一个 MessageConvertor。

@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;
}

MessageConvertor 如何 工作?

调 用 了 RabbitTemplate 的 convertAndSend() 方 法 时 会 使 用 对 应 的MessageConvertor 进行消息的序列化和反序列化。

序列化:Object —— Json —— Message(body) —— byte[]

反序列化:byte[] ——Message —— Json —— Object

有 哪些 MessageConvertor ?

在 Spring 中提供了一个默认的转换器:SimpleMessageConverter。

Jackson2JsonMessageConverter(RbbitMQ 自带):将对象转换为 json,然后再转换成字节数组进行传递。

如何 自定义 MessageConverter ?

例如:我们要使用 Gson 格式化消息:

创建一个类,实现 MessageConverter 接口,重写 toMessage()和 fromMessage()方法。

toMessage(): Java 对象转换为 Message
fromMessage(): Message 对象转换为 Java 对象

三,SpringBoot集成RabbitMQ

为什么没有定义 Spring AMQP 的任何一个对象,也能实现消息的收发?Spring Boot 做了什么?

老套路

源码:RabbitAutoConfiguration

SpringAMQP详解相关推荐

  1. RabbitMQ详解以及spring对RabbitMQ的集成(附带部分源码解读)

    一·简介 1丶为什么要使用消息队列 https://wenku.baidu.com/view/e297236f83c4bb4cf7ecd193.html ①异步处理(高并发) ②系统解耦 ③流量削锋 ...

  2. 从命令行到IDE,版本管理工具Git详解(远程仓库创建+命令行讲解+IDEA集成使用)

    首先,Git已经并不只是GitHub,而是所有基于Git的平台,只要在你的电脑上面下载了Git,你就可以通过Git去管理"基于Git的平台"上的代码,常用的平台有GitHub.Gi ...

  3. JVM年轻代,老年代,永久代详解​​​​​​​

    秉承不重复造轮子的原则,查看印象笔记分享连接↓↓↓↓ 传送门:JVM年轻代,老年代,永久代详解 速读摘要 最近被问到了这个问题,解释的不是很清晰,有一些概念略微模糊,在此进行整理和记录,分享给大家.在 ...

  4. docker常用命令详解

    docker常用命令详解 本文只记录docker命令在大部分情境下的使用,如果想了解每一个选项的细节,请参考官方文档,这里只作为自己以后的备忘记录下来. 根据自己的理解,总的来说分为以下几种: Doc ...

  5. 通俗易懂word2vec详解词嵌入-深度学习

    https://blog.csdn.net/just_so_so_fnc/article/details/103304995 skip-gram 原理没看完 https://blog.csdn.net ...

  6. 深度学习优化函数详解(5)-- Nesterov accelerated gradient (NAG) 优化算法

    深度学习优化函数详解系列目录 深度学习优化函数详解(0)– 线性回归问题 深度学习优化函数详解(1)– Gradient Descent 梯度下降法 深度学习优化函数详解(2)– SGD 随机梯度下降 ...

  7. CUDA之nvidia-smi命令详解---gpu

    nvidia-smi是用来查看GPU使用情况的.我常用这个命令判断哪几块GPU空闲,但是最近的GPU使用状态让我很困惑,于是把nvidia-smi命令显示的GPU使用表中各个内容的具体含义解释一下. ...

  8. Bert代码详解(一)重点详细

    这是bert的pytorch版本(与tensorflow一样的,这个更简单些,这个看懂了,tf也能看懂),地址:https://github.com/huggingface/pytorch-pretr ...

  9. CRF(条件随机场)与Viterbi(维特比)算法原理详解

    摘自:https://mp.weixin.qq.com/s/GXbFxlExDtjtQe-OPwfokA https://www.cnblogs.com/zhibei/p/9391014.html C ...

最新文章

  1. 什么是折线图?怎样用Python绘制?怎么用?终于有人讲明白了(附代码)
  2. mysql5.0修改字符集,查看mysql字符集及修改表结构
  3. 【大话Hibernate】Hibernate的核心接口和类
  4. iOS10 资料汇总:值得回看的 10 篇 iOS 热文
  5. 函数使用了堆栈的字节超过_在C语言中如何访问堆栈?
  6. 基于win32的windows画板程序
  7. python向it新增5个元素_Python序列、元组、列表、集合及字典笔记整理
  8. 开源中国大佬是怎么用Selenium做自动化web测试的
  9. ODOO从哪里开始??OpenERP的第一根线头儿
  10. 安装oh-my-zsh+、插件zsh-syntax-highlighting、zsh-autosuggestions、zsh-autosuggestions、autojump修改配置
  11. 计算机一寸照编辑教程,超简单的一寸照制作及排版教程,再也不花冤枉钱!
  12. 一款开源短视频去水印程序,大爱!
  13. oeasy教您玩转vim - 88 - # 自动命令autocmd
  14. 电机控制中空间矢量脉宽调制SVPWM与simulink仿真详解
  15. Windows 10 D盘操作需要管理员权限
  16. Python添加下拉菜单
  17. 用python画雪花飘落_python实现雪花飘落效果实例讲解
  18. 你不知道的前端图片处理(万字长文,建议收藏)
  19. python传输视频文件_Python视频传输
  20. 被印在纸币上的七大科学家

热门文章

  1. 面试官说我的简历像包装的,面试后面试官向我道歉了
  2. acrotray.exe
  3. taobao.products.get( 获取产品列表 )接口,淘宝店铺商品列表官方接口,淘宝R2店铺上传接口,淘宝oAuth2.0接口
  4. [FML]学习笔记三 Rademacher Complexity
  5. c语言之排序算法(一)
  6. android 滚轮刻度尺的实现
  7. 2014年最酷的30个JavaScript库
  8. excel减法函数_Excel公式练习35: 拆分连字符分隔的数字并放置在同一列中
  9. 自由自在亮法宝 远离流感提升免疫力
  10. Unity 3D 制作贴纸,模型任意形状贴画制作,简易贴花工具,撞击模型凹凸效果