springboot集成rabbitMQ实现消息的推送
RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。
我的代码中用的是RabbitMQ,先介绍几个概念:
一:消息队列的特性如下:
- 异步性,将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了同步等待的时间。
- 松耦合,消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
- 分布式,通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。
- 可靠性,消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。
二:AMQP(Advanced Message Queuing Protocol)高级消息队列协议,
是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 三:消息发送过程中的几个对象
发消息者 send
队列 queue
收消息者 receive
交换器 Exchange
路由 routing
发送者------交换器(路由并且过滤消息)-------队列(队列存储并发送消息)------接收者
虚拟主机、交换机、队列和绑定。
一个虚拟主机持有一组交换机、队列、绑定。RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换机/队列/绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
交换器常用的规则:topic(最灵活。根据路由键可以模糊匹配)fanout(广播形式的) 一下是我的测试代码。都测试通过!
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
@Configuration public class RabbitConfig {@Beanpublic Queue helloQueue() {return new Queue("hello");}@Beanpublic Queue neoQueue() {return new Queue("neo");}@Beanpublic Queue objectQueue() {return new Queue("object");}}
定义三个队列名称分别为hello.neo.object
发送者
@Component public class HelloSend {public static final Logger logger = LoggerFactory.getLogger(HelloSend.class);@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(){String message = "hello "+new Date();logger.info("message:"+message);rabbitTemplate.convertAndSend("hello",message);logger.info("队列发送成功");} }
- 接受者 监听hello队列
@Component @RabbitListener(queues = "hello") public class HelloReceiver {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver : " + hello);} }
- 以上是最简单的直接向队列里面发送信息。接收者监听队列并接受信息 TOPIC模式的信息推送(分红色的代码前后对应。如果不对应需要在队列上添加(name="")指明调用的具体队列名称
package com.neo.rabbitmq.send.topic;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** topic交换机模式配置类: 描述信息** @author liyy* @date 2018-07-18 18:36*/ @Configuration public class TopicRabbitConfig {public static final String message = "topic.message";public static final String messages = "topic.messages";/*** 定义消息队列1* @return*/@Bean(name = "queueMessage")public Queue messageQueue(){return new Queue(TopicRabbitConfig.message);}/*** 定义消息队列2* @return*/@Bean(name = "queueMessages")public Queue messagesQueue(){return new Queue(TopicRabbitConfig.messages);}/*** 定义交换机*/@Beanpublic TopicExchange exchange(){return new TopicExchange("topicExchange");}/*** 绑定消息队列到交换机,路由key:topic.message* @return*/@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}/*** 绑定消息队列到交换机,路由key:topic.#* @return*/@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange){return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}}
package com.neo.rabbitmq.send.topic;import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** : 描述信息** @author liyy* @date 2018-07-18 18:52*/ @Component public class TopicSend {@Autowiredprivate AmqpTemplate amqpTemplate;public void send1(){String context = "hi, i am message 1";amqpTemplate.convertAndSend("topicExchange","topic.message",context);}public void send2(){String context = "hi, i am message 2";amqpTemplate.convertAndSend("topicExchange","topic.messages",context);//交换机、路邮键}}
package com.neo.rabbit.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "topic.message") public class TopicReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("Topic Receiver1 : " + message);}}
package com.neo.rabbit.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("Topic Receiver2 : " + message);}}
send1的测试结果:
2018-07-19 13:51:12.325 INFO 37968 --- [cTaskExecutor-1] c.n.rabbitmq.receive.topic.TopicReceive : Topic Receiver1 hi, i am message 1
2018-07-19 13:51:12.332 INFO 37968 --- [cTaskExecutor-1] c.n.r.receive.topic.TopicReceive2 : Topic Receiver2 hi, i am message 1
send2的测试结果:
2018-07-19 13:50:02.416 INFO 51108 --- [cTaskExecutor-1] c.n.r.receive.topic.TopicReceive2 : Topic Receiver2 hi, i am message 2
- 最后再测一个广播模式的交换器
package com.neo.rabbitmq.send.fanout;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** 广播模式: 描述信息** @author liyy* @date 2018-07-19 10:44*/ @Configuration public class FanoutRabbitConfig {public static String queueA = "fanoutqueueA";public static String queueB = "fanoutqueueB";public static String queueC = "fanoutqueueC";@Bean(name = "queueA")public Queue queueA(){return new Queue(FanoutRabbitConfig.queueA);}@Bean(name = "queueB")public Queue queueB(){return new Queue(FanoutRabbitConfig.queueB);}@Bean(name = "queueC")public Queue queueC(){return new Queue(FanoutRabbitConfig.queueC);}/*** 定义广播模式的交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("exchange");}/*** 绑定交换机到队列*/@BeanBinding bindingExchangeA(Queue queueA, FanoutExchange exchange){return BindingBuilder.bind(queueA).to(exchange);}/*** 绑定交换机到队列*/@BeanBinding bindingExchangeB(Queue queueB, FanoutExchange exchange){return BindingBuilder.bind(queueB).to(exchange);}/*** 绑定交换机到队列*/@BeanBinding bindingExchangeC(Queue queueC, FanoutExchange exchange){return BindingBuilder.bind(queueC).to(exchange);}}
package com.neo.rabbitmq.send.fanout;import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** : 描述信息** @author liyy* @date 2018-07-18 18:52*/ @Component public class FanoutSend {@Autowiredprivate AmqpTemplate amqpTemplate;public void send(){String context = "hi, i am message 1";//routing key设置为空串amqpTemplate.convertAndSend("fanoutExchange","",context);}}
package com.neo.rabbitmq.receive.fanout;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** 接受者: 描述信息监听队列,可以接受对象** @author liyy* @date 2018-07-18 17:25*/ @Component @RabbitListener(queues = "fanoutqueueA") public class FanoutReceive1 {public static final Logger logger = LoggerFactory.getLogger(FanoutReceive1.class);@RabbitHandlerpublic void process(String message){logger.info("fanoutA Receiver1 "+message);} }
package com.neo.rabbitmq.receive.fanout;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** 接受者: 描述信息监听队列,可以接受对象** @author liyy* @date 2018-07-18 17:25*/ @Component @RabbitListener(queues = "fanoutqueueB") public class FanoutReceive2 {public static final Logger logger = LoggerFactory.getLogger(FanoutReceive2.class);@RabbitHandlerpublic void process(String message){logger.info("fanoutB Receiver2 "+message);} }
package com.neo.rabbitmq.receive.fanout;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** 接受者: 描述信息监听队列,可以接受对象** @author liyy* @date 2018-07-18 17:25*/ @Component @RabbitListener(queues = "fanoutqueueC") public class FanoutReceive3 {public static final Logger logger = LoggerFactory.getLogger(FanoutReceive3.class);@RabbitHandlerpublic void process(String message){logger.info("fanoutC Receiver3 "+message);} }
- 测试结果:
- 2018-07-19 11:34:09.680 INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive3 : fanoutC Receiver3 hi, i am message 1
2018-07-19 11:34:09.680 INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive2 : fanoutB Receiver2 hi, i am message 1
2018-07-19 11:34:09.680 INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive1 : fanoutA Receiver1 hi, i am message 1
springboot集成rabbitMQ实现消息的推送相关推荐
- SpringBoot 集成 WebSocket 实现消息群发推送
一. 什么是 WebSocket WebSocket 是一种全新的协议.它将 TCP 的 Socket(套接字)应用在了web page上,从而使通信双方建立起一个保持在活动状态的连接通道,并且属于全 ...
- spring boot 集成 websocket 实现消息主动推送
前言 http协议是无状态协议,每次请求都不知道前面发生了什么,而且只可以由浏览器端请求服务器端,而不能由服务器去主动通知浏览器端,是单向的,在很多场景就不适合,比如实时的推送,消息通知或者股票等信息 ...
- SpringBoot 实现微信模板消息通知推送提醒
添加依赖 在SpringBoot项目中添加依赖 <!--微信模版消息推送三方sdk--><dependency><groupId>com.github.binary ...
- RabbitMQ(九):RabbitMQ 延迟队列,消息延迟推送(Spring boot 版)
应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...
- RabbitMQ 延迟队列,消息延迟推送
应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...
- SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...
- springboot 集成rabbitmq 实例
springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...
- netty服务器定时发送消息,netty+websocket+quartz实现消息定时推送
netty+websocket+quartz实现消息定时推送&&IM聊天室 在讲功能实现之前,我们先来捋一下底层的原理,后面附上工程结构及代码 1.NIO NIO主要包含三大核心部分: ...
- Springboot集成RabbitMQ一个完整案例
springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...
最新文章
- Python3中生成器介绍
- HTML转义字符大全
- python 输出“Hello, world”
- CRC32碰撞解密压缩包密码的脚本
- tomcat使用说明
- SAP Spartacus UI TabParagraphContainerComponent 的工作原理
- SQL Server--通过存储过程生成表数据的脚本
- AWS 人工智能黑客马拉松正式开启!用实力演绎科技向善!
- Oracle 20c 新特性:原生的 JSON 数据类型(Native JSON Datatype)
- onvif学习笔记9:OSD命令学习
- ctfshow-WEB-web6
- 今天加入了OSChina,准备将我的BLOG搬到这里。
- RubyOnRails with Ajax
- vue3实现商城左右联动数据---BScroll(vue3代码复制就能用)
- echart柱状图即显示数值,又显示百分比
- SVD分解和矩阵的Lipschitz条件等
- 即使是庸才我也要成为庸才中的人才
- 学生鲜花网页设计作品静态HTML网页模板源码 大学生鲜花商城网站制作 简单鲜花网站网页设计成品
- android 永久root权限,安卓 实现永久性开启adb 的root权限
- qq绑定outlook邮箱服务器,Outlook2013怎么绑定QQ邮箱