springboot集成rabbitmq:fanout、topic
编写Fanout模式的消息接收
其他模块和上文保持一致https://blog.csdn.net/weixin_59334478/article/details/127740411?spm=1001.2014.3001.5501
ReceiveServiceImpl实现类
package com.it.rabbitmq.impl;import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void receiveMessage() {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");System.out.println(message);}/*** @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息* 这个方法不需要手动调用,spring会自动监听* 属性queues:用于指定一个已经存在的队列名称,用于队列的监听* @param message 参数就是接收到的具体消息数据*/@RabbitListener(queues = {"bootDirectQueue"})public void directReceive(String message) {System.out.println("监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive1(String message) {System.out.println("fanoutReceive1监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive2(String message) {System.out.println("fanoutReceive2监听器接收的消息:"+message);}}
编写Fanout模式的消息接收
1.SendService接口
package com.it.rabbitmq;public interface SendService {void sendMessage(String message);void sendFanoutMessage(String message);
}
SendServiceImpl实现类
package com.it.rabbitmq.impl;import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("sendService")
public class SendServiceImpl implements SendService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void sendMessage(String message) {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);}@Overridepublic void sendFanoutMessage(String message) {amqpTemplate.convertAndSend("fanoutExchange","",message);}
}
2.RabbitMQConfig类
package com.it.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {//配置一个direct类型的交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("bootDirectExchange");}//配置一个队列@Beanpublic Queue directQueue() {return new Queue("bootDirectQueue");}/*** 配置一个交换机和队列的绑定** @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @return*/@Beanpublic Binding directBinding(Queue directQueue, DirectExchange directExchange) {//完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkeyreturn BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");}//配置一个fanout类型的交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}}
3.运行主函数
package com.it;import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class RabbitmqSpringbootApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);SendService sendService = (SendService) ac.getBean("sendService");
// sendService.sendMessage("boot的测试数据");sendService.sendFanoutMessage("boot的fanout测试数据!");}}
编写Topic模式消息接收
ReceiveServiceImpl实现类
package com.it.rabbitmq.impl;import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void receiveMessage() {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");System.out.println(message);}/*** @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息* 这个方法不需要手动调用,spring会自动监听* 属性queues:用于指定一个已经存在的队列名称,用于队列的监听* @param message 参数就是接收到的具体消息数据*/@RabbitListener(queues = {"bootDirectQueue"})public void directReceive(String message) {System.out.println("监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive1(String message) {System.out.println("fanoutReceive1监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive2(String message) {System.out.println("fanoutReceive2监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic1"),key = {"aa"},exchange = @Exchange(name="topicExchange",type = "topic"))})public void topicReceive1(String message){System.out.println("topic1消费者---aa---"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic2"),key = {"aa.*"},exchange = @Exchange(name="topicExchange",type = "topic"))})public void topicReceive2(String message){System.out.println("topic2消费者---aa---"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic3"),key = {"aa.#"},exchange = @Exchange(name="topicExchange",type = "topic"))})public void topicReceive3(String message){System.out.println("topic3消费者---aa---"+message);}}
编写Topic模式消息发送
1.SendService接口
package com.it.rabbitmq;public interface SendService {void sendMessage(String message);void sendFanoutMessage(String message);void sendTopicMessage(String message);
}
SendServiceImpl类
package com.it.rabbitmq.impl;import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("sendService")
public class SendServiceImpl implements SendService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void sendMessage(String message) {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);}@Overridepublic void sendFanoutMessage(String message) {amqpTemplate.convertAndSend("fanoutExchange","",message);}@Overridepublic void sendTopicMessage(String message) {amqpTemplate.convertAndSend("topicExchange","",message);}}
2.RabbitMQConfig类,提前声明一个topic的交换机
package com.it.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {//配置一个direct类型的交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("bootDirectExchange");}//配置一个队列@Beanpublic Queue directQueue() {return new Queue("bootDirectQueue");}/*** 配置一个交换机和队列的绑定** @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @return*/@Beanpublic Binding directBinding(Queue directQueue, DirectExchange directExchange) {//完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkeyreturn BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");}//配置一个fanout类型的交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}//配置一个topic类型的交换机@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topicExchange");}}
3.运行主函数
package com.it;import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class RabbitmqSpringbootApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);SendService sendService = (SendService) ac.getBean("sendService");
// sendService.sendMessage("boot的测试数据");// sendService.sendFanoutMessage("boot的fanout测试数据!");sendService.sendTopicMessage("boot的topic测试数据,key:aa");}}
功能测试:
查看接收类
修改发送信息的routingkey
修改发送信息的routingkey
springboot集成rabbitmq:fanout、topic相关推荐
- springboot 集成rabbitmq 实例
springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...
- RabbitMQ——SpringBoot集成RabbitMQ
文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...
- SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...
- Springboot集成rabbitMQ之mandatory和备份交换机
Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...
- Springboot集成RabbitMQ一个完整案例
springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...
- springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式
springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...
- Springboot集成rabbitmq实现延时队列
Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...
- springboot集成rabbitmq商品秒杀业务实战(流量削峰)
消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...
- springboot集成rabbitMQ实现消息的推送
RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ.RocketMQ.ActiveMQ.Kafka等. 我的代码中用的是RabbitMQ,先介绍几个概念: 一:消息队列的特性 ...
最新文章
- 基于visual Studio2013解决C语言竞赛题之1085相邻之和素数
- ecplice中class.forname一直报错_英雄联盟LOL闪退,弹出吉格斯报错BUGSPLAT
- K8s Pod 钩子生命周期
- [恢]hdu 1407
- C语言float数据类型介绍、示例和应用经验
- 活动选择问题 贪心
- Android ASCII编码转换成相对应字符
- 什么是文本分析,文本分析主要运用到了哪些关键技术?
- arcgis路网密度计算、提取中心线、面积计算
- 微信小程序支持分享到朋友圈了
- MATLAB数据导入
- 雅虎免费邮箱开通POP3和自动转发的方法
- 详解Yarn中三种资源调度器(FIFO Scheduler、Capacity Scheduler、Fair Scheduler)和配置自定义队列实现任务提交不同队列
- 基于java springboot租房平台设计,公寓租赁系统
- Redis 安装+设置密码
- 少年之文明与国之文明——---从奥运会看国人素质之飞跃
- OR-CAD CAPTURE学习笔记——ERROR(ORCAP-11010)
- 发哥莫慌!这56亿让区块链帮你搞定
- CSS盒子塌陷及解决方法
- Linux中硬盘分区、创建逻辑卷